Skip to content

Commit

Permalink
use union based on FST for merging the term dictionaries
Browse files Browse the repository at this point in the history
  • Loading branch information
scampi committed May 24, 2021
1 parent 1f19000 commit 27b29d6
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 75 deletions.
8 changes: 3 additions & 5 deletions src/indexer/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,13 +576,11 @@ impl IndexMerger {
let mut total_doc_freq = 0;

// Let's compute the list of non-empty posting lists
for heap_item in merged_terms.current_kvs() {
let segment_ord = heap_item.segment_ord;
let term_info = heap_item.streamer.value();
let segment_reader = &self.readers[heap_item.segment_ord];
for (segment_ord, term_info) in merged_terms.current_kvs() {
let segment_reader = &self.readers[segment_ord];
let inverted_index: &InvertedIndexReader = &*field_readers[segment_ord];
let segment_postings = inverted_index
.read_postings_from_terminfo(term_info, segment_postings_option)?;
.read_postings_from_terminfo(&term_info, segment_postings_option)?;
let delete_bitset_opt = segment_reader.delete_bitset();
let doc_freq = if let Some(delete_bitset) = delete_bitset_opt {
segment_postings.doc_freq_given_deletes(delete_bitset)
Expand Down
4 changes: 2 additions & 2 deletions src/termdict/fst_termdict/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ pub struct TermStreamer<'a, A = AlwaysMatch>
where
A: Automaton,
{
fst_map: &'a TermDictionary,
stream: Stream<'a, A>,
pub fst_map: &'a TermDictionary,
pub stream: Stream<'a, A>,
term_ord: TermOrdinal,
current_key: Vec<u8>,
current_value: TermInfo,
Expand Down
105 changes: 37 additions & 68 deletions src/termdict/merger.rs
Original file line number Diff line number Diff line change
@@ -1,94 +1,58 @@
use crate::postings::TermInfo;
use crate::termdict::TermDictionary;
use crate::termdict::TermOrdinal;
use crate::termdict::TermStreamer;
use std::cmp::Ordering;
use std::collections::BinaryHeap;

/// Entry stored in the merger's binary heap: one term streamer tagged with
/// the ordinal of the segment it comes from.
pub struct HeapItem<'a> {
/// Streamer over the terms of a single term dictionary.
pub streamer: TermStreamer<'a>,
/// Position of `streamer` in the list of streams handed to the merger.
pub segment_ord: usize,
}

/// Two heap items are equal when they point at the same key in the same
/// segment.
///
/// Equality must agree with `Ord::cmp`, which orders items by
/// `(key, segment_ord)`; comparing the segment ordinal alone would report
/// two items as equal while `cmp` returns a non-`Equal` ordering.
impl<'a> PartialEq for HeapItem<'a> {
    fn eq(&self, other: &Self) -> bool {
        // Cheap integer comparison first; only compare keys when the
        // ordinals already match.
        self.segment_ord == other.segment_ord && self.streamer.key() == other.streamer.key()
    }
}

// Marker impl: required by `Ord`, which `BinaryHeap` needs for its elements.
impl<'a> Eq for HeapItem<'a> {}

/// The ordering is total, so the partial comparison simply forwards to
/// `Ord::cmp` and always yields `Some`.
impl<'a> PartialOrd for HeapItem<'a> {
    fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}

/// Orders heap items by key, then by segment ordinal, with both comparisons
/// reversed (`other` against `self`).
///
/// `BinaryHeap` is a max-heap, so the reversal makes the item with the
/// smallest key (and, on equal keys, the smallest segment ordinal) pop first.
impl<'a> Ord for HeapItem<'a> {
    fn cmp(&self, other: &HeapItem<'a>) -> Ordering {
        let by_key = other.streamer.key().cmp(self.streamer.key());
        // Segment ordinal only breaks ties between identical keys.
        by_key.then_with(|| other.segment_ord.cmp(&self.segment_ord))
    }
}
use tantivy_fst::map::OpBuilder;
use tantivy_fst::map::Union;
use tantivy_fst::raw::IndexedValue;
use tantivy_fst::Streamer;

/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yielded is actually a pair with
/// - the term
/// - a slice with the ordinal of the segments containing
/// the terms.
/// the term.
pub struct TermMerger<'a> {
heap: BinaryHeap<HeapItem<'a>>,
current_streamers: Vec<HeapItem<'a>>,
dictionaries: Vec<&'a TermDictionary>,
union: Union<'a>,
key: Vec<u8>,
matching_streams: Vec<IndexedValue>,
}

impl<'a> TermMerger<'a> {
/// Stream of merged term dictionary
///
pub fn new(streams: Vec<TermStreamer<'a>>) -> TermMerger<'a> {
let mut op_builder = OpBuilder::new();
let mut dictionaries = vec![];
for streamer in streams {
op_builder.push(streamer.stream);
dictionaries.push(streamer.fst_map);
}
TermMerger {
heap: BinaryHeap::new(),
current_streamers: streams
.into_iter()
.enumerate()
.map(|(ord, streamer)| HeapItem {
streamer,
segment_ord: ord,
})
.collect(),
dictionaries,
union: op_builder.union(),
key: vec![],
matching_streams: vec![],
}
}

/// Iterates over the streamers currently positioned on the merged term,
/// yielding `(segment ordinal, term ordinal)` for each of them.
pub(crate) fn matching_segments<'b: 'a>(
    &'b self,
) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
    self.current_streamers.iter().map(|item| {
        let segment_ord = item.segment_ord;
        let term_ord = item.streamer.term_ord();
        (segment_ord, term_ord)
    })
}

fn advance_segments(&mut self) {
let streamers = &mut self.current_streamers;
let heap = &mut self.heap;
for mut heap_item in streamers.drain(..) {
if heap_item.streamer.advance() {
heap.push(heap_item);
}
}
/// Iterates over the streams that matched the current term, yielding for
/// each one its index among the merged streams and the associated value
/// (used as the term ordinal within that stream's dictionary).
pub fn matching_segments<'b: 'a>(&'b self) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
    self.matching_streams
        .iter()
        .map(|indexed| (indexed.index, indexed.value))
}

/// Advance the term iterator to the next term.
/// Returns true if there is indeed another term,
/// false if there is none.
pub fn advance(&mut self) -> bool {
self.advance_segments();
if let Some(head) = self.heap.pop() {
self.current_streamers.push(head);
while let Some(next_streamer) = self.heap.peek() {
if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
break;
}
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
self.current_streamers.push(next_heap_it);
}
if let Some((k, values)) = self.union.next() {
self.key.clear();
self.key.extend_from_slice(k);
self.matching_streams.clear();
self.matching_streams.extend_from_slice(values);
self.matching_streams.sort_by_key(|iv| iv.index);
true
} else {
false
Expand All @@ -101,7 +65,7 @@ impl<'a> TermMerger<'a> {
/// iff advance() has been called before
/// and "true" was returned.
pub fn key(&self) -> &[u8] {
self.current_streamers[0].streamer.key()
&self.key
}

/// Returns the sorted list of segment ordinals
Expand All @@ -110,8 +74,13 @@ impl<'a> TermMerger<'a> {
/// This method may be called
/// iff advance() has been called before
/// and "true" was returned.
pub fn current_kvs(&self) -> &[HeapItem<'a>] {
&self.current_streamers[..]
pub fn current_kvs<'b: 'a>(&'b self) -> impl 'b + Iterator<Item = (usize, TermInfo)> {
self.matching_streams.iter().map(move |iv| {
(
iv.index,
self.dictionaries[iv.index].term_info_from_ord(iv.value),
)
})
}
}

Expand Down

0 comments on commit 27b29d6

Please sign in to comment.