add benchmark of term streams merge #1024

Merged · 6 commits · May 31, 2021
CHANGELOG.md (1 addition, 1 deletion)

@@ -18,7 +18,7 @@ Tantivy 0.15.0
 - Add detection to avoid small doc store blocks on merge (@PSeitz). #1054
 - Make doc store compression dynamic (@PSeitz). #1060
 - Switch to json for footer version handling (@PSeitz). #1060
-
+- Updated TermMerger implementation to rely on the union feature of the FST (@scampi) #469
 
 Tantivy 0.14.0
 =========================
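The changelog entry above leans on tantivy_fst's streaming union over FSTs. For orientation, here is a minimal sketch of that operation on two plain FST maps standing in for two segments' term dictionaries; it assumes tantivy_fst keeps the Map/OpBuilder surface of the upstream fst crate (only OpBuilder::new, push, union and the IndexedValue items are actually exercised by the PR code below).

    // Hedged sketch, not part of this PR: union two sorted FST maps, the way
    // TermMerger unions term dictionaries below. Assumes tantivy_fst mirrors
    // the upstream `fst` crate's Map/OpBuilder API.
    use tantivy_fst::map::{Map, OpBuilder};
    use tantivy_fst::Streamer;

    fn union_demo() -> tantivy_fst::Result<()> {
        // Each map stands in for one segment: term bytes -> u64 payload
        // (a term ordinal in tantivy's term dictionaries).
        let seg0 = Map::from_iter(vec![("apple", 0u64), ("pear", 1)])?;
        let seg1 = Map::from_iter(vec![("banana", 0u64), ("pear", 0)])?;

        let mut op_builder = OpBuilder::new();
        op_builder.push(seg0.stream());
        op_builder.push(seg1.stream());
        let mut union = op_builder.union();

        // Every distinct key is yielded exactly once, in sorted byte order,
        // with one IndexedValue { index, value } per input containing it:
        // "apple" -> [(0, 0)], "banana" -> [(1, 0)], "pear" -> [(0, 1), (1, 0)].
        while let Some((key, indexed_values)) = union.next() {
            let _ = (key, indexed_values);
        }
        Ok(())
    }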
src/indexer/merger.rs (3 additions, 5 deletions)

@@ -864,13 +864,11 @@ impl IndexMerger
         let mut total_doc_freq = 0;
 
         // Let's compute the list of non-empty posting lists
-        for heap_item in merged_terms.current_kvs() {
-            let segment_ord = heap_item.segment_ord;
-            let term_info = heap_item.streamer.value();
-            let segment_reader = &self.readers[heap_item.segment_ord];
+        for (segment_ord, term_info) in merged_terms.current_segment_ordinals_and_term_infos() {
+            let segment_reader = &self.readers[segment_ord];
             let inverted_index: &InvertedIndexReader = &*field_readers[segment_ord];
             let segment_postings = inverted_index
-                .read_postings_from_terminfo(term_info, segment_postings_option)?;
+                .read_postings_from_terminfo(&term_info, segment_postings_option)?;
             let delete_bitset_opt = segment_reader.delete_bitset();
             let doc_freq = if let Some(delete_bitset) = delete_bitset_opt {
                 segment_postings.doc_freq_given_deletes(delete_bitset)
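One detail worth calling out in the hunk above: the old heap_item.streamer.value() handed back a &TermInfo borrowed from the streamer, while the new iterator rebuilds each TermInfo from its term ordinal and yields it by value, which is why the call site now passes &term_info into read_postings_from_terminfo.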
src/termdict/fst_termdict/streamer.rs (2 additions, 2 deletions)

@@ -78,8 +78,8 @@ pub struct TermStreamer<'a, A = AlwaysMatch>
 where
     A: Automaton,
 {
-    fst_map: &'a TermDictionary,
-    stream: Stream<'a, A>,
+    pub(crate) fst_map: &'a TermDictionary,
+    pub(crate) stream: Stream<'a, A>,
     term_ord: TermOrdinal,
     current_key: Vec<u8>,
     current_value: TermInfo,
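These two fields are opened up to the crate so that TermMerger::new, in the next file, can move each streamer's underlying stream into the FST union builder and keep its fst_map handle around to translate matched term ordinals back into TermInfos.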
src/termdict/merger.rs (106 additions, 69 deletions)

@@ -1,94 +1,62 @@
+use crate::postings::TermInfo;
+use crate::termdict::TermDictionary;
 use crate::termdict::TermOrdinal;
 use crate::termdict::TermStreamer;
-use std::cmp::Ordering;
-use std::collections::BinaryHeap;
-
-pub struct HeapItem<'a> {
-    pub streamer: TermStreamer<'a>,
-    pub segment_ord: usize,
-}
-
-impl<'a> PartialEq for HeapItem<'a> {
-    fn eq(&self, other: &Self) -> bool {
-        self.segment_ord == other.segment_ord
-    }
-}
-
-impl<'a> Eq for HeapItem<'a> {}
-
-impl<'a> PartialOrd for HeapItem<'a> {
-    fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl<'a> Ord for HeapItem<'a> {
-    fn cmp(&self, other: &HeapItem<'a>) -> Ordering {
-        (&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
-    }
-}
+use tantivy_fst::map::OpBuilder;
+use tantivy_fst::map::Union;
+use tantivy_fst::raw::IndexedValue;
+use tantivy_fst::Streamer;
 
 /// Given a list of sorted term streams,
 /// returns an iterator over sorted unique terms.
 ///
 /// The item yield is actually a pair with
 /// - the term
 /// - a slice with the ordinal of the segments containing
-/// the terms.
+/// the term.
 pub struct TermMerger<'a> {
-    heap: BinaryHeap<HeapItem<'a>>,
-    current_streamers: Vec<HeapItem<'a>>,
+    dictionaries: Vec<&'a TermDictionary>,
+    union: Union<'a>,
+    current_key: Vec<u8>,
+    current_segment_and_term_ordinals: Vec<IndexedValue>,
 }
 
 impl<'a> TermMerger<'a> {
     /// Stream of merged term dictionary
     ///
     pub fn new(streams: Vec<TermStreamer<'a>>) -> TermMerger<'a> {
+        let mut op_builder = OpBuilder::new();
+        let mut dictionaries = vec![];
+        for streamer in streams {
+            op_builder.push(streamer.stream);
+            dictionaries.push(streamer.fst_map);
+        }
         TermMerger {
-            heap: BinaryHeap::new(),
-            current_streamers: streams
-                .into_iter()
-                .enumerate()
-                .map(|(ord, streamer)| HeapItem {
-                    streamer,
-                    segment_ord: ord,
-                })
-                .collect(),
+            dictionaries,
+            union: op_builder.union(),
+            current_key: vec![],
+            current_segment_and_term_ordinals: vec![],
         }
     }
 
-    pub(crate) fn matching_segments<'b: 'a>(
-        &'b self,
-    ) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
-        self.current_streamers
+    pub fn matching_segments<'b: 'a>(&'b self) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
+        self.current_segment_and_term_ordinals
             .iter()
-            .map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
-    }
-
-    fn advance_segments(&mut self) {
-        let streamers = &mut self.current_streamers;
-        let heap = &mut self.heap;
-        for mut heap_item in streamers.drain(..) {
-            if heap_item.streamer.advance() {
-                heap.push(heap_item);
-            }
-        }
+            .map(|iv| (iv.index, iv.value))
     }
 
     /// Advance the term iterator to the next term.
     /// Returns true if there is indeed another term
     /// False if there is none.
     pub fn advance(&mut self) -> bool {
-        self.advance_segments();
-        if let Some(head) = self.heap.pop() {
-            self.current_streamers.push(head);
-            while let Some(next_streamer) = self.heap.peek() {
-                if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
-                    break;
-                }
-                let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
-                self.current_streamers.push(next_heap_it);
-            }
+        if let Some((k, values)) = self.union.next() {
+            self.current_key.clear();
+            self.current_key.extend_from_slice(k);
+            self.current_segment_and_term_ordinals.clear();
+            self.current_segment_and_term_ordinals
+                .extend_from_slice(values);
+            self.current_segment_and_term_ordinals
+                .sort_by_key(|iv| iv.index);
             true
         } else {
             false
@@ -101,16 +69,85 @@ impl<'a> TermMerger<'a> {
     /// iff advance() has been called before
     /// and "true" was returned.
     pub fn key(&self) -> &[u8] {
-        self.current_streamers[0].streamer.key()
+        &self.current_key
     }
 
-    /// Returns the sorted list of segment ordinals
-    /// that include the current term.
+    /// Iterator over (segment ordinal, TermInfo) pairs, sorted by the ordinal.
     ///
     /// This method may be called
     /// iff advance() has been called before
     /// and "true" was returned.
-    pub fn current_kvs(&self) -> &[HeapItem<'a>] {
-        &self.current_streamers[..]
+    pub fn current_segment_ordinals_and_term_infos<'b: 'a>(
+        &'b self,
+    ) -> impl 'b + Iterator<Item = (usize, TermInfo)> {
+        self.current_segment_and_term_ordinals
+            .iter()
+            .map(move |iv| {
+                (
+                    iv.index,
+                    self.dictionaries[iv.index].term_info_from_ord(iv.value),
+                )
+            })
     }
 }
 
+#[cfg(all(test, feature = "unstable"))]
+mod bench {
+    use super::TermMerger;
+    use crate::directory::FileSlice;
+    use crate::postings::TermInfo;
+    use crate::termdict::{TermDictionary, TermDictionaryBuilder};
+    use rand::distributions::Alphanumeric;
+    use rand::{thread_rng, Rng};
+    use test::{self, Bencher};
+
+    fn make_term_info(term_ord: u64) -> TermInfo {
+        let offset = |term_ord: u64| (term_ord * 100 + term_ord * term_ord) as usize;
+        TermInfo {
+            doc_freq: term_ord as u32,
+            postings_range: offset(term_ord)..offset(term_ord + 1),
+            positions_range: offset(term_ord)..offset(term_ord + 1),
+        }
+    }
+
+    /// Create a dictionary of random strings.
+    fn rand_dict(num_terms: usize) -> crate::Result<TermDictionary> {
+        let buffer: Vec<u8> = {
+            let mut terms = vec![];
+            for _i in 0..num_terms {
+                let rand_string: String = thread_rng()
+                    .sample_iter(&Alphanumeric)
+                    .take(thread_rng().gen_range(30..42))
+                    .map(char::from)
+                    .collect();
+                terms.push(rand_string);
+            }
+            terms.sort();
+
+            let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
+            for i in 0..num_terms {
+                term_dictionary_builder.insert(terms[i].as_bytes(), &make_term_info(i as u64))?;
+            }
+            term_dictionary_builder.finish()?
+        };
+        let file = FileSlice::from(buffer);
+        TermDictionary::open(file)
+    }
+
+    #[bench]
+    fn bench_termmerger(b: &mut Bencher) -> crate::Result<()> {
+        let dict1 = rand_dict(100_000)?;
+        let dict2 = rand_dict(100_000)?;
+        b.iter(|| -> crate::Result<u32> {
+            let stream1 = dict1.stream()?;
+            let stream2 = dict2.stream()?;
+            let mut merger = TermMerger::new(vec![stream1, stream2]);
+            let mut count = 0;
+            while merger.advance() {
+                count += 1;
+            }
+            Ok(count)
+        });
+        Ok(())
+    }
+}
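
To round the picture off, a crate-internal sketch of how the merged stream is consumed, mirroring both the bench above and the loop in src/indexer/merger.rs; the helper name is hypothetical and the printing purely illustrative.

    // Hedged, crate-internal sketch (same assumptions as the bench module):
    // walk the merged terms of several dictionaries and show, per term,
    // which segments contain it and where their postings live.
    fn dump_merged_terms(dicts: &[TermDictionary]) -> crate::Result<()> {
        let mut streams = Vec::new();
        for dict in dicts {
            streams.push(dict.stream()?);
        }
        let mut merger = TermMerger::new(streams);
        // key() and current_segment_ordinals_and_term_infos() may only be
        // called after advance() has returned true.
        while merger.advance() {
            let term = String::from_utf8_lossy(merger.key()).into_owned();
            for (segment_ord, term_info) in merger.current_segment_ordinals_and_term_infos() {
                println!(
                    "{} seg={} doc_freq={} postings={:?}",
                    term, segment_ord, term_info.doc_freq, term_info.postings_range
                );
            }
        }
        Ok(())
    }

Since the bench module is gated behind #[cfg(all(test, feature = "unstable"))], running it presumably takes a nightly toolchain with that feature enabled, along the lines of cargo +nightly bench --features unstable.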