From 8ea4653c9b980afb98a0a9288b51beb6e3da6438 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Mon, 17 May 2021 16:35:49 +0200 Subject: [PATCH] fix and refactor log merge policy, fixes #1035 fixes a bug in log merge policy where an index was wrongly referenced by its index --- src/indexer/log_merge_policy.rs | 104 +++++++++++++++++++++++++------- 1 file changed, 81 insertions(+), 23 deletions(-) diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index b0a2ba100f..077e6946e0 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -54,40 +54,35 @@ impl LogMergePolicy { impl MergePolicy for LogMergePolicy { fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec { - let mut size_sorted_tuples = segments + let mut size_sorted_segments = segments .iter() - .map(SegmentMeta::num_docs) - .enumerate() - .filter(|(_, s)| s <= &(self.max_merge_size as u32)) - .collect::>(); + .filter(|s| s.num_docs() <= (self.max_merge_size as u32)) + .collect::>(); - size_sorted_tuples.sort_by(|x, y| y.1.cmp(&(x.1))); - - if size_sorted_tuples.len() <= 1 { - return Vec::new(); + if size_sorted_segments.len() <= 1 { + return vec![]; } + size_sorted_segments.sort_by_key(|seg| seg.num_docs()); - let size_sorted_log_tuples: Vec<_> = size_sorted_tuples + let sorted_segments_with_log_size: Vec<_> = size_sorted_segments .into_iter() - .map(|(ind, num_docs)| (ind, f64::from(self.clip_min_size(num_docs)).log2())) + .map(|seg| (seg, f64::from(self.clip_min_size(seg.num_docs())).log2())) .collect(); - if let Some(&(first_ind, first_score)) = size_sorted_log_tuples.first() { - let mut current_max_log_size = first_score; - let mut levels = vec![vec![first_ind]]; - for &(ind, score) in (&size_sorted_log_tuples).iter().skip(1) { - if score < (current_max_log_size - self.level_log_size) { - current_max_log_size = score; + if let Some(&(first_segment, log_size)) = sorted_segments_with_log_size.first() { + let mut current_max_log_size = log_size; + let mut levels = vec![vec![first_segment]]; + for &(segment, segment_log_size) in (&sorted_segments_with_log_size).iter().skip(1) { + if segment_log_size < (current_max_log_size - self.level_log_size) { + current_max_log_size = segment_log_size; levels.push(Vec::new()); } - levels.last_mut().unwrap().push(ind); + levels.last_mut().unwrap().push(segment); } levels .iter() .filter(|level| level.len() >= self.min_merge_size) - .map(|ind_vec| { - MergeCandidate(ind_vec.iter().map(|&ind| segments[ind].id()).collect()) - }) + .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect())) .collect() } else { return vec![]; @@ -109,12 +104,75 @@ impl Default for LogMergePolicy { #[cfg(test)] mod tests { use super::*; - use crate::core::{SegmentId, SegmentMeta, SegmentMetaInventory}; - use crate::indexer::merge_policy::MergePolicy; + use crate::{ + core::{SegmentId, SegmentMeta, SegmentMetaInventory}, + schema, + }; + use crate::{indexer::merge_policy::MergePolicy, schema::INDEXED}; use once_cell::sync::Lazy; static INVENTORY: Lazy = Lazy::new(SegmentMetaInventory::default); + use crate::Index; + + #[test] + fn create_index_test_max_merge_issue_1035() { + let mut schema_builder = schema::Schema::builder(); + let int_field = schema_builder.add_u64_field("intval", INDEXED); + let schema = schema_builder.build(); + + let index = Index::create_in_ram(schema); + + { + let mut log_merge_policy = LogMergePolicy::default(); + log_merge_policy.set_min_merge_size(1); + log_merge_policy.set_max_merge_size(1); + log_merge_policy.set_min_layer_size(0); + + let mut index_writer = index.writer_for_tests().unwrap(); + index_writer.set_merge_policy(Box::new(log_merge_policy)); + + // after every commit the merge checker is started, it will merge only segments with 1 + // element in it because of the max_merge_size. + index_writer.add_document(doc!(int_field=>1_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>2_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>3_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>4_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>5_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>6_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>7_u64)); + assert!(index_writer.commit().is_ok()); + + index_writer.add_document(doc!(int_field=>8_u64)); + assert!(index_writer.commit().is_ok()); + } + + let _segment_ids = index + .searchable_segment_ids() + .expect("Searchable segments failed."); + + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); + let segment_readers = searcher.segment_readers(); + for segment in segment_readers { + if segment.num_docs() > 2 { + panic!("segment can't have more than two segments"); + } // don't know how to wait for the merge, then it could be a simple eq + } + } + fn test_merge_policy() -> LogMergePolicy { let mut log_merge_policy = LogMergePolicy::default(); log_merge_policy.set_min_merge_size(3);