From bcd72e5c1431b6da78d1d51f9363c13de09ea3cb Mon Sep 17 00:00:00 2001
From: PSeitz
Date: Wed, 19 May 2021 03:48:46 +0200
Subject: [PATCH] fix and refactor log merge policy, fixes #1035 (#1043)

* fix and refactor log merge policy, fixes #1035

fixes a bug in the log merge policy where a segment was wrongly
referenced by its index

* cleanup

* fix sort order, improve method names

* use itertools groupby, fix serialization test

* minor improvements

* update names
---
 src/directory/mmap_directory.rs |   2 +-
 src/indexer/index_writer.rs     |   2 +-
 src/indexer/log_merge_policy.rs | 154 ++++++++++++++++++++++----------
 src/indexer/merger.rs           |   2 +-
 4 files changed, 108 insertions(+), 52 deletions(-)

diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs
index d929c524a9..be06032713 100644
--- a/src/directory/mmap_directory.rs
+++ b/src/directory/mmap_directory.rs
@@ -593,7 +593,7 @@ mod tests {
         let mut index_writer = index.writer_for_tests().unwrap();
 
         let mut log_merge_policy = LogMergePolicy::default();
-        log_merge_policy.set_min_merge_size(3);
+        log_merge_policy.set_min_num_segments(3);
         index_writer.set_merge_policy(Box::new(log_merge_policy));
         for _num_commits in 0..10 {
             for _ in 0..10 {
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index dee4fefbc6..718f5203f1 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -945,7 +945,7 @@ mod tests {
         let index_writer = index.writer(3_000_000).unwrap();
         assert_eq!(
             format!("{:?}", index_writer.get_merge_policy()),
-            "LogMergePolicy { min_merge_size: 8, max_merge_size: 10000000, min_layer_size: 10000, \
+            "LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, min_layer_size: 10000, \
              level_log_size: 0.75 }"
         );
         let merge_policy = Box::new(NoMergePolicy::default());
diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs
index b0a2ba100f..a7051ca324 100644
--- a/src/indexer/log_merge_policy.rs
+++ b/src/indexer/log_merge_policy.rs
@@ -1,19 +1,20 @@
 use super::merge_policy::{MergeCandidate, MergePolicy};
 use crate::core::SegmentMeta;
+use itertools::Itertools;
 use std::cmp;
 use std::f64;
 
 const DEFAULT_LEVEL_LOG_SIZE: f64 = 0.75;
 const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
-const DEFAULT_MIN_MERGE_SIZE: usize = 8;
-const DEFAULT_MAX_MERGE_SIZE: usize = 10_000_000;
+const DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE: usize = 8;
+const DEFAULT_MAX_DOCS_BEFORE_MERGE: usize = 10_000_000;
 
 /// `LogMergePolicy` tries to merge segments that have a similar number of
 /// documents.
 #[derive(Debug, Clone)]
 pub struct LogMergePolicy {
-    min_merge_size: usize,
-    max_merge_size: usize,
+    min_num_segments: usize,
+    max_docs_before_merge: usize,
     min_layer_size: u32,
     level_log_size: f64,
 }
@@ -23,15 +24,16 @@ impl LogMergePolicy {
         cmp::max(self.min_layer_size, size)
     }
 
-    /// Set the minimum number of segment that may be merge together.
-    pub fn set_min_merge_size(&mut self, min_merge_size: usize) {
-        self.min_merge_size = min_merge_size;
+    /// Set the minimum number of segments that may be merged together.
+    pub fn set_min_num_segments(&mut self, min_num_segments: usize) {
+        self.min_num_segments = min_num_segments;
     }
 
     /// Set the maximum number of docs in a segment for it to be considered for
-    /// merging.
-    pub fn set_max_merge_size(&mut self, max_merge_size: usize) {
-        self.max_merge_size = max_merge_size;
+    /// merging. A segment can still grow past max_docs by having many
+    /// smaller segments merged into it.
+    pub fn set_max_docs_before_merge(&mut self, max_docs_before_merge: usize) {
+        self.max_docs_before_merge = max_docs_before_merge;
     }
 
     /// Set the minimum segment size under which all segments belong
@@ -42,7 +44,7 @@
     /// Set the ratio between two consecutive levels.
     ///
-    /// Segment are group in levels according to their sizes.
+    /// Segments are grouped in levels according to their sizes.
     /// These levels are defined as intervals of exponentially growing sizes.
     /// level_log_size defines the factor by which one should multiply the limit
     /// to reach a level, in order to get the limit to reach the following
@@ -54,52 +56,43 @@
 impl MergePolicy for LogMergePolicy {
     fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
-        let mut size_sorted_tuples = segments
+        let mut size_sorted_segments = segments
             .iter()
-            .map(SegmentMeta::num_docs)
-            .enumerate()
-            .filter(|(_, s)| s <= &(self.max_merge_size as u32))
-            .collect::<Vec<(usize, u32)>>();
+            .filter(|segment_meta| segment_meta.num_docs() <= (self.max_docs_before_merge as u32))
+            .collect::<Vec<&SegmentMeta>>();
 
-        size_sorted_tuples.sort_by(|x, y| y.1.cmp(&(x.1)));
-
-        if size_sorted_tuples.len() <= 1 {
-            return Vec::new();
+        if size_sorted_segments.len() <= 1 {
+            return vec![];
         }
+        size_sorted_segments.sort_by_key(|seg| std::cmp::Reverse(seg.num_docs()));
 
-        let size_sorted_log_tuples: Vec<_> = size_sorted_tuples
-            .into_iter()
-            .map(|(ind, num_docs)| (ind, f64::from(self.clip_min_size(num_docs)).log2()))
-            .collect();
-
-        if let Some(&(first_ind, first_score)) = size_sorted_log_tuples.first() {
-            let mut current_max_log_size = first_score;
-            let mut levels = vec![vec![first_ind]];
-            for &(ind, score) in (&size_sorted_log_tuples).iter().skip(1) {
-                if score < (current_max_log_size - self.level_log_size) {
-                    current_max_log_size = score;
-                    levels.push(Vec::new());
-                }
-                levels.last_mut().unwrap().push(ind);
+        let mut current_max_log_size = f64::MAX;
+        let mut levels = vec![];
+        for (_, merge_group) in &size_sorted_segments.into_iter().group_by(|segment| {
+            let segment_log_size = f64::from(self.clip_min_size(segment.num_docs())).log2();
+            if segment_log_size < (current_max_log_size - self.level_log_size) {
+                // lower current_max_log_size to open a new group
+                current_max_log_size = segment_log_size;
             }
-            levels
-                .iter()
-                .filter(|level| level.len() >= self.min_merge_size)
-                .map(|ind_vec| {
-                    MergeCandidate(ind_vec.iter().map(|&ind| segments[ind].id()).collect())
-                })
-                .collect()
-        } else {
-            return vec![];
+            // return current_max_log_size as the key, so this segment joins the current group
+            current_max_log_size
+        }) {
+            levels.push(merge_group.collect::<Vec<&SegmentMeta>>());
         }
+
+        levels
+            .iter()
+            .filter(|level| level.len() >= self.min_num_segments)
+            .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect()))
+            .collect()
     }
 }
 
 impl Default for LogMergePolicy {
     fn default() -> LogMergePolicy {
         LogMergePolicy {
-            min_merge_size: DEFAULT_MIN_MERGE_SIZE,
-            max_merge_size: DEFAULT_MAX_MERGE_SIZE,
+            min_num_segments: DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE,
+            max_docs_before_merge: DEFAULT_MAX_DOCS_BEFORE_MERGE,
             min_layer_size: DEFAULT_MIN_LAYER_SIZE,
             level_log_size: DEFAULT_LEVEL_LOG_SIZE,
         }
@@ -109,16 +102,79 @@ impl Default for LogMergePolicy {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::core::{SegmentId, SegmentMeta, SegmentMetaInventory};
-    use crate::indexer::merge_policy::MergePolicy;
+    use crate::{
+        core::{SegmentId, SegmentMeta, SegmentMetaInventory},
+        schema,
+    };
+    use crate::{indexer::merge_policy::MergePolicy, schema::INDEXED};
     use once_cell::sync::Lazy;
 
     static INVENTORY: Lazy<SegmentMetaInventory> = Lazy::new(SegmentMetaInventory::default);
 
+    use crate::Index;
+
+    #[test]
+    fn create_index_test_max_merge_issue_1035() {
+        let mut schema_builder = schema::Schema::builder();
+        let int_field = schema_builder.add_u64_field("intval", INDEXED);
+        let schema = schema_builder.build();
+
+        let index = Index::create_in_ram(schema);
+
+        {
+            let mut log_merge_policy = LogMergePolicy::default();
+            log_merge_policy.set_min_num_segments(1);
+            log_merge_policy.set_max_docs_before_merge(1);
+            log_merge_policy.set_min_layer_size(0);
+
+            let mut index_writer = index.writer_for_tests().unwrap();
+            index_writer.set_merge_policy(Box::new(log_merge_policy));
+
+            // After every commit the merge checker runs; because of
+            // max_docs_before_merge == 1 it will only merge single-doc segments.
+            index_writer.add_document(doc!(int_field=>1_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>2_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>3_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>4_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>5_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>6_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>7_u64));
+            assert!(index_writer.commit().is_ok());
+
+            index_writer.add_document(doc!(int_field=>8_u64));
+            assert!(index_writer.commit().is_ok());
+        }
+
+        let _segment_ids = index
+            .searchable_segment_ids()
+            .expect("Searchable segments failed.");
+
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
+        let segment_readers = searcher.segment_readers();
+        for segment in segment_readers {
+            if segment.num_docs() > 2 {
+                panic!("segment can't have more than two docs");
+            } // we don't know how to wait for pending merges, otherwise this could be a simple equality check
+        }
+    }
+
     fn test_merge_policy() -> LogMergePolicy {
         let mut log_merge_policy = LogMergePolicy::default();
-        log_merge_policy.set_min_merge_size(3);
-        log_merge_policy.set_max_merge_size(100_000);
+        log_merge_policy.set_min_num_segments(3);
+        log_merge_policy.set_max_docs_before_merge(100_000);
         log_merge_policy.set_min_layer_size(2);
         log_merge_policy
     }
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index b65b5431a0..33c9246359 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -1828,7 +1828,7 @@ mod tests {
 
         // Make sure we'll attempt to merge every created segment
         let mut policy = crate::indexer::LogMergePolicy::default();
-        policy.set_min_merge_size(2);
+        policy.set_min_num_segments(2);
         writer.set_merge_policy(Box::new(policy));
 
         for i in 0..100 {
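
---
Editor's note: the level grouping introduced above is easier to see in
isolation. The sketch below mirrors the patched grouping logic on bare doc
counts and is illustrative only: `compute_levels` and `main` are made-up
names rather than tantivy API, the `n.max(min_layer_size)` clamp stands in
for `clip_min_size`, and it assumes the `group_by` adapter from the itertools
version the patch uses (renamed `chunk_by` in later itertools releases).

use itertools::Itertools;

// Bucket doc counts into levels the way the patched
// `compute_merge_candidates` does: sort descending, take log2 of each
// clamped count, and start a new level whenever a segment falls more than
// `level_log_size` below the current level's top.
fn compute_levels(mut num_docs: Vec<u32>, min_layer_size: u32, level_log_size: f64) -> Vec<Vec<u32>> {
    // largest segments first, mirroring the sort in the patch
    num_docs.sort_by_key(|&n| std::cmp::Reverse(n));

    let mut current_max_log_size = f64::MAX;
    // The closure lowers `current_max_log_size` when a segment opens a new
    // level, so consecutive segments keep the same key until then.
    let grouped = num_docs.into_iter().group_by(|&n| {
        let log_size = f64::from(n.max(min_layer_size)).log2();
        if log_size < current_max_log_size - level_log_size {
            current_max_log_size = log_size;
        }
        current_max_log_size
    });

    let mut levels = Vec::new();
    for (_key, group) in &grouped {
        levels.push(group.collect());
    }
    levels
}

fn main() {
    // With the default level_log_size of 0.75:
    // log2(100_000) ~ 16.6 and log2(80_000) ~ 16.3 share a level,
    // 1_000 and 900 form the next one, and 10 lands in a level of its own.
    let levels = compute_levels(vec![100_000, 80_000, 1_000, 900, 10], 0, 0.75);
    assert_eq!(levels, vec![vec![100_000, 80_000], vec![1_000, 900], vec![10]]);
}

Keying `group_by` on a captured value that the closure itself mutates is the
trick that lets the patch replace the old manual index bookkeeping: segments
share a group exactly as long as the key stays unchanged.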