diff --git a/fastfield_codecs/src/column.rs b/fastfield_codecs/src/column.rs index eb73be5423..b40f76b692 100644 --- a/fastfield_codecs/src/column.rs +++ b/fastfield_codecs/src/column.rs @@ -1,6 +1,5 @@ use std::marker::PhantomData; use std::ops::RangeInclusive; -use std::sync::Mutex; use tantivy_bitpacker::minmax; @@ -103,7 +102,7 @@ impl<'a, T: Copy + PartialOrd + Send + Sync> Column for VecColumn<'a, T> { self.values[position as usize] } - fn iter<'b>(&'b self) -> Box + 'b> { + fn iter(&self) -> Box + '_> { Box::new(self.values.iter().copied()) } @@ -190,7 +189,7 @@ where self.from_column.num_vals() } - fn iter<'a>(&'a self) -> Box + 'a> { + fn iter(&self) -> Box + '_> { Box::new(self.from_column.iter().map(&self.monotonic_mapping)) } @@ -198,29 +197,6 @@ where // and we do not have any specialized implementation anyway. } -pub struct RemappedColumn { - column: C, - new_to_old_id_mapping: M, - min_max_cache: Mutex>, -} - -impl RemappedColumn -where - C: Column, - M: Column, - T: Copy + Ord + Default + Send + Sync, -{ - fn min_max(&self) -> (T, T) { - if let Some((min, max)) = *self.min_max_cache.lock().unwrap() { - return (min, max); - } - let (min, max) = - tantivy_bitpacker::minmax(self.iter()).unwrap_or((T::default(), T::default())); - *self.min_max_cache.lock().unwrap() = Some((min, max)); - (min, max) - } -} - pub struct IterColumn(T); impl From for IterColumn @@ -252,35 +228,11 @@ where self.0.len() as u64 } - fn iter<'a>(&'a self) -> Box + 'a> { + fn iter(&self) -> Box + '_> { Box::new(self.0.clone()) } } -impl Column for RemappedColumn -where - C: Column, - M: Column, - T: Copy + Ord + Default + Send + Sync, -{ - fn get_val(&self, idx: u64) -> T { - let old_id = self.new_to_old_id_mapping.get_val(idx); - self.column.get_val(old_id as u64) - } - - fn min_value(&self) -> T { - self.min_max().0 - } - - fn max_value(&self) -> T { - self.min_max().1 - } - - fn num_vals(&self) -> u64 { - self.new_to_old_id_mapping.num_vals() as u64 - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/fastfield_codecs/src/compact_space/mod.rs b/fastfield_codecs/src/compact_space/mod.rs index d137b30f4b..389bccf6e7 100644 --- a/fastfield_codecs/src/compact_space/mod.rs +++ b/fastfield_codecs/src/compact_space/mod.rs @@ -301,7 +301,7 @@ impl Column for CompactSpaceDecompressor { } #[inline] - fn iter<'a>(&'a self) -> Box + 'a> { + fn iter(&self) -> Box + '_> { Box::new(self.iter()) } fn get_between_vals(&self, range: RangeInclusive) -> Vec { diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 7b25f3f340..210584e8cb 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1,4 +1,3 @@ -use std::cmp; use std::collections::HashMap; use std::io::Write; use std::sync::Arc; @@ -15,6 +14,8 @@ use crate::fastfield::{ }; use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter}; use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping}; +use crate::indexer::sorted_doc_id_column::SortedDocIdColumn; +use crate::indexer::sorted_doc_id_multivalue_column::SortedDocIdMultiValueColumn; use crate::indexer::SegmentSerializer; use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings}; use crate::schema::{Cardinality, Field, FieldType, Schema}; @@ -87,28 +88,6 @@ pub struct IndexMerger { max_doc: u32, } -fn compute_min_max_val( - u64_reader: &dyn Column, - segment_reader: &SegmentReader, -) -> Option<(u64, u64)> { - if segment_reader.max_doc() == 0 { - return None; - } - - if segment_reader.alive_bitset().is_none() { - // no deleted documents, - // we can use the previous min_val, max_val. - return Some((u64_reader.min_value(), u64_reader.max_value())); - } - // some deleted documents, - // we need to recompute the max / min - segment_reader - .doc_ids_alive() - .map(|doc_id| u64_reader.get_val(doc_id as u64)) - .minmax() - .into_option() -} - struct TermOrdinalMapping { per_segment_new_term_ordinals: Vec>, } @@ -331,81 +310,8 @@ impl IndexMerger { fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &SegmentDocIdMapping, ) -> crate::Result<()> { - let (min_value, max_value) = self - .readers - .iter() - .filter_map(|reader| { - let u64_reader: Arc> = - reader.fast_fields().typed_fast_field_reader(field).expect( - "Failed to find a reader for single fast field. This is a tantivy bug and \ - it should never happen.", - ); - compute_min_max_val(&*u64_reader, reader) - }) - .reduce(|a, b| (a.0.min(b.0), a.1.max(b.1))) - .expect("Unexpected error, empty readers in IndexMerger"); - - let fast_field_readers = self - .readers - .iter() - .map(|reader| { - let u64_reader: Arc> = - reader.fast_fields().typed_fast_field_reader(field).expect( - "Failed to find a reader for single fast field. This is a tantivy bug and \ - it should never happen.", - ); - u64_reader - }) - .collect::>(); - - #[derive(Clone)] - struct SortedDocIdFieldAccessProvider<'a> { - doc_id_mapping: &'a SegmentDocIdMapping, - fast_field_readers: &'a Vec>>, - min_value: u64, - max_value: u64, - num_vals: u64, - } - impl<'a> Column for SortedDocIdFieldAccessProvider<'a> { - fn get_val(&self, doc: u64) -> u64 { - let DocAddress { - doc_id, - segment_ord, - } = self.doc_id_mapping.get_old_doc_addr(doc as u32); - self.fast_field_readers[segment_ord as usize].get_val(doc_id as u64) - } - - fn iter(&self) -> Box + '_> { - Box::new( - self.doc_id_mapping - .iter_old_doc_addrs() - .map(|old_doc_addr| { - let fast_field_reader = - &self.fast_field_readers[old_doc_addr.segment_ord as usize]; - fast_field_reader.get_val(old_doc_addr.doc_id as u64) - }), - ) - } - fn min_value(&self) -> u64 { - self.min_value - } - - fn max_value(&self) -> u64 { - self.max_value - } - - fn num_vals(&self) -> u64 { - self.num_vals - } - } - let fastfield_accessor = SortedDocIdFieldAccessProvider { - doc_id_mapping, - fast_field_readers: &fast_field_readers, - min_value, - max_value, - num_vals: doc_id_mapping.len() as u64, - }; - fast_field_serializer.create_auto_detect_u64_fast_field(field, fastfield_accessor)?; + let fast_field_accessor = SortedDocIdColumn::new(&self.readers, doc_id_mapping, field); + fast_field_serializer.create_auto_detect_u64_fast_field(field, fast_field_accessor)?; Ok(()) } @@ -663,113 +569,8 @@ impl IndexMerger { let offsets = self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?; - let mut min_value = u64::MAX; - let mut max_value = u64::MIN; - let mut num_vals = 0; - - let mut vals = Vec::with_capacity(100); - - let mut ff_readers = Vec::new(); - - // Our values are bitpacked and we need to know what should be - // our bitwidth and our minimum value before serializing any values. - // - // Computing those is non-trivial if some documents are deleted. - // We go through a complete first pass to compute the minimum and the - // maximum value and initialize our Serializer. - for reader in &self.readers { - let ff_reader: MultiValuedFastFieldReader = reader - .fast_fields() - .typed_fast_field_multi_reader::(field) - .expect( - "Failed to find multivalued fast field reader. This is a bug in tantivy. \ - Please report.", - ); - for doc in reader.doc_ids_alive() { - ff_reader.get_vals(doc, &mut vals); - for &val in &vals { - min_value = cmp::min(val, min_value); - max_value = cmp::max(val, max_value); - } - num_vals += vals.len(); - } - ff_readers.push(ff_reader); - // TODO optimize when no deletes - } - - if min_value > max_value { - min_value = 0; - max_value = 0; - } - - // We can now initialize our serializer, and push it the different values - struct SortedDocIdMultiValueAccessProvider<'a> { - doc_id_mapping: &'a SegmentDocIdMapping, - fast_field_readers: &'a Vec>, - offsets: Vec, - min_value: u64, - max_value: u64, - num_vals: u64, - } - impl<'a> Column for SortedDocIdMultiValueAccessProvider<'a> { - fn get_val(&self, pos: u64) -> u64 { - // use the offsets index to find the doc_id which will contain the position. - // the offsets are strictly increasing so we can do a simple search on it. - let new_doc_id: DocId = - self.offsets - .iter() - .position(|&offset| offset > pos) - .expect("pos is out of bounds") as DocId - - 1u32; - - // now we need to find the position of `pos` in the multivalued bucket - let num_pos_covered_until_now = self.offsets[new_doc_id as usize]; - let pos_in_values = pos - num_pos_covered_until_now; - - let old_doc_addr = self.doc_id_mapping.get_old_doc_addr(new_doc_id); - let num_vals = self.fast_field_readers[old_doc_addr.segment_ord as usize] - .get_len(old_doc_addr.doc_id); - assert!(num_vals >= pos_in_values); - let mut vals = Vec::new(); - self.fast_field_readers[old_doc_addr.segment_ord as usize] - .get_vals(old_doc_addr.doc_id, &mut vals); - - vals[pos_in_values as usize] - } - - fn iter(&self) -> Box + '_> { - Box::new( - self.doc_id_mapping - .iter_old_doc_addrs() - .flat_map(|old_doc_addr| { - let ff_reader = - &self.fast_field_readers[old_doc_addr.segment_ord as usize]; - let mut vals = Vec::new(); - ff_reader.get_vals(old_doc_addr.doc_id, &mut vals); - vals.into_iter() - }), - ) - } - fn min_value(&self) -> u64 { - self.min_value - } - - fn max_value(&self) -> u64 { - self.max_value - } - - fn num_vals(&self) -> u64 { - self.num_vals - } - } - let fastfield_accessor = SortedDocIdMultiValueAccessProvider { - doc_id_mapping, - fast_field_readers: &ff_readers, - offsets, - min_value, - max_value, - num_vals: num_vals as u64, - }; + let fastfield_accessor = + SortedDocIdMultiValueColumn::new(&self.readers, doc_id_mapping, &offsets, field); fast_field_serializer.create_auto_detect_u64_fast_field_with_idx( field, fastfield_accessor, diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 16766025eb..fa4db5e660 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -19,6 +19,8 @@ mod segment_register; pub mod segment_serializer; pub mod segment_updater; mod segment_writer; +mod sorted_doc_id_column; +mod sorted_doc_id_multivalue_column; mod stamper; use crossbeam_channel as channel; diff --git a/src/indexer/sorted_doc_id_column.rs b/src/indexer/sorted_doc_id_column.rs new file mode 100644 index 0000000000..1f84c20ace --- /dev/null +++ b/src/indexer/sorted_doc_id_column.rs @@ -0,0 +1,112 @@ +use std::sync::Arc; + +use fastfield_codecs::Column; +use itertools::Itertools; + +use crate::indexer::doc_id_mapping::SegmentDocIdMapping; +use crate::schema::Field; +use crate::{DocAddress, SegmentReader}; + +pub(crate) struct SortedDocIdColumn<'a> { + doc_id_mapping: &'a SegmentDocIdMapping, + fast_field_readers: Vec>>, + min_value: u64, + max_value: u64, + num_vals: u64, +} + +fn compute_min_max_val( + u64_reader: &dyn Column, + segment_reader: &SegmentReader, +) -> Option<(u64, u64)> { + if segment_reader.max_doc() == 0 { + return None; + } + + if segment_reader.alive_bitset().is_none() { + // no deleted documents, + // we can use the previous min_val, max_val. + return Some((u64_reader.min_value(), u64_reader.max_value())); + } + // some deleted documents, + // we need to recompute the max / min + segment_reader + .doc_ids_alive() + .map(|doc_id| u64_reader.get_val(doc_id as u64)) + .minmax() + .into_option() +} + +impl<'a> SortedDocIdColumn<'a> { + pub(crate) fn new( + readers: &'a [SegmentReader], + doc_id_mapping: &'a SegmentDocIdMapping, + field: Field, + ) -> Self { + let (min_value, max_value) = readers + .iter() + .filter_map(|reader| { + let u64_reader: Arc> = + reader.fast_fields().typed_fast_field_reader(field).expect( + "Failed to find a reader for single fast field. This is a tantivy bug and \ + it should never happen.", + ); + compute_min_max_val(&*u64_reader, reader) + }) + .reduce(|a, b| (a.0.min(b.0), a.1.max(b.1))) + .expect("Unexpected error, empty readers in IndexMerger"); + + let fast_field_readers = readers + .iter() + .map(|reader| { + let u64_reader: Arc> = + reader.fast_fields().typed_fast_field_reader(field).expect( + "Failed to find a reader for single fast field. This is a tantivy bug and \ + it should never happen.", + ); + u64_reader + }) + .collect::>(); + + SortedDocIdColumn { + doc_id_mapping, + fast_field_readers, + min_value, + max_value, + num_vals: doc_id_mapping.len() as u64, + } + } +} + +impl<'a> Column for SortedDocIdColumn<'a> { + fn get_val(&self, doc: u64) -> u64 { + let DocAddress { + doc_id, + segment_ord, + } = self.doc_id_mapping.get_old_doc_addr(doc as u32); + self.fast_field_readers[segment_ord as usize].get_val(doc_id as u64) + } + + fn iter(&self) -> Box + '_> { + Box::new( + self.doc_id_mapping + .iter_old_doc_addrs() + .map(|old_doc_addr| { + let fast_field_reader = + &self.fast_field_readers[old_doc_addr.segment_ord as usize]; + fast_field_reader.get_val(old_doc_addr.doc_id as u64) + }), + ) + } + fn min_value(&self) -> u64 { + self.min_value + } + + fn max_value(&self) -> u64 { + self.max_value + } + + fn num_vals(&self) -> u64 { + self.num_vals + } +} diff --git a/src/indexer/sorted_doc_id_multivalue_column.rs b/src/indexer/sorted_doc_id_multivalue_column.rs new file mode 100644 index 0000000000..bcd86baad1 --- /dev/null +++ b/src/indexer/sorted_doc_id_multivalue_column.rs @@ -0,0 +1,121 @@ +use std::cmp; + +use fastfield_codecs::Column; + +use crate::fastfield::{MultiValueLength, MultiValuedFastFieldReader}; +use crate::indexer::doc_id_mapping::SegmentDocIdMapping; +use crate::schema::Field; +use crate::{DocId, SegmentReader}; + +// We can now initialize our serializer, and push it the different values +pub(crate) struct SortedDocIdMultiValueColumn<'a> { + doc_id_mapping: &'a SegmentDocIdMapping, + fast_field_readers: Vec>, + offsets: &'a [u64], + min_value: u64, + max_value: u64, + num_vals: u64, +} + +impl<'a> SortedDocIdMultiValueColumn<'a> { + pub(crate) fn new( + readers: &'a [SegmentReader], + doc_id_mapping: &'a SegmentDocIdMapping, + offsets: &'a [u64], + field: Field, + ) -> Self { + // Our values are bitpacked and we need to know what should be + // our bitwidth and our minimum value before serializing any values. + // + // Computing those is non-trivial if some documents are deleted. + // We go through a complete first pass to compute the minimum and the + // maximum value and initialize our Serializer. + let mut num_vals = 0; + let mut min_value = u64::MAX; + let mut max_value = u64::MIN; + let mut vals = Vec::new(); + let mut fast_field_readers = Vec::with_capacity(readers.len()); + for reader in readers { + let ff_reader: MultiValuedFastFieldReader = reader + .fast_fields() + .typed_fast_field_multi_reader::(field) + .expect( + "Failed to find multivalued fast field reader. This is a bug in tantivy. \ + Please report.", + ); + for doc in reader.doc_ids_alive() { + ff_reader.get_vals(doc, &mut vals); + for &val in &vals { + min_value = cmp::min(val, min_value); + max_value = cmp::max(val, max_value); + } + num_vals += vals.len(); + } + fast_field_readers.push(ff_reader); + // TODO optimize when no deletes + } + if min_value > max_value { + min_value = 0; + max_value = 0; + } + SortedDocIdMultiValueColumn { + doc_id_mapping, + fast_field_readers, + offsets, + min_value, + max_value, + num_vals: num_vals as u64, + } + } +} + +impl<'a> Column for SortedDocIdMultiValueColumn<'a> { + fn get_val(&self, pos: u64) -> u64 { + // use the offsets index to find the doc_id which will contain the position. + // the offsets are strictly increasing so we can do a simple search on it. + let new_doc_id: DocId = self + .offsets + .iter() + .position(|&offset| offset > pos) + .expect("pos is out of bounds") as DocId + - 1u32; + + // now we need to find the position of `pos` in the multivalued bucket + let num_pos_covered_until_now = self.offsets[new_doc_id as usize]; + let pos_in_values = pos - num_pos_covered_until_now; + + let old_doc_addr = self.doc_id_mapping.get_old_doc_addr(new_doc_id); + let num_vals = + self.fast_field_readers[old_doc_addr.segment_ord as usize].get_len(old_doc_addr.doc_id); + assert!(num_vals >= pos_in_values); + let mut vals = Vec::new(); + self.fast_field_readers[old_doc_addr.segment_ord as usize] + .get_vals(old_doc_addr.doc_id, &mut vals); + + vals[pos_in_values as usize] + } + + fn iter(&self) -> Box + '_> { + Box::new( + self.doc_id_mapping + .iter_old_doc_addrs() + .flat_map(|old_doc_addr| { + let ff_reader = &self.fast_field_readers[old_doc_addr.segment_ord as usize]; + let mut vals = Vec::new(); + ff_reader.get_vals(old_doc_addr.doc_id, &mut vals); + vals.into_iter() + }), + ) + } + fn min_value(&self) -> u64 { + self.min_value + } + + fn max_value(&self) -> u64 { + self.max_value + } + + fn num_vals(&self) -> u64 { + self.num_vals + } +} diff --git a/src/termdict/sstable_termdict/sstable/merge/mod.rs b/src/termdict/sstable_termdict/sstable/merge/mod.rs index d604409628..3170500b4e 100644 --- a/src/termdict/sstable_termdict/sstable/merge/mod.rs +++ b/src/termdict/sstable_termdict/sstable/merge/mod.rs @@ -83,7 +83,6 @@ mod tests { } assert!(sstable_writer.finalize().is_ok()); } - dbg!(&buffer); buffer }