Skip to content

Commit

Permalink
Support Predicate Pushdown for Parquet Lists (#2108) (#2999)
Browse files Browse the repository at this point in the history
* Add buffer to ColumnLevelDecoderImpl (#2108)

* Implement skip_rep_levels

* Add integration test

* Clippy
  • Loading branch information
tustvold authored Nov 5, 2022
1 parent fc58036 commit e2c4199
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 32 deletions.
28 changes: 28 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2421,4 +2421,32 @@ mod tests {
let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
}

#[test]
#[cfg(feature = "snap")]
fn test_read_nested_lists() {
    // Locate the reference file shipped with parquet-testing
    let path = format!(
        "{}/nested_lists.snappy.parquet",
        arrow::util::test_util::parquet_test_data()
    );
    let file = File::open(&path).unwrap();

    // First read all rows to obtain a baseline batch to compare against
    let mut full_reader =
        ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 60).unwrap();
    let expected = full_reader.next().unwrap().unwrap();
    assert_eq!(expected.num_rows(), 3);

    // Re-read the same file, selecting only the middle row
    let selection: RowSelection = vec![
        RowSelector::skip(1),
        RowSelector::select(1),
        RowSelector::skip(1),
    ]
    .into();
    let mut selective_reader = ParquetRecordBatchReaderBuilder::try_new(file)
        .unwrap()
        .with_row_selection(selection)
        .build()
        .unwrap();

    // The selected row must match the corresponding slice of the full read
    let actual = selective_reader.next().unwrap().unwrap();
    assert_eq!(actual.num_rows(), 1);
    assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
}
}
221 changes: 189 additions & 32 deletions parquet/src/column/reader/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,13 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
}
}

/// Number of levels decoded at a time into the temporary buffer when skipping
/// (levels are RLE/bit-packed encoded and must be decoded to be skipped)
const SKIP_BUFFER_SIZE: usize = 1024;

/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
pub struct ColumnLevelDecoderImpl {
    /// Inner decoder for the current page's level data; `None` until `set_data` is called
    decoder: Option<LevelDecoderInner>,
    /// Temporary buffer populated when skipping values
    buffer: Vec<i16>,
    /// Bit width required to encode the column's maximum level
    bit_width: u8,
}

Expand All @@ -275,9 +279,36 @@ impl ColumnLevelDecoderImpl {
let bit_width = num_required_bits(max_level as u64);
Self {
decoder: None,
buffer: vec![],
bit_width,
}
}

/// Drops the first `len` values from the internal buffer
///
/// # Panics
///
/// Panics if `len` exceeds the buffer's length
fn split_off_buffer(&mut self, len: usize) {
    // `Vec::drain` removes the leading `len` elements and shifts the
    // remainder to the front in a single pass, handling both the
    // "drop everything" and "drop a prefix" cases uniformly (replaces
    // the manual clear / rotate_left / truncate combination)
    self.buffer.drain(..len);
}

/// Reads up to `to_read` values into the internal buffer
///
/// The buffer may end up shorter than `to_read` if the underlying
/// decoder runs out of values (e.g. end of page)
fn read_to_buffer(&mut self, to_read: usize) -> Result<()> {
    // Take ownership of the buffer so `self.read` below can borrow
    // `self` mutably without conflicting with a borrow of the buffer
    let mut buf = std::mem::take(&mut self.buffer);

    // Repopulate buffer
    buf.resize(to_read, 0);
    let actual = self.read(&mut buf, 0..to_read)?;
    // `read` may return fewer values than requested
    buf.truncate(actual);

    self.buffer = buf;
    Ok(())
}
}

enum LevelDecoderInner {
Expand All @@ -289,6 +320,7 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
type Slice = [i16];

fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
self.buffer.clear();
match encoding {
Encoding::RLE => {
let mut decoder = RleDecoder::new(self.bit_width);
Expand All @@ -305,12 +337,25 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
}
}

fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
fn read(&mut self, out: &mut Self::Slice, mut range: Range<usize>) -> Result<usize> {
let read_from_buffer = match self.buffer.is_empty() {
true => 0,
false => {
let read_from_buffer = self.buffer.len().min(range.end - range.start);
out[range.start..range.start + read_from_buffer]
.copy_from_slice(&self.buffer[0..read_from_buffer]);
self.split_off_buffer(read_from_buffer);
read_from_buffer
}
};
range.start += read_from_buffer;

match self.decoder.as_mut().unwrap() {
LevelDecoderInner::Packed(reader, bit_width) => {
Ok(reader.get_batch::<i16>(&mut out[range], *bit_width as usize))
LevelDecoderInner::Packed(reader, bit_width) => Ok(read_from_buffer
+ reader.get_batch::<i16>(&mut out[range], *bit_width as usize)),
LevelDecoderInner::Rle(reader) => {
Ok(read_from_buffer + reader.get_batch(&mut out[range])?)
}
LevelDecoderInner::Rle(reader) => reader.get_batch(&mut out[range]),
}
}
}
Expand All @@ -323,41 +368,153 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
) -> Result<(usize, usize)> {
let mut level_skip = 0;
let mut value_skip = 0;
match self.decoder.as_mut().unwrap() {
LevelDecoderInner::Packed(reader, bit_width) => {
for _ in 0..num_levels {
// Values are delimited by max_def_level
if max_def_level
== reader
.get_value::<i16>(*bit_width as usize)
.expect("Not enough values in Packed ColumnLevelDecoderImpl.")
{
value_skip += 1;
}
level_skip += 1;
}
}
LevelDecoderInner::Rle(reader) => {
for _ in 0..num_levels {
if let Some(level) = reader
.get::<i16>()
.expect("Not enough values in Rle ColumnLevelDecoderImpl.")
{
// Values are delimited by max_def_level
if level == max_def_level {
value_skip += 1;
}
}
level_skip += 1;
while level_skip < num_levels {
let remaining_levels = num_levels - level_skip;

if self.buffer.is_empty() {
// Only read number of needed values
self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
if self.buffer.is_empty() {
// Reached end of page
break;
}
}
let to_read = self.buffer.len().min(remaining_levels);

level_skip += to_read;
value_skip += self.buffer[..to_read]
.iter()
.filter(|x| **x == max_def_level)
.count();

self.split_off_buffer(to_read)
}

Ok((value_skip, level_skip))
}
}

impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
fn skip_rep_levels(&mut self, _num_records: usize) -> Result<(usize, usize)> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
/// Skips up to `num_records` whole records, returning
/// `(records_skipped, levels_skipped)`
///
/// A repetition level of 0 marks the start of a new record; skipping
/// always stops on a record boundary (or at the end of the page)
fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)> {
    let mut level_skip = 0;
    let mut record_skip = 0;

    loop {
        if self.buffer.is_empty() {
            // Read SKIP_BUFFER_SIZE as we don't know how many to read
            self.read_to_buffer(SKIP_BUFFER_SIZE)?;
            if self.buffer.is_empty() {
                // Reached end of page
                break;
            }
        }

        // Count record starts (level == 0) until enough records are skipped
        // or the buffer is exhausted
        let mut to_skip = 0;
        while to_skip < self.buffer.len() && record_skip != num_records {
            if self.buffer[to_skip] == 0 {
                record_skip += 1;
            }
            to_skip += 1;
        }

        // Find end of record
        while to_skip < self.buffer.len() && self.buffer[to_skip] != 0 {
            to_skip += 1;
        }

        level_skip += to_skip;
        if to_skip >= self.buffer.len() {
            // Consumed the whole buffer without reaching the next record
            // boundary - need to read more values
            self.buffer.clear();
            continue;
        }

        // Retain the unconsumed tail for subsequent reads
        self.split_off_buffer(to_skip);
        break;
    }

    Ok((record_skip, level_skip))
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::encodings::rle::RleEncoder;
use rand::prelude::*;

/// Decodes `encoded` from `data`, randomly alternating between reading
/// batches and invoking the supplied `skip` closure, then verifies the
/// values read were unaffected by the interleaved skips
fn test_skip_levels<F>(encoded: &[i16], data: ByteBufferPtr, skip: F)
where
    F: Fn(&mut ColumnLevelDecoderImpl, &mut usize, usize),
{
    let mut rng = thread_rng();
    let mut decoder = ColumnLevelDecoderImpl::new(5);
    decoder.set_data(Encoding::RLE, data);

    // Number of levels consumed from `encoded` so far
    let mut position = 0;
    // Levels actually returned by `read`, and what they should have been
    let mut actual = vec![];
    let mut expected = vec![];
    while position < encoded.len() {
        // Batch size in 1..=min(remaining, 100)
        let remaining = encoded.len() - position;
        let to_read = rng.gen_range(0..remaining.min(100)) + 1;

        if rng.gen_bool(0.5) {
            // The closure performs the skip, validates it, and advances `position`
            skip(&mut decoder, &mut position, to_read)
        } else {
            let start = actual.len();
            actual.resize(start + to_read, 0);
            let read = decoder.read(&mut actual, start..start + to_read).unwrap();
            assert_eq!(read, to_read);
            expected.extend_from_slice(&encoded[position..position + to_read]);
            position += to_read;
        }
    }
    assert_eq!(actual, expected);
}

/// Round-trips random levels through the RLE encoder and fuzzes
/// interleaved `read` / skip calls against the raw encoded data
#[test]
fn test_skip() {
    let mut rng = thread_rng();
    let total_len = 10000;
    // Use 0..=5 so the maximum definition level (5) actually occurs in the
    // data; with an exclusive `0..5` range `skip_def_levels` could never
    // report a non-zero value count, leaving that assertion vacuous.
    // Level 5 still fits in the 3-bit width used below.
    let encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..=5)).collect();
    let mut encoder = RleEncoder::new(3, 1024);
    for v in &encoded {
        encoder.put(*v as _)
    }
    let data = ByteBufferPtr::new(encoder.consume());

    for _ in 0..10 {
        // Definition levels: skipping must report how many of the skipped
        // levels correspond to actual values (level == max_def_level)
        test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
            let (values_skipped, levels_skipped) =
                decoder.skip_def_levels(to_read, 5).unwrap();
            assert_eq!(levels_skipped, to_read);

            let expected = &encoded[*read..*read + to_read];
            let expected_values_skipped =
                expected.iter().filter(|x| **x == 5).count();
            assert_eq!(values_skipped, expected_values_skipped);
            *read += to_read;
        });

        // Repetition levels: a level of 0 starts a new record, and skipping
        // must consume whole records
        test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
            let (records_skipped, levels_skipped) =
                decoder.skip_rep_levels(to_read).unwrap();

            // If we have not run out of values
            if levels_skipped + *read != encoded.len() {
                // Should have skipped the requested number of records
                assert_eq!(records_skipped, to_read);
                // Next unread value should be the start of a record
                assert_eq!(encoded[levels_skipped + *read], 0);
            }

            let expected = &encoded[*read..*read + levels_skipped];
            let expected_records_skipped =
                expected.iter().filter(|x| **x == 0).count();
            assert_eq!(records_skipped, expected_records_skipped);

            *read += levels_skipped;
        });
    }
}
}
1 change: 1 addition & 0 deletions parquet/src/encodings/rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ impl RleDecoder {
// These functions inline badly, they tend to inline and then create very large loop unrolls
// that damage L1d-cache occupancy. This results in a ~18% performance drop
#[inline(never)]
#[allow(unused)]
pub fn get<T: FromBytes>(&mut self) -> Result<Option<T>> {
assert!(size_of::<T>() <= 8);

Expand Down

0 comments on commit e2c4199

Please sign in to comment.