Skip to content

Commit

Permalink
broken unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed May 22, 2020
1 parent e033734 commit d207de5
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 72 deletions.
68 changes: 59 additions & 9 deletions src/positions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub mod tests {

use super::{PositionReader, PositionSerializer};
use crate::directory::ReadOnlySource;
use crate::positions::COMPRESSION_BLOCK_SIZE;
use crate::positions::reader::PositionReaderAbsolute;
use std::iter;

fn create_stream_buffer(vals: &[u32]) -> (ReadOnlySource, ReadOnlySource) {
Expand Down Expand Up @@ -76,19 +76,35 @@ pub mod tests {
}

#[test]
fn test_position_skip() {
let v: Vec<u32> = (0..1_000).collect();
fn test_position_absolute_read() {
let v: Vec<u32> = (0..1000).collect();
let (stream, skip) = create_stream_buffer(&v[..]);
assert_eq!(skip.len(), 12);
assert_eq!(stream.len(), 1168);

let mut position_reader = PositionReader::new(stream, skip, 0u64);
position_reader.skip(10);
for &n in &[10, 127, COMPRESSION_BLOCK_SIZE, 130, 312] {
let mut position_reader = PositionReaderAbsolute::new(stream, skip, 0u64);
for &n in &[1, 10, 127, 128, 130, 312] {
let mut v = vec![0u32; n];
position_reader.read(&mut v[..n]);
position_reader.read(0, &mut v[..]);
for i in 0..n {
assert_eq!(v[i], 10u32 + i as u32);
assert_eq!(v[i], i as u32);
}
}
}

#[test]
fn test_position_absolute_read_with_offset() {
let v: Vec<u32> = (0..1000).collect();
let (stream, skip) = create_stream_buffer(&v[..]);
assert_eq!(skip.len(), 12);
assert_eq!(stream.len(), 1168);
let mut position_reader = PositionReaderAbsolute::new(stream, skip, 0u64);
for &offset in &[1u64, 10u64, 127u64, 128u64, 130u64, 312u64] {
for &len in &[1, 10, 130, 500] {
let mut v = vec![0u32; len];
position_reader.read(offset, &mut v[..]);
for i in 0..len {
assert_eq!(v[i], i as u32 + offset as u32);
}
}
}
}
Expand All @@ -115,6 +131,40 @@ pub mod tests {
}
}



#[test]
fn test_position_absolute_reread_anchor_different_than_block() {
let v: Vec<u32> = (0..2_000_000).collect();
let (stream, skip) = create_stream_buffer(&v[..]);
assert_eq!(skip.len(), 15_749);
assert_eq!(stream.len(), 4_987_872);
let mut position_reader =
PositionReaderAbsolute::new(stream.clone(), skip.clone(), 0);
let mut buf = [0u32; 256];
position_reader.read(128, &mut buf);
for i in 0..256 {
assert_eq!(buf[i], (128 + i) as u32);
}
position_reader.read(128, &mut buf);
for i in 0..256 {
assert_eq!(buf[i], (128 + i) as u32);
}
}

#[test]
#[should_panic(expected = "offset arguments should be increasing.")]
fn test_position_absolute_panic_if_called_previous_anchor() {
let v: Vec<u32> = (0..2_000_000).collect();
let (stream, skip) = create_stream_buffer(&v[..]);
assert_eq!(skip.len(), 15_749);
assert_eq!(stream.len(), 4_987_872);
let mut buf = [0u32; 1];
let mut position_reader =
PositionReaderAbsolute::new(stream.clone(), skip.clone(), 200_000);
position_reader.read(10, &mut buf);
}

#[test]
fn test_position_long_skip_const() {
const CONST_VAL: u32 = 9u32;
Expand Down
95 changes: 92 additions & 3 deletions src/positions/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,23 @@ impl Positions {
u64::deserialize(&mut long_skip_blocks).expect("Index corrupted")
}

fn reader_absolute(&self, offset: u64) -> PositionReaderAbsolute {
let long_skip_id = (offset / LONG_SKIP_INTERVAL) as usize;
let offset_num_bytes: u64 = self.long_skip(long_skip_id);
let mut position_read = OwnedRead::new(self.position_source.clone());
position_read.advance(offset_num_bytes as usize);
let mut skip_read = OwnedRead::new(self.skip_source.clone());
skip_read.advance(long_skip_id * LONG_SKIP_IN_BLOCKS);
PositionReaderAbsolute {
bit_packer: self.bit_packer,
skip_read,
position_read,
buffer: Box::new([0u32; 128]),
block_offset: std::i64::MAX as u64,
anchor_offset: (long_skip_id as u64) * LONG_SKIP_INTERVAL,
}
}

fn reader(&self, offset: u64) -> PositionReader {
let long_skip_id = (offset / LONG_SKIP_INTERVAL) as usize;
let small_skip = (offset % LONG_SKIP_INTERVAL) as usize;
Expand All @@ -81,12 +98,84 @@ impl Positions {
}
}

pub struct PositionReaderAbsolute {
skip_read: OwnedRead,
position_read: OwnedRead,
bit_packer: BitPacker4x,
buffer: Box<[u32; COMPRESSION_BLOCK_SIZE]>,

block_offset: u64,
anchor_offset: u64,
}

impl PositionReaderAbsolute {
pub fn new(
position_source: ReadOnlySource,
skip_source: ReadOnlySource,
offset: u64,
) -> PositionReaderAbsolute {
Positions::new(position_source, skip_source).reader_absolute(offset)
}

/// Fills a buffer with the positions `[offset..offset+output.len())` integers.
///
/// `offset` is required to have a value >= to the offsets given in previous calls
/// for the given `PositionReaderAbsolute` instance.
pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) {
assert!(
offset >= self.anchor_offset,
"offset arguments should be increasing."
);
let delta_to_block_offset = offset as i64 - self.block_offset as i64;
if delta_to_block_offset < 0 || delta_to_block_offset >= 128 {
// The first position is not within the first block.
// We need to decompress the first block.
let delta_to_anchor_offset = offset - self.anchor_offset;
let num_blocks_to_skip =
(delta_to_anchor_offset / (COMPRESSION_BLOCK_SIZE as u64)) as usize;
let num_bits: usize = self.skip_read.as_ref()[..num_blocks_to_skip]
.iter()
.cloned()
.map(|num_bits| num_bits as usize)
.sum();
let num_bytes_to_skip = num_bits * COMPRESSION_BLOCK_SIZE / 8;
self.skip_read.advance(num_blocks_to_skip as usize);
self.position_read.advance(num_bytes_to_skip);
self.anchor_offset = offset - (offset % COMPRESSION_BLOCK_SIZE as u64);
self.block_offset = self.anchor_offset;
let num_bits = self.skip_read.get(0);
self.bit_packer
.decompress(self.position_read.as_ref(), self.buffer.as_mut(), num_bits);
}

let mut num_bits = self.skip_read.get(0);
let mut position_data = self.position_read.as_ref();

for i in 1.. {
let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE;
let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block;
if remaining_in_block >= output.len() {
output.copy_from_slice(&self.buffer[offset_in_block..][..output.len()]);
return;
}
output[..remaining_in_block].copy_from_slice(&self.buffer[offset_in_block..]);
output = &mut output[remaining_in_block..];
offset += remaining_in_block as u64;
position_data = &position_data[(num_bits as usize * COMPRESSION_BLOCK_SIZE / 8)..];
num_bits = self.skip_read.get(i);
self.bit_packer
.decompress(position_data, self.buffer.as_mut(), num_bits);
self.block_offset += COMPRESSION_BLOCK_SIZE as u64;
}
}
}

pub struct PositionReader {
skip_read: OwnedRead,
position_read: OwnedRead,
bit_packer: BitPacker4x,
inner_offset: usize,
buffer: Box<[u32; 128]>,
buffer: Box<[u32; COMPRESSION_BLOCK_SIZE]>,
ahead: Option<usize>, // if None, no block is loaded.
// if Some(num_blocks), the block currently loaded is num_blocks ahead
// of the block of the next int to read.
Expand All @@ -96,14 +185,14 @@ pub struct PositionReader {
// compared to the cursor of the actual stream.
//
// By contract, when this function is called, the current block has to be
// decompressed.
// already decompressed.
//
// If the requested number of els ends exactly at a given block, the next
// block is not decompressed.
fn read_impl(
bit_packer: BitPacker4x,
mut position: &[u8],
buffer: &mut [u32; 128],
buffer: &mut [u32; COMPRESSION_BLOCK_SIZE],
mut inner_offset: usize,
num_bits: &[u8],
output: &mut [u32],
Expand Down
49 changes: 18 additions & 31 deletions src/postings/block_segment_postings.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::common::{BinarySerializable, VInt};
use crate::directory::ReadOnlySource;
use crate::postings::compression::{
AlignedBuffer, BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE
AlignedBuffer, BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE,
};
use crate::postings::{FreqReadingOption, SkipReader, BlockInfo};
use crate::postings::{BlockInfo, FreqReadingOption, SkipReader};
use crate::schema::IndexRecordOption;
use crate::{DocId, TERMINATED};

Expand Down Expand Up @@ -90,7 +90,7 @@ impl BlockSegmentPostings {

let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, data);
let skip_reader = match skip_data_opt {
Some(skip_data) => SkipReader::new(skip_data, doc_freq, record_option),
Some(skip_data) => SkipReader::new(skip_data, doc_freq, record_option),
None => SkipReader::new(ReadOnlySource::empty(), doc_freq, record_option),
};

Expand Down Expand Up @@ -123,7 +123,7 @@ impl BlockSegmentPostings {
if let Some(skip_data) = skip_data_opt {
self.skip_reader.reset(skip_data, doc_freq);
} else {
self.skip_reader.reset(ReadOnlySource::empty(), doc_freq);
self.skip_reader.reset(ReadOnlySource::empty(), doc_freq);
}
self.doc_freq = doc_freq as usize;
}
Expand Down Expand Up @@ -214,13 +214,12 @@ impl BlockSegmentPostings {

self.doc_decoder.clear();

let num_vint_docs =
if let BlockInfo::VInt(num_vint_docs) = self.skip_reader.block_info() {
num_vint_docs
} else {
// TODO
unimplemented!()
};
let num_vint_docs = if let BlockInfo::VInt(num_vint_docs) = self.skip_reader.block_info() {
num_vint_docs
} else {
// TODO
unimplemented!()
};
if num_vint_docs == 0 {
return BlockSegmentPostingsSkipResult::Terminated;
}
Expand All @@ -239,26 +238,14 @@ impl BlockSegmentPostings {
.unwrap_or(BlockSegmentPostingsSkipResult::Terminated)
}

fn read_bitpacked_block(&mut self, doc_num_bits: u8, tf_num_bits: u8) {
let offset = self.skip_reader.byte_offset();
decode_block(
&mut self.doc_decoder,
if let FreqReadingOption::ReadFreq = self.freq_reading_option {
Some(&mut self.freq_decoder)
} else {
None
},
&self.data.as_slice()[offset..],
self.skip_reader.last_doc_in_previous_block,
doc_num_bits,
tf_num_bits,
);
}

pub fn read_block(&mut self) {
let offset = self.skip_reader.byte_offset();
match self.skip_reader.block_info() {
BlockInfo::BitPacked { doc_num_bits, tf_num_bits, .. } => {
BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
..
} => {
decode_block(
&mut self.doc_decoder,
if let FreqReadingOption::ReadFreq = self.freq_reading_option {
Expand Down Expand Up @@ -298,9 +285,9 @@ impl BlockSegmentPostings {
}

if let BlockInfo::VInt(num_vint_docs) = self.skip_reader.block_info() {
if num_vint_docs== 0 {
return false;
}
if num_vint_docs == 0 {
return false;
}
}
self.read_block();
true
Expand Down
2 changes: 1 addition & 1 deletion src/postings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};

pub use self::postings::Postings;
pub(crate) use self::skip::{SkipReader, BlockInfo};
pub(crate) use self::skip::{BlockInfo, SkipReader};
pub use self::term_info::TermInfo;

pub use self::block_segment_postings::{BlockSegmentPostings, BlockSegmentPostingsSkipResult};
Expand Down
Loading

0 comments on commit d207de5

Please sign in to comment.