fix and refactor log merge policy, fixes quickwit-oss#1035
fixes a bug in the log merge policy where a segment was wrongly referenced by its index
PSeitz committed May 18, 2021
1 parent 7ba771e commit 8ea4653
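
To illustrate the refactor the commit message describes, here is a minimal, hedged sketch (a hypothetical Seg type and pick_small helper, not tantivy's SegmentMeta or its API): instead of carrying (position, size) tuples and indexing back into the original slice, the policy now carries references to the segment metadata itself, so filtering and re-sorting cannot leave an index pointing at the wrong segment.

    // Standalone illustration only; `Seg` and `pick_small` are hypothetical.
    #[derive(Debug)]
    struct Seg {
        id: u64,
        num_docs: u32,
    }

    // Filter and sort over references; callers read `seg.id` directly,
    // so no `segs[index]` lookup can drift out of sync with the sorted order.
    fn pick_small(segs: &[Seg], max_docs: u32) -> Vec<&Seg> {
        let mut small: Vec<&Seg> = segs.iter().filter(|s| s.num_docs <= max_docs).collect();
        small.sort_by_key(|s| s.num_docs);
        small
    }

    fn main() {
        let segs = vec![
            Seg { id: 1, num_docs: 10 },
            Seg { id: 2, num_docs: 3 },
            Seg { id: 3, num_docs: 7 },
        ];
        // Only segments small enough to merge are returned, smallest first.
        for seg in pick_small(&segs, 8) {
            println!("mergeable segment {} ({} docs)", seg.id, seg.num_docs);
        }
    }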
Showing 1 changed file with 81 additions and 23 deletions.
104 changes: 81 additions & 23 deletions src/indexer/log_merge_policy.rs
@@ -54,40 +54,35 @@ impl LogMergePolicy {
 
 impl MergePolicy for LogMergePolicy {
     fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
-        let mut size_sorted_tuples = segments
+        let mut size_sorted_segments = segments
             .iter()
-            .map(SegmentMeta::num_docs)
-            .enumerate()
-            .filter(|(_, s)| s <= &(self.max_merge_size as u32))
-            .collect::<Vec<(usize, u32)>>();
+            .filter(|s| s.num_docs() <= (self.max_merge_size as u32))
+            .collect::<Vec<&SegmentMeta>>();
 
-        size_sorted_tuples.sort_by(|x, y| y.1.cmp(&(x.1)));
-
-        if size_sorted_tuples.len() <= 1 {
-            return Vec::new();
+        if size_sorted_segments.len() <= 1 {
+            return vec![];
         }
+        size_sorted_segments.sort_by_key(|seg| seg.num_docs());
 
-        let size_sorted_log_tuples: Vec<_> = size_sorted_tuples
+        let sorted_segments_with_log_size: Vec<_> = size_sorted_segments
             .into_iter()
-            .map(|(ind, num_docs)| (ind, f64::from(self.clip_min_size(num_docs)).log2()))
+            .map(|seg| (seg, f64::from(self.clip_min_size(seg.num_docs())).log2()))
             .collect();
 
-        if let Some(&(first_ind, first_score)) = size_sorted_log_tuples.first() {
-            let mut current_max_log_size = first_score;
-            let mut levels = vec![vec![first_ind]];
-            for &(ind, score) in (&size_sorted_log_tuples).iter().skip(1) {
-                if score < (current_max_log_size - self.level_log_size) {
-                    current_max_log_size = score;
+        if let Some(&(first_segment, log_size)) = sorted_segments_with_log_size.first() {
+            let mut current_max_log_size = log_size;
+            let mut levels = vec![vec![first_segment]];
+            for &(segment, segment_log_size) in (&sorted_segments_with_log_size).iter().skip(1) {
+                if segment_log_size < (current_max_log_size - self.level_log_size) {
+                    current_max_log_size = segment_log_size;
                     levels.push(Vec::new());
                 }
-                levels.last_mut().unwrap().push(ind);
+                levels.last_mut().unwrap().push(segment);
             }
             levels
                 .iter()
                 .filter(|level| level.len() >= self.min_merge_size)
-                .map(|ind_vec| {
-                    MergeCandidate(ind_vec.iter().map(|&ind| segments[ind].id()).collect())
-                })
+                .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect()))
                 .collect()
         } else {
             return vec![];
@@ -109,12 +104,75 @@ impl Default for LogMergePolicy {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::core::{SegmentId, SegmentMeta, SegmentMetaInventory};
-    use crate::indexer::merge_policy::MergePolicy;
+    use crate::{
+        core::{SegmentId, SegmentMeta, SegmentMetaInventory},
+        schema,
+    };
+    use crate::{indexer::merge_policy::MergePolicy, schema::INDEXED};
     use once_cell::sync::Lazy;
 
     static INVENTORY: Lazy<SegmentMetaInventory> = Lazy::new(SegmentMetaInventory::default);
 
+    use crate::Index;
+
+    #[test]
+    fn create_index_test_max_merge_issue_1035() {
+        let mut schema_builder = schema::Schema::builder();
+        let int_field = schema_builder.add_u64_field("intval", INDEXED);
+        let schema = schema_builder.build();
+
+        let index = Index::create_in_ram(schema);
+
+        {
+            let mut log_merge_policy = LogMergePolicy::default();
+            log_merge_policy.set_min_merge_size(1);
+            log_merge_policy.set_max_merge_size(1);
+            log_merge_policy.set_min_layer_size(0);
+
+            let mut index_writer = index.writer_for_tests().unwrap();
+            index_writer.set_merge_policy(Box::new(log_merge_policy));
+
+            // After every commit the merge checker is started; because of max_merge_size
+            // it will only merge segments that contain a single document.
+            index_writer.add_document(doc!(int_field=>1_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>2_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>3_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>4_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>5_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>6_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>7_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>8_u64));
+            assert!(index_writer.commit().is_ok());
+        }
+
+        let _segment_ids = index
+            .searchable_segment_ids()
+            .expect("Searchable segments failed.");
+
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
+        let segment_readers = searcher.segment_readers();
+        for segment in segment_readers {
+            if segment.num_docs() > 2 {
+                panic!("a segment must not contain more than 2 docs here");
+            } // there is no easy way to wait for merges to finish, otherwise this could be a simple equality check
+        }
+    }
+
     fn test_merge_policy() -> LogMergePolicy {
         let mut log_merge_policy = LogMergePolicy::default();
         log_merge_policy.set_min_merge_size(3);
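
For readers skimming the diff, here is a hedged, standalone sketch of the level-grouping idea that compute_merge_candidates implements: segment sizes are log2-scaled (after clipping away zero), a new level is opened whenever the log size drops by more than level_log_size below the size that opened the current level, and only levels with at least min_merge_size members become merge candidates. The helper name, the largest-first ordering assumption, and the example numbers below are illustrative, not tantivy's API or defaults.

    // Illustrative sketch only; `group_into_levels` is not part of tantivy.
    // `sorted_sizes` is assumed to be ordered from largest to smallest.
    fn group_into_levels(
        sorted_sizes: &[u32],
        level_log_size: f64,
        min_merge_size: usize,
    ) -> Vec<Vec<u32>> {
        let mut levels: Vec<Vec<u32>> = Vec::new();
        let mut current_max_log_size = f64::MAX;
        for &size in sorted_sizes {
            let log_size = f64::from(size.max(1)).log2(); // clip so log2(0) never happens
            // Open a new level once the log-scaled size drops far enough below
            // the size that opened the current level.
            if log_size < current_max_log_size - level_log_size {
                current_max_log_size = log_size;
                levels.push(Vec::new());
            }
            levels.last_mut().unwrap().push(size);
        }
        // Only levels with enough segments are worth merging.
        levels.retain(|level| level.len() >= min_merge_size);
        levels
    }

    fn main() {
        // Two big segments and three small ones: with these illustrative settings,
        // only the three small segments form a level eligible for merging.
        let levels = group_into_levels(&[100_000, 90_000, 1_000, 900, 800], 0.75, 3);
        assert_eq!(levels, vec![vec![1_000u32, 900, 800]]);
        println!("{:?}", levels);
    }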
