diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 7f68b07eb487..eea271306e25 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -2421,4 +2421,32 @@ mod tests {
         let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
         assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
     }
+
+    #[test]
+    #[cfg(feature = "snap")]
+    fn test_read_nested_lists() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/nested_lists.snappy.parquet", testdata);
+        let file = File::open(&path).unwrap();
+
+        let f = file.try_clone().unwrap();
+        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
+        let expected = reader.next().unwrap().unwrap();
+        assert_eq!(expected.num_rows(), 3);
+
+        let selection = RowSelection::from(vec![
+            RowSelector::skip(1),
+            RowSelector::select(1),
+            RowSelector::skip(1),
+        ]);
+        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
+            .unwrap()
+            .with_row_selection(selection)
+            .build()
+            .unwrap();
+
+        let actual = reader.next().unwrap().unwrap();
+        assert_eq!(actual.num_rows(), 1);
+        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
+    }
 }
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index b95b24a21c4b..da7fa78fe485 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -264,9 +264,13 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
     }
 }
 
+const SKIP_BUFFER_SIZE: usize = 1024;
+
 /// An implementation of [`ColumnLevelDecoder`] for `[i16]`
 pub struct ColumnLevelDecoderImpl {
     decoder: Option<LevelDecoderInner>,
+    /// Temporary buffer populated when skipping values
+    buffer: Vec<i16>,
     bit_width: u8,
 }
 
@@ -275,9 +279,36 @@ impl ColumnLevelDecoderImpl {
         let bit_width = num_required_bits(max_level as u64);
         Self {
             decoder: None,
+            buffer: vec![],
             bit_width,
         }
     }
+
+    /// Drops the first `len` values from the internal buffer
+    fn split_off_buffer(&mut self, len: usize) {
+        match self.buffer.len() == len {
+            true => self.buffer.clear(),
+            false => {
+                // Move to_read elements to end of slice
+                self.buffer.rotate_left(len);
+                // Truncate buffer
+                self.buffer.truncate(self.buffer.len() - len);
+            }
+        }
+    }
+
+    /// Reads up to `to_read` values to the internal buffer
+    fn read_to_buffer(&mut self, to_read: usize) -> Result<()> {
+        let mut buf = std::mem::take(&mut self.buffer);
+
+        // Repopulate buffer
+        buf.resize(to_read, 0);
+        let actual = self.read(&mut buf, 0..to_read)?;
+        buf.truncate(actual);
+
+        self.buffer = buf;
+        Ok(())
+    }
 }
 
 enum LevelDecoderInner {
@@ -289,6 +320,7 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
     type Slice = [i16];
 
     fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+        self.buffer.clear();
         match encoding {
             Encoding::RLE => {
                 let mut decoder = RleDecoder::new(self.bit_width);
@@ -305,12 +337,25 @@
         }
     }
 
-    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+    fn read(&mut self, out: &mut Self::Slice, mut range: Range<usize>) -> Result<usize> {
+        let read_from_buffer = match self.buffer.is_empty() {
+            true => 0,
+            false => {
+                let read_from_buffer = self.buffer.len().min(range.end - range.start);
+                out[range.start..range.start + read_from_buffer]
+                    .copy_from_slice(&self.buffer[0..read_from_buffer]);
+                self.split_off_buffer(read_from_buffer);
+                read_from_buffer
+            }
+        };
+        range.start += read_from_buffer;
+
         match self.decoder.as_mut().unwrap() {
-            LevelDecoderInner::Packed(reader, bit_width) => {
-                Ok(reader.get_batch::<i16>(&mut out[range], *bit_width as usize))
+            LevelDecoderInner::Packed(reader, bit_width) => Ok(read_from_buffer
+                + reader.get_batch::<i16>(&mut out[range], *bit_width as usize)),
+            LevelDecoderInner::Rle(reader) => {
+                Ok(read_from_buffer + reader.get_batch(&mut out[range])?)
             }
-            LevelDecoderInner::Rle(reader) => reader.get_batch(&mut out[range]),
         }
     }
 }
@@ -323,41 +368,153 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
     ) -> Result<(usize, usize)> {
         let mut level_skip = 0;
         let mut value_skip = 0;
-        match self.decoder.as_mut().unwrap() {
-            LevelDecoderInner::Packed(reader, bit_width) => {
-                for _ in 0..num_levels {
-                    // Values are delimited by max_def_level
-                    if max_def_level
-                        == reader
-                            .get_value::<i16>(*bit_width as usize)
-                            .expect("Not enough values in Packed ColumnLevelDecoderImpl.")
-                    {
-                        value_skip += 1;
-                    }
-                    level_skip += 1;
-                }
-            }
-            LevelDecoderInner::Rle(reader) => {
-                for _ in 0..num_levels {
-                    if let Some(level) = reader
-                        .get::<i16>()
-                        .expect("Not enough values in Rle ColumnLevelDecoderImpl.")
-                    {
-                        // Values are delimited by max_def_level
-                        if level == max_def_level {
-                            value_skip += 1;
-                        }
-                    }
-                    level_skip += 1;
+        while level_skip < num_levels {
+            let remaining_levels = num_levels - level_skip;
+
+            if self.buffer.is_empty() {
+                // Only read number of needed values
+                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
+                if self.buffer.is_empty() {
+                    // Reached end of page
+                    break;
                 }
             }
+            let to_read = self.buffer.len().min(remaining_levels);
+
+            level_skip += to_read;
+            value_skip += self.buffer[..to_read]
+                .iter()
+                .filter(|x| **x == max_def_level)
+                .count();
+
+            self.split_off_buffer(to_read)
         }
+
         Ok((value_skip, level_skip))
     }
 }
 
 impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
-    fn skip_rep_levels(&mut self, _num_records: usize) -> Result<(usize, usize)> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)> {
+        let mut level_skip = 0;
+        let mut record_skip = 0;
+
+        loop {
+            if self.buffer.is_empty() {
+                // Read SKIP_BUFFER_SIZE as we don't know how many to read
+                self.read_to_buffer(SKIP_BUFFER_SIZE)?;
+                if self.buffer.is_empty() {
+                    // Reached end of page
+                    break;
+                }
+            }
+
+            let mut to_skip = 0;
+            while to_skip < self.buffer.len() && record_skip != num_records {
+                if self.buffer[to_skip] == 0 {
+                    record_skip += 1;
+                }
+                to_skip += 1;
+            }
+
+            // Find end of record
+            while to_skip < self.buffer.len() && self.buffer[to_skip] != 0 {
+                to_skip += 1;
+            }
+
+            level_skip += to_skip;
+            if to_skip >= self.buffer.len() {
+                // Need to read more values
+                self.buffer.clear();
+                continue;
+            }
+
+            self.split_off_buffer(to_skip);
+            break;
+        }
+
+        Ok((record_skip, level_skip))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::encodings::rle::RleEncoder;
+    use rand::prelude::*;
+
+    fn test_skip_levels<F>(encoded: &[i16], data: ByteBufferPtr, skip: F)
+    where
+        F: Fn(&mut ColumnLevelDecoderImpl, &mut usize, usize),
+    {
+        let mut rng = thread_rng();
+        let mut decoder = ColumnLevelDecoderImpl::new(5);
+        decoder.set_data(Encoding::RLE, data);
+
+        let mut read = 0;
+        let mut decoded = vec![];
+        let mut expected = vec![];
+        while read < encoded.len() {
+            let to_read = rng.gen_range(0..(encoded.len() - read).min(100)) + 1;
+
+            if rng.gen_bool(0.5) {
+                skip(&mut decoder, &mut read, to_read)
+            } else {
+                let start = decoded.len();
+                let end = decoded.len() + to_read;
+                decoded.resize(end, 0);
+                let actual_read = decoder.read(&mut decoded, start..end).unwrap();
+                assert_eq!(actual_read, to_read);
+                expected.extend_from_slice(&encoded[read..read + to_read]);
+                read += to_read;
+            }
+        }
+        assert_eq!(decoded, expected);
+    }
+
+    #[test]
+    fn test_skip() {
+        let mut rng = thread_rng();
+        let total_len = 10000;
+        let encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..5)).collect();
+        let mut encoder = RleEncoder::new(3, 1024);
+        for v in &encoded {
+            encoder.put(*v as _)
+        }
+        let data = ByteBufferPtr::new(encoder.consume());
+
+        for _ in 0..10 {
+            test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
+                let (values_skipped, levels_skipped) =
+                    decoder.skip_def_levels(to_read, 5).unwrap();
+                assert_eq!(levels_skipped, to_read);
+
+                let expected = &encoded[*read..*read + to_read];
+                let expected_values_skipped =
+                    expected.iter().filter(|x| **x == 5).count();
+                assert_eq!(values_skipped, expected_values_skipped);
+                *read += to_read;
+            });
+
+            test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
+                let (records_skipped, levels_skipped) =
+                    decoder.skip_rep_levels(to_read).unwrap();
+
+                // If not run out of values
+                if levels_skipped + *read != encoded.len() {
+                    // Should have read correct number of records
+                    assert_eq!(records_skipped, to_read);
+                    // Next value should be start of record
+                    assert_eq!(encoded[levels_skipped + *read], 0);
+                }
+
+                let expected = &encoded[*read..*read + levels_skipped];
+                let expected_records_skipped =
+                    expected.iter().filter(|x| **x == 0).count();
+                assert_eq!(records_skipped, expected_records_skipped);
+
+                *read += levels_skipped;
+            });
+        }
+    }
+}
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index 9475275cb625..b0ae5af07d7f 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -338,6 +338,7 @@ impl RleDecoder {
     // These functions inline badly, they tend to inline and then create very large loop unrolls
     // that damage L1d-cache occupancy. This results in a ~18% performance drop
     #[inline(never)]
+    #[allow(unused)]
     pub fn get<T: FromBytes>(&mut self) -> Result<Option<T>> {
         assert!(size_of::<T>() <= 8);