From b2d2bafc783f8506fcd7f6f866454484bb1dd6c6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Campinas?=
Date: Mon, 26 Apr 2021 23:53:32 +0200
Subject: [PATCH 1/6] add benchmark of term streams merge

---
 src/termdict/merger.rs | 61 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 61 insertions(+)

diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs
index 720db3e80b..848088b1d7 100644
--- a/src/termdict/merger.rs
+++ b/src/termdict/merger.rs
@@ -114,3 +114,64 @@ impl<'a> TermMerger<'a> {
         &self.current_streamers[..]
     }
 }
+
+#[cfg(all(test, feature = "unstable"))]
+mod bench {
+    use super::TermMerger;
+    use crate::directory::FileSlice;
+    use crate::postings::TermInfo;
+    use crate::termdict::{TermDictionary, TermDictionaryBuilder};
+    use rand::distributions::Alphanumeric;
+    use rand::{thread_rng, Rng};
+    use test::{self, Bencher};
+
+    fn make_term_info(term_ord: u64) -> TermInfo {
+        let offset = |term_ord: u64| (term_ord * 100 + term_ord * term_ord) as usize;
+        TermInfo {
+            doc_freq: term_ord as u32,
+            postings_range: offset(term_ord)..offset(term_ord + 1),
+            positions_idx: offset(term_ord) as u64 * 2u64,
+        }
+    }
+
+    /// Create a dictionary of random strings.
+    fn rand_dict(size: usize) -> crate::Result<TermDictionary> {
+        let buffer: Vec<u8> = {
+            let mut terms = vec![];
+            for _i in 0..size {
+                let rand_string: String = thread_rng()
+                    .sample_iter(&Alphanumeric)
+                    .take(30)
+                    .map(char::from)
+                    .collect();
+                terms.push(rand_string);
+            }
+            terms.sort();
+
+            let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
+            for i in 0..size {
+                term_dictionary_builder.insert(terms[i].as_bytes(), &make_term_info(i as u64))?;
+            }
+            term_dictionary_builder.finish()?
+        };
+        let file = FileSlice::from(buffer);
+        TermDictionary::open(file)
+    }
+
+    #[bench]
+    fn bench_termmerger_baseline(b: &mut Bencher) -> crate::Result<()> {
+        let dict1 = rand_dict(100000)?;
+        let dict2 = rand_dict(100000)?;
+        b.iter(|| {
+            let stream1 = dict1.stream().unwrap();
+            let stream2 = dict2.stream().unwrap();
+            let mut merger = TermMerger::new(vec![stream1, stream2]);
+            let mut count = 0;
+            while merger.advance() {
+                count += 1;
+            }
+            count
+        });
+        Ok(())
+    }
+}

From 4f5d2eb16174ed8c9a79b06f8c1e3252a9a35782 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Campinas?=
Date: Thu, 6 May 2021 16:49:51 +0200
Subject: [PATCH 2/6] changes as per review

---
 src/termdict/merger.rs | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs
index 848088b1d7..f6782217c5 100644
--- a/src/termdict/merger.rs
+++ b/src/termdict/merger.rs
@@ -130,18 +130,18 @@ mod bench {
         TermInfo {
             doc_freq: term_ord as u32,
             postings_range: offset(term_ord)..offset(term_ord + 1),
-            positions_idx: offset(term_ord) as u64 * 2u64,
+            positions_range: offset(term_ord)..offset(term_ord + 1),
         }
     }
 
     /// Create a dictionary of random strings.
-    fn rand_dict(size: usize) -> crate::Result<TermDictionary> {
+    fn rand_dict(num_terms: usize) -> crate::Result<TermDictionary> {
         let buffer: Vec<u8> = {
             let mut terms = vec![];
-            for _i in 0..size {
+            for _i in 0..num_terms {
                 let rand_string: String = thread_rng()
                     .sample_iter(&Alphanumeric)
-                    .take(30)
+                    .take(thread_rng().gen_range(30..42))
                     .map(char::from)
                     .collect();
                 terms.push(rand_string);
@@ -149,7 +149,7 @@ mod bench {
             terms.sort();
 
             let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
-            for i in 0..size {
+            for i in 0..num_terms {
                 term_dictionary_builder.insert(terms[i].as_bytes(), &make_term_info(i as u64))?;
             }
             term_dictionary_builder.finish()?
@@ -160,17 +160,17 @@ mod bench {
 
     #[bench]
     fn bench_termmerger_baseline(b: &mut Bencher) -> crate::Result<()> {
-        let dict1 = rand_dict(100000)?;
-        let dict2 = rand_dict(100000)?;
-        b.iter(|| {
-            let stream1 = dict1.stream().unwrap();
-            let stream2 = dict2.stream().unwrap();
+        let dict1 = rand_dict(100_000)?;
+        let dict2 = rand_dict(100_000)?;
+        b.iter(|| -> crate::Result<u32> {
+            let stream1 = dict1.stream()?;
+            let stream2 = dict2.stream()?;
             let mut merger = TermMerger::new(vec![stream1, stream2]);
             let mut count = 0;
             while merger.advance() {
                 count += 1;
             }
-            count
+            Ok(count)
         });
         Ok(())
     }

From 258e99852cf11ad630bf4b0c510963522abd1bee Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Campinas?=
Date: Mon, 24 May 2021 02:43:44 +0200
Subject: [PATCH 3/6] use union based on FST for merging the term dictionaries

---
 src/indexer/merger.rs                 |   8 +-
 src/termdict/fst_termdict/streamer.rs |   4 +-
 src/termdict/merger.rs                | 105 +++++++++++----------------
 3 files changed, 42 insertions(+), 75 deletions(-)

diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 087288d8ce..fc654d74e8 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -864,13 +864,11 @@ impl IndexMerger {
         let mut total_doc_freq = 0;
 
         // Let's compute the list of non-empty posting lists
-        for heap_item in merged_terms.current_kvs() {
-            let segment_ord = heap_item.segment_ord;
-            let term_info = heap_item.streamer.value();
-            let segment_reader = &self.readers[heap_item.segment_ord];
+        for (segment_ord, term_info) in merged_terms.current_kvs() {
+            let segment_reader = &self.readers[segment_ord];
             let inverted_index: &InvertedIndexReader = &*field_readers[segment_ord];
             let segment_postings = inverted_index
-                .read_postings_from_terminfo(term_info, segment_postings_option)?;
+                .read_postings_from_terminfo(&term_info, segment_postings_option)?;
             let delete_bitset_opt = segment_reader.delete_bitset();
             let doc_freq = if let Some(delete_bitset) = delete_bitset_opt {
                 segment_postings.doc_freq_given_deletes(delete_bitset)
diff --git a/src/termdict/fst_termdict/streamer.rs b/src/termdict/fst_termdict/streamer.rs
index 66ce02c2ab..dcfe5c10a6 100644
--- a/src/termdict/fst_termdict/streamer.rs
+++ b/src/termdict/fst_termdict/streamer.rs
@@ -78,8 +78,8 @@ pub struct TermStreamer<'a, A = AlwaysMatch>
 where
     A: Automaton,
 {
-    fst_map: &'a TermDictionary,
-    stream: Stream<'a, A>,
+    pub fst_map: &'a TermDictionary,
+    pub stream: Stream<'a, A>,
     term_ord: TermOrdinal,
     current_key: Vec<u8>,
     current_value: TermInfo,
diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs
index f6782217c5..73d9f9ab08 100644
--- a/src/termdict/merger.rs
+++ b/src/termdict/merger.rs
@@ -1,32 +1,11 @@
+use crate::postings::TermInfo;
+use crate::termdict::TermDictionary;
 use crate::termdict::TermOrdinal;
 use crate::termdict::TermStreamer;
-use std::cmp::Ordering;
-use std::collections::BinaryHeap;
-
-pub struct HeapItem<'a> {
-    pub streamer: TermStreamer<'a>,
-    pub segment_ord: usize,
-}
-
-impl<'a> PartialEq for HeapItem<'a> {
-    fn eq(&self, other: &Self) -> bool {
-        self.segment_ord == other.segment_ord
-    }
-}
-
-impl<'a> Eq for HeapItem<'a> {}
-
-impl<'a> PartialOrd for HeapItem<'a> {
-    fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl<'a> Ord for HeapItem<'a> {
-    fn cmp(&self, other: &HeapItem<'a>) -> Ordering {
-        (&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
-    }
-}
+use tantivy_fst::map::OpBuilder;
+use tantivy_fst::map::Union;
+use tantivy_fst::raw::IndexedValue;
+use tantivy_fst::Streamer;
 
 /// Given a list of sorted term streams,
 /// returns an iterator over sorted unique terms.
@@ -34,61 +13,46 @@ impl<'a> Ord for HeapItem<'a>
 /// The item yield is actually a pair with
 /// - the term
 /// - a slice with the ordinal of the segments containing
-///   the terms.
+///   the term.
 pub struct TermMerger<'a> {
-    heap: BinaryHeap<HeapItem<'a>>,
-    current_streamers: Vec<HeapItem<'a>>,
+    dictionaries: Vec<&'a TermDictionary>,
+    union: Union<'a>,
+    key: Vec<u8>,
+    matching_streams: Vec<IndexedValue>,
 }
 
 impl<'a> TermMerger<'a> {
     /// Stream of merged term dictionary
     ///
     pub fn new(streams: Vec<TermStreamer<'a>>) -> TermMerger<'a> {
+        let mut op_builder = OpBuilder::new();
+        let mut dictionaries = vec![];
+        for streamer in streams {
+            op_builder.push(streamer.stream);
+            dictionaries.push(streamer.fst_map);
+        }
         TermMerger {
-            heap: BinaryHeap::new(),
-            current_streamers: streams
-                .into_iter()
-                .enumerate()
-                .map(|(ord, streamer)| HeapItem {
-                    streamer,
-                    segment_ord: ord,
-                })
-                .collect(),
+            dictionaries,
+            union: op_builder.union(),
+            key: vec![],
+            matching_streams: vec![],
         }
     }
 
-    pub(crate) fn matching_segments<'b: 'a>(
-        &'b self,
-    ) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
-        self.current_streamers
-            .iter()
-            .map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
-    }
-
-    fn advance_segments(&mut self) {
-        let streamers = &mut self.current_streamers;
-        let heap = &mut self.heap;
-        for mut heap_item in streamers.drain(..) {
-            if heap_item.streamer.advance() {
-                heap.push(heap_item);
-            }
-        }
+    pub fn matching_segments<'b: 'a>(&'b self) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
+        self.matching_streams.iter().map(|iv| (iv.index, iv.value))
     }
 
     /// Advance the term iterator to the next term.
     /// Returns true if there is indeed another term
     /// False if there is none.
     pub fn advance(&mut self) -> bool {
-        self.advance_segments();
-        if let Some(head) = self.heap.pop() {
-            self.current_streamers.push(head);
-            while let Some(next_streamer) = self.heap.peek() {
-                if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
-                    break;
-                }
-                let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
-                self.current_streamers.push(next_heap_it);
-            }
+        if let Some((k, values)) = self.union.next() {
+            self.key.clear();
+            self.key.extend_from_slice(k);
+            self.matching_streams.clear();
+            self.matching_streams.extend_from_slice(values);
+            self.matching_streams.sort_by_key(|iv| iv.index);
             true
         } else {
             false
@@ -101,7 +65,7 @@ impl<'a> TermMerger<'a> {
     /// iff advance() has been called before
     /// and "true" was returned.
     pub fn key(&self) -> &[u8] {
-        self.current_streamers[0].streamer.key()
+        &self.key
     }
 
     /// Returns the sorted list of segment ordinals
@@ -110,8 +74,13 @@ impl<'a> TermMerger<'a> {
     /// This method may be called
     /// iff advance() has been called before
     /// and "true" was returned.
-    pub fn current_kvs(&self) -> &[HeapItem<'a>] {
-        &self.current_streamers[..]
+    pub fn current_kvs<'b: 'a>(&'b self) -> impl 'b + Iterator<Item = (usize, TermInfo)> {
+        self.matching_streams.iter().map(move |iv| {
+            (
+                iv.index,
+                self.dictionaries[iv.index].term_info_from_ord(iv.value),
+            )
+        })
     }
 }
 

From 12516e82bef646e1fa617b0196602548ed55c70f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Campinas?=
Date: Wed, 26 May 2021 15:20:55 +0200
Subject: [PATCH 4/6] cleanup

---
 src/indexer/merger.rs                 |  2 +-
 src/termdict/fst_termdict/streamer.rs |  4 +--
 src/termdict/merger.rs                | 47 +++++++++++++++------------
 3 files changed, 30 insertions(+), 23 deletions(-)

diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index fc654d74e8..864c883cec 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -864,7 +864,7 @@ impl IndexMerger {
         let mut total_doc_freq = 0;
 
         // Let's compute the list of non-empty posting lists
-        for (segment_ord, term_info) in merged_terms.current_kvs() {
+        for (segment_ord, term_info) in merged_terms.current_segment_ordinals_and_term_infos() {
             let segment_reader = &self.readers[segment_ord];
             let inverted_index: &InvertedIndexReader = &*field_readers[segment_ord];
             let segment_postings = inverted_index
diff --git a/src/termdict/fst_termdict/streamer.rs b/src/termdict/fst_termdict/streamer.rs
index dcfe5c10a6..10a72d6dc5 100644
--- a/src/termdict/fst_termdict/streamer.rs
+++ b/src/termdict/fst_termdict/streamer.rs
@@ -78,8 +78,8 @@ pub struct TermStreamer<'a, A = AlwaysMatch>
 where
     A: Automaton,
 {
-    pub fst_map: &'a TermDictionary,
-    pub stream: Stream<'a, A>,
+    pub(crate) fst_map: &'a TermDictionary,
+    pub(crate) stream: Stream<'a, A>,
     term_ord: TermOrdinal,
     current_key: Vec<u8>,
     current_value: TermInfo,
diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs
index 73d9f9ab08..7cd212cd04 100644
--- a/src/termdict/merger.rs
+++ b/src/termdict/merger.rs
@@ -17,8 +17,8 @@ use tantivy_fst::Streamer;
 pub struct TermMerger<'a> {
     dictionaries: Vec<&'a TermDictionary>,
     union: Union<'a>,
-    key: Vec<u8>,
-    matching_streams: Vec<IndexedValue>,
+    current_key: Vec<u8>,
+    current_segment_and_term_ordinals: Vec<IndexedValue>,
 }
 
 impl<'a> TermMerger<'a> {
@@ -34,13 +34,15 @@ impl<'a> TermMerger<'a> {
         TermMerger {
             dictionaries,
             union: op_builder.union(),
-            key: vec![],
-            matching_streams: vec![],
+            current_key: vec![],
+            current_segment_and_term_ordinals: vec![],
         }
     }
 
     pub fn matching_segments<'b: 'a>(&'b self) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
-        self.matching_streams.iter().map(|iv| (iv.index, iv.value))
+        self.current_segment_and_term_ordinals
+            .iter()
+            .map(|iv| (iv.index, iv.value))
     }
 
     /// Advance the term iterator to the next term.
@@ -48,11 +50,13 @@ impl<'a> TermMerger<'a> {
     /// False if there is none.
     pub fn advance(&mut self) -> bool {
         if let Some((k, values)) = self.union.next() {
-            self.key.clear();
-            self.key.extend_from_slice(k);
-            self.matching_streams.clear();
-            self.matching_streams.extend_from_slice(values);
-            self.matching_streams.sort_by_key(|iv| iv.index);
+            self.current_key.clear();
+            self.current_key.extend_from_slice(k);
+            self.current_segment_and_term_ordinals.clear();
+            self.current_segment_and_term_ordinals
+                .extend_from_slice(values);
+            self.current_segment_and_term_ordinals
+                .sort_by_key(|iv| iv.index);
             true
         } else {
             false
@@ -65,22 +69,25 @@ impl<'a> TermMerger<'a> {
     /// iff advance() has been called before
     /// and "true" was returned.
     pub fn key(&self) -> &[u8] {
-        &self.key
+        &self.current_key
     }
 
-    /// Returns the sorted list of segment ordinals
-    /// that include the current term.
+    /// Iterator over (segment ordinal, TermInfo) pairs, sorted by the segment ordinal.
     ///
     /// This method may be called
     /// iff advance() has been called before
     /// and "true" was returned.
-    pub fn current_kvs<'b: 'a>(&'b self) -> impl 'b + Iterator<Item = (usize, TermInfo)> {
-        self.matching_streams.iter().map(move |iv| {
-            (
-                iv.index,
-                self.dictionaries[iv.index].term_info_from_ord(iv.value),
-            )
-        })
+    pub fn current_segment_ordinals_and_term_infos<'b: 'a>(
+        &'b self,
+    ) -> impl 'b + Iterator<Item = (usize, TermInfo)> {
+        self.current_segment_and_term_ordinals
+            .iter()
+            .map(move |iv| {
+                (
+                    iv.index,
+                    self.dictionaries[iv.index].term_info_from_ord(iv.value),
+                )
+            })
     }
 }
 

From 174f1fe59beaf6bd73ff5958ed3a260569601633 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Campinas?=
Date: Fri, 28 May 2021 09:43:33 +0200
Subject: [PATCH 5/6] Rename TermMerger benchmark

---
 src/termdict/merger.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs
index 7cd212cd04..2c3d2e18e0 100644
--- a/src/termdict/merger.rs
+++ b/src/termdict/merger.rs
@@ -135,7 +135,7 @@ mod bench {
     }
 
     #[bench]
-    fn bench_termmerger_baseline(b: &mut Bencher) -> crate::Result<()> {
+    fn bench_termmerger(b: &mut Bencher) -> crate::Result<()> {
         let dict1 = rand_dict(100_000)?;
         let dict2 = rand_dict(100_000)?;
         b.iter(|| -> crate::Result<u32> {

From 66b633aba749b8a10366c3eab6ec4f17ee2299ec Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Campinas?=
Date: Fri, 28 May 2021 10:47:21 +0200
Subject: [PATCH 6/6] Updated changelog with contribution

---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5277168c72..30f495bebc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,7 +18,7 @@ Tantivy 0.15.0
 - Add detection to avoid small doc store blocks on merge (@PSeitz). #1054
 - Make doc store compression dynamic (@PSeitz). #1060
 - Switch to json for footer version handling (@PSeitz). #1060
-
+- Updated TermMerger implementation to rely on the union feature of the FST (@scampi) #469
 
 Tantivy 0.14.0
 =========================
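
For readers unfamiliar with the tantivy_fst union that the reworked TermMerger relies on: each segment contributes one FST stream to an OpBuilder, and walking the resulting union yields every distinct term exactly once, together with the (stream index, value) pairs of the inputs containing that term. The sketch below only illustrates that mechanism and is not part of the patches above; it assumes tantivy_fst also exposes Map::from_iter, in addition to the stream/push/union/next calls already used in patch 3.

// Minimal, standalone sketch of the union-based merge (assumptions noted above).
use tantivy_fst::map::OpBuilder;
use tantivy_fst::{Map, Streamer};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Two tiny "term dictionaries" mapping sorted terms to term ordinals,
    // standing in for the per-segment FSTs merged by TermMerger.
    let dict0 = Map::from_iter(vec![("apple", 0u64), ("cherry", 1u64)])?;
    let dict1 = Map::from_iter(vec![("banana", 0u64), ("cherry", 1u64)])?;

    // Same construction as TermMerger::new: push one stream per dictionary.
    let mut op_builder = OpBuilder::new();
    op_builder.push(dict0.stream());
    op_builder.push(dict1.stream());
    let mut union = op_builder.union();

    // Same loop shape as TermMerger::advance: each distinct term comes out once,
    // with one IndexedValue per input stream that contains it (index identifies
    // the stream/segment, value is that stream's term ordinal).
    while let Some((term, indexed_values)) = union.next() {
        for iv in indexed_values {
            println!(
                "{} -> stream {}, ordinal {}",
                String::from_utf8_lossy(term),
                iv.index,
                iv.value
            );
        }
    }
    Ok(())
}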