diff --git a/src/positions/mod.rs b/src/positions/mod.rs index 7ac62d075b..7af2b2ec51 100644 --- a/src/positions/mod.rs +++ b/src/positions/mod.rs @@ -39,7 +39,7 @@ pub mod tests { use super::{PositionReader, PositionSerializer}; use crate::directory::ReadOnlySource; - use crate::positions::COMPRESSION_BLOCK_SIZE; + use crate::positions::reader::PositionReaderAbsolute; use std::iter; fn create_stream_buffer(vals: &[u32]) -> (ReadOnlySource, ReadOnlySource) { @@ -76,19 +76,35 @@ pub mod tests { } #[test] - fn test_position_skip() { - let v: Vec = (0..1_000).collect(); + fn test_position_absolute_read() { + let v: Vec = (0..1000).collect(); let (stream, skip) = create_stream_buffer(&v[..]); assert_eq!(skip.len(), 12); assert_eq!(stream.len(), 1168); - - let mut position_reader = PositionReader::new(stream, skip, 0u64); - position_reader.skip(10); - for &n in &[10, 127, COMPRESSION_BLOCK_SIZE, 130, 312] { + let mut position_reader = PositionReaderAbsolute::new(stream, skip, 0u64); + for &n in &[1, 10, 127, 128, 130, 312] { let mut v = vec![0u32; n]; - position_reader.read(&mut v[..n]); + position_reader.read(0, &mut v[..]); for i in 0..n { - assert_eq!(v[i], 10u32 + i as u32); + assert_eq!(v[i], i as u32); + } + } + } + + #[test] + fn test_position_absolute_read_with_offset() { + let v: Vec = (0..1000).collect(); + let (stream, skip) = create_stream_buffer(&v[..]); + assert_eq!(skip.len(), 12); + assert_eq!(stream.len(), 1168); + let mut position_reader = PositionReaderAbsolute::new(stream, skip, 0u64); + for &offset in &[1u64, 10u64, 127u64, 128u64, 130u64, 312u64] { + for &len in &[1, 10, 130, 500] { + let mut v = vec![0u32; len]; + position_reader.read(offset, &mut v[..]); + for i in 0..len { + assert_eq!(v[i], i as u32 + offset as u32); + } } } } @@ -115,6 +131,40 @@ pub mod tests { } } + + + #[test] + fn test_position_absolute_reread_anchor_different_than_block() { + let v: Vec = (0..2_000_000).collect(); + let (stream, skip) = create_stream_buffer(&v[..]); + assert_eq!(skip.len(), 15_749); + assert_eq!(stream.len(), 4_987_872); + let mut position_reader = + PositionReaderAbsolute::new(stream.clone(), skip.clone(), 0); + let mut buf = [0u32; 256]; + position_reader.read(128, &mut buf); + for i in 0..256 { + assert_eq!(buf[i], (128 + i) as u32); + } + position_reader.read(128, &mut buf); + for i in 0..256 { + assert_eq!(buf[i], (128 + i) as u32); + } + } + + #[test] + #[should_panic(expected = "offset arguments should be increasing.")] + fn test_position_absolute_panic_if_called_previous_anchor() { + let v: Vec = (0..2_000_000).collect(); + let (stream, skip) = create_stream_buffer(&v[..]); + assert_eq!(skip.len(), 15_749); + assert_eq!(stream.len(), 4_987_872); + let mut buf = [0u32; 1]; + let mut position_reader = + PositionReaderAbsolute::new(stream.clone(), skip.clone(), 200_000); + position_reader.read(10, &mut buf); + } + #[test] fn test_position_long_skip_const() { const CONST_VAL: u32 = 9u32; diff --git a/src/positions/reader.rs b/src/positions/reader.rs index 6737f4bbc7..c3764c308a 100644 --- a/src/positions/reader.rs +++ b/src/positions/reader.rs @@ -60,6 +60,23 @@ impl Positions { u64::deserialize(&mut long_skip_blocks).expect("Index corrupted") } + fn reader_absolute(&self, offset: u64) -> PositionReaderAbsolute { + let long_skip_id = (offset / LONG_SKIP_INTERVAL) as usize; + let offset_num_bytes: u64 = self.long_skip(long_skip_id); + let mut position_read = OwnedRead::new(self.position_source.clone()); + position_read.advance(offset_num_bytes as usize); + let mut skip_read = OwnedRead::new(self.skip_source.clone()); + skip_read.advance(long_skip_id * LONG_SKIP_IN_BLOCKS); + PositionReaderAbsolute { + bit_packer: self.bit_packer, + skip_read, + position_read, + buffer: Box::new([0u32; 128]), + block_offset: std::i64::MAX as u64, + anchor_offset: (long_skip_id as u64) * LONG_SKIP_INTERVAL, + } + } + fn reader(&self, offset: u64) -> PositionReader { let long_skip_id = (offset / LONG_SKIP_INTERVAL) as usize; let small_skip = (offset % LONG_SKIP_INTERVAL) as usize; @@ -81,12 +98,84 @@ impl Positions { } } +pub struct PositionReaderAbsolute { + skip_read: OwnedRead, + position_read: OwnedRead, + bit_packer: BitPacker4x, + buffer: Box<[u32; COMPRESSION_BLOCK_SIZE]>, + + block_offset: u64, + anchor_offset: u64, +} + +impl PositionReaderAbsolute { + pub fn new( + position_source: ReadOnlySource, + skip_source: ReadOnlySource, + offset: u64, + ) -> PositionReaderAbsolute { + Positions::new(position_source, skip_source).reader_absolute(offset) + } + + /// Fills a buffer with the positions `[offset..offset+output.len())` integers. + /// + /// `offset` is required to have a value >= to the offsets given in previous calls + /// for the given `PositionReaderAbsolute` instance. + pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) { + assert!( + offset >= self.anchor_offset, + "offset arguments should be increasing." + ); + let delta_to_block_offset = offset as i64 - self.block_offset as i64; + if delta_to_block_offset < 0 || delta_to_block_offset >= 128 { + // The first position is not within the first block. + // We need to decompress the first block. + let delta_to_anchor_offset = offset - self.anchor_offset; + let num_blocks_to_skip = + (delta_to_anchor_offset / (COMPRESSION_BLOCK_SIZE as u64)) as usize; + let num_bits: usize = self.skip_read.as_ref()[..num_blocks_to_skip] + .iter() + .cloned() + .map(|num_bits| num_bits as usize) + .sum(); + let num_bytes_to_skip = num_bits * COMPRESSION_BLOCK_SIZE / 8; + self.skip_read.advance(num_blocks_to_skip as usize); + self.position_read.advance(num_bytes_to_skip); + self.anchor_offset = offset - (offset % COMPRESSION_BLOCK_SIZE as u64); + self.block_offset = self.anchor_offset; + let num_bits = self.skip_read.get(0); + self.bit_packer + .decompress(self.position_read.as_ref(), self.buffer.as_mut(), num_bits); + } + + let mut num_bits = self.skip_read.get(0); + let mut position_data = self.position_read.as_ref(); + + for i in 1.. { + let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE; + let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block; + if remaining_in_block >= output.len() { + output.copy_from_slice(&self.buffer[offset_in_block..][..output.len()]); + return; + } + output[..remaining_in_block].copy_from_slice(&self.buffer[offset_in_block..]); + output = &mut output[remaining_in_block..]; + offset += remaining_in_block as u64; + position_data = &position_data[(num_bits as usize * COMPRESSION_BLOCK_SIZE / 8)..]; + num_bits = self.skip_read.get(i); + self.bit_packer + .decompress(position_data, self.buffer.as_mut(), num_bits); + self.block_offset += COMPRESSION_BLOCK_SIZE as u64; + } + } +} + pub struct PositionReader { skip_read: OwnedRead, position_read: OwnedRead, bit_packer: BitPacker4x, inner_offset: usize, - buffer: Box<[u32; 128]>, + buffer: Box<[u32; COMPRESSION_BLOCK_SIZE]>, ahead: Option, // if None, no block is loaded. // if Some(num_blocks), the block currently loaded is num_blocks ahead // of the block of the next int to read. @@ -96,14 +185,14 @@ pub struct PositionReader { // compared to the cursor of the actual stream. // // By contract, when this function is called, the current block has to be -// decompressed. +// already decompressed. // // If the requested number of els ends exactly at a given block, the next // block is not decompressed. fn read_impl( bit_packer: BitPacker4x, mut position: &[u8], - buffer: &mut [u32; 128], + buffer: &mut [u32; COMPRESSION_BLOCK_SIZE], mut inner_offset: usize, num_bits: &[u8], output: &mut [u32], diff --git a/src/postings/block_segment_postings.rs b/src/postings/block_segment_postings.rs index 4e19ab6b5c..0e4040632f 100644 --- a/src/postings/block_segment_postings.rs +++ b/src/postings/block_segment_postings.rs @@ -1,9 +1,9 @@ use crate::common::{BinarySerializable, VInt}; use crate::directory::ReadOnlySource; use crate::postings::compression::{ - AlignedBuffer, BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE + AlignedBuffer, BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE, }; -use crate::postings::{FreqReadingOption, SkipReader, BlockInfo}; +use crate::postings::{BlockInfo, FreqReadingOption, SkipReader}; use crate::schema::IndexRecordOption; use crate::{DocId, TERMINATED}; @@ -90,7 +90,7 @@ impl BlockSegmentPostings { let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, data); let skip_reader = match skip_data_opt { - Some(skip_data) => SkipReader::new(skip_data, doc_freq, record_option), + Some(skip_data) => SkipReader::new(skip_data, doc_freq, record_option), None => SkipReader::new(ReadOnlySource::empty(), doc_freq, record_option), }; @@ -123,7 +123,7 @@ impl BlockSegmentPostings { if let Some(skip_data) = skip_data_opt { self.skip_reader.reset(skip_data, doc_freq); } else { - self.skip_reader.reset(ReadOnlySource::empty(), doc_freq); + self.skip_reader.reset(ReadOnlySource::empty(), doc_freq); } self.doc_freq = doc_freq as usize; } @@ -214,13 +214,12 @@ impl BlockSegmentPostings { self.doc_decoder.clear(); - let num_vint_docs = - if let BlockInfo::VInt(num_vint_docs) = self.skip_reader.block_info() { - num_vint_docs - } else { - // TODO - unimplemented!() - }; + let num_vint_docs = if let BlockInfo::VInt(num_vint_docs) = self.skip_reader.block_info() { + num_vint_docs + } else { + // TODO + unimplemented!() + }; if num_vint_docs == 0 { return BlockSegmentPostingsSkipResult::Terminated; } @@ -239,26 +238,14 @@ impl BlockSegmentPostings { .unwrap_or(BlockSegmentPostingsSkipResult::Terminated) } - fn read_bitpacked_block(&mut self, doc_num_bits: u8, tf_num_bits: u8) { - let offset = self.skip_reader.byte_offset(); - decode_block( - &mut self.doc_decoder, - if let FreqReadingOption::ReadFreq = self.freq_reading_option { - Some(&mut self.freq_decoder) - } else { - None - }, - &self.data.as_slice()[offset..], - self.skip_reader.last_doc_in_previous_block, - doc_num_bits, - tf_num_bits, - ); - } - pub fn read_block(&mut self) { let offset = self.skip_reader.byte_offset(); match self.skip_reader.block_info() { - BlockInfo::BitPacked { doc_num_bits, tf_num_bits, .. } => { + BlockInfo::BitPacked { + doc_num_bits, + tf_num_bits, + .. + } => { decode_block( &mut self.doc_decoder, if let FreqReadingOption::ReadFreq = self.freq_reading_option { @@ -298,9 +285,9 @@ impl BlockSegmentPostings { } if let BlockInfo::VInt(num_vint_docs) = self.skip_reader.block_info() { - if num_vint_docs== 0 { - return false; - } + if num_vint_docs == 0 { + return false; + } } self.read_block(); true diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 142a00a620..ac0dbeb52e 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -24,7 +24,7 @@ pub(crate) use self::postings_writer::MultiFieldPostingsWriter; pub use self::serializer::{FieldSerializer, InvertedIndexSerializer}; pub use self::postings::Postings; -pub(crate) use self::skip::{SkipReader, BlockInfo}; +pub(crate) use self::skip::{BlockInfo, SkipReader}; pub use self::term_info::TermInfo; pub use self::block_segment_postings::{BlockSegmentPostings, BlockSegmentPostingsSkipResult}; diff --git a/src/postings/skip.rs b/src/postings/skip.rs index cfe5e7ac0c..2cf8097a31 100644 --- a/src/postings/skip.rs +++ b/src/postings/skip.rs @@ -57,7 +57,7 @@ pub(crate) struct SkipReader { skip_info: IndexRecordOption, byte_offset: usize, remaining_docs: u32, // number of docs remaining, including the - // documents in the current block. + // documents in the current block. block_info: BlockInfo, } @@ -66,9 +66,9 @@ pub(crate) enum BlockInfo { BitPacked { doc_num_bits: u8, tf_num_bits: u8, - tf_sum: u32 + tf_sum: u32, }, - VInt(u32) + VInt(u32), } impl Default for BlockInfo { @@ -79,7 +79,12 @@ impl Default for BlockInfo { impl BlockInfo { fn block_num_bytes(&self) -> usize { - if let BlockInfo::BitPacked { doc_num_bits, tf_num_bits, .. } = self { + if let BlockInfo::BitPacked { + doc_num_bits, + tf_num_bits, + .. + } = self + { (doc_num_bits + tf_num_bits) as usize * COMPRESSION_BLOCK_SIZE / 8 } else { 0 @@ -88,26 +93,22 @@ impl BlockInfo { fn num_docs(&self) -> u32 { match self { - BlockInfo::BitPacked { .. } => { - COMPRESSION_BLOCK_SIZE as u32 - } - BlockInfo::VInt(num_docs) => { - *num_docs - } + BlockInfo::BitPacked { .. } => COMPRESSION_BLOCK_SIZE as u32, + BlockInfo::VInt(num_docs) => *num_docs, } } } impl SkipReader { pub fn new(data: ReadOnlySource, doc_freq: u32, skip_info: IndexRecordOption) -> SkipReader { - SkipReader { + SkipReader { last_doc_in_block: 0u32, last_doc_in_previous_block: 0u32, owned_read: OwnedRead::new(data), skip_info, block_info: BlockInfo::default(), byte_offset: 0, - remaining_docs: doc_freq + remaining_docs: doc_freq, } } @@ -120,15 +121,15 @@ impl SkipReader { self.remaining_docs = doc_freq; } - pub fn doc(&self,) -> DocId { + pub fn doc(&self) -> DocId { self.last_doc_in_block } pub fn tf_sum(&self) -> u32 { // TODO - if let BlockInfo::BitPacked { tf_sum , ..} = self.block_info { + if let BlockInfo::BitPacked { tf_sum, .. } = self.block_info { tf_sum - } else { + } else { unimplemented!() } } @@ -147,7 +148,7 @@ impl SkipReader { self.block_info = BlockInfo::BitPacked { doc_num_bits, tf_num_bits: 0, - tf_sum: 0 + tf_sum: 0, }; } IndexRecordOption::WithFreqs => { @@ -155,19 +156,18 @@ impl SkipReader { self.block_info = BlockInfo::BitPacked { doc_num_bits, tf_num_bits, - tf_sum: 0 + tf_sum: 0, }; self.owned_read.advance(2); } IndexRecordOption::WithFreqsAndPositions => { let tf_num_bits = self.owned_read.get(1); self.owned_read.advance(2); - let tf_sum = - u32::deserialize(&mut self.owned_read).expect("Failed reading tf_sum"); + let tf_sum = u32::deserialize(&mut self.owned_read).expect("Failed reading tf_sum"); self.block_info = BlockInfo::BitPacked { doc_num_bits, tf_num_bits, - tf_sum + tf_sum, }; } } @@ -195,10 +195,10 @@ impl SkipReader { #[cfg(test)] mod tests { + use super::BlockInfo; use super::IndexRecordOption; use super::{SkipReader, SkipSerializer}; use crate::directory::ReadOnlySource; - use super::BlockInfo; use crate::postings::compression::COMPRESSION_BLOCK_SIZE; #[test] @@ -212,14 +212,31 @@ mod tests { skip_serializer.data().to_owned() }; let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32; - let mut skip_reader = - SkipReader::new(ReadOnlySource::new(buf), doc_freq, IndexRecordOption::WithFreqs); + let mut skip_reader = SkipReader::new( + ReadOnlySource::new(buf), + doc_freq, + IndexRecordOption::WithFreqs, + ); assert!(skip_reader.advance()); assert_eq!(skip_reader.doc(), 1u32); - assert_eq!(skip_reader.block_info(), BlockInfo::BitPacked {doc_num_bits: 2u8, tf_num_bits: 3u8, tf_sum: 0}); + assert_eq!( + skip_reader.block_info(), + BlockInfo::BitPacked { + doc_num_bits: 2u8, + tf_num_bits: 3u8, + tf_sum: 0 + } + ); assert!(skip_reader.advance()); assert_eq!(skip_reader.doc(), 5u32); - assert_eq!(skip_reader.block_info(), BlockInfo::BitPacked {doc_num_bits: 5u8, tf_num_bits: 2u8, tf_sum: 0}); + assert_eq!( + skip_reader.block_info(), + BlockInfo::BitPacked { + doc_num_bits: 5u8, + tf_num_bits: 2u8, + tf_sum: 0 + } + ); assert!(!skip_reader.advance()); } @@ -232,13 +249,31 @@ mod tests { skip_serializer.data().to_owned() }; let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32; - let mut skip_reader = SkipReader::new(ReadOnlySource::from(buf), doc_freq, IndexRecordOption::Basic); + let mut skip_reader = SkipReader::new( + ReadOnlySource::from(buf), + doc_freq, + IndexRecordOption::Basic, + ); assert!(skip_reader.advance()); assert_eq!(skip_reader.doc(), 1u32); - assert_eq!(skip_reader.block_info(), BlockInfo::BitPacked {doc_num_bits: 2u8, tf_num_bits: 0, tf_sum: 0u32}); + assert_eq!( + skip_reader.block_info(), + BlockInfo::BitPacked { + doc_num_bits: 2u8, + tf_num_bits: 0, + tf_sum: 0u32 + } + ); assert!(skip_reader.advance()); assert_eq!(skip_reader.doc(), 5u32); - assert_eq!(skip_reader.block_info(), BlockInfo::BitPacked {doc_num_bits: 5u8, tf_num_bits: 0, tf_sum: 0u32}); + assert_eq!( + skip_reader.block_info(), + BlockInfo::BitPacked { + doc_num_bits: 5u8, + tf_num_bits: 0, + tf_sum: 0u32 + } + ); assert!(!skip_reader.advance()); } }