diff --git a/rust/parquet/src/arrow/arrow_array_reader.rs b/rust/parquet/src/arrow/arrow_array_reader.rs index bbe5224e8b7a4..369d40f4686e1 100644 --- a/rust/parquet/src/arrow/arrow_array_reader.rs +++ b/rust/parquet/src/arrow/arrow_array_reader.rs @@ -1,6 +1,6 @@ use std::{any::Any, collections::VecDeque}; use std::{rc::Rc, cell::RefCell}; -use arrow::{array::{ArrayRef, Int16Array}, buffer::{MutableBuffer, Buffer}, datatypes::{DataType as ArrowType}}; +use arrow::{array::{ArrayRef, Int16Array}, buffer::MutableBuffer, datatypes::{DataType as ArrowType}}; use crate::{column::page::{Page, PageIterator}, memory::{ByteBufferPtr, BufferPtr}, schema::types::{ColumnDescPtr, ColumnDescriptor}}; use crate::arrow::schema::parquet_to_arrow_field; use crate::errors::{ParquetError, Result}; @@ -245,21 +245,23 @@ impl Splittable for Result> { } } -impl Splittable for Result<(usize, ByteBufferPtr)> { - type BufferType = Result<(usize, ByteBufferPtr)>; - type OutputType = Result<(usize, ByteBufferPtr)>; +use crate::encodings::decoding::ValueByteChunk; + +impl Splittable for Result { + type BufferType = Result; + type OutputType = Result; #[inline] fn len(&self) -> usize { match self { - Ok(x) => x.0, + Ok(x) => x.value_count, _ => 0 } } #[inline] fn split(self, len: usize) -> (Self::BufferType, Self::BufferType) { - let (value_count, mut byte_buffer) = if let Ok(item) = self { + let mut value_byte_chunk = if let Ok(item) = self { item } else { @@ -267,23 +269,41 @@ impl Splittable for Result<(usize, ByteBufferPtr)> { // so it should always be fully consumed and never split return (self.clone(), self); }; - let value_size = byte_buffer.len() / value_count; - (Ok((len, byte_buffer.split_to( len * value_size))), Ok((value_count - len, byte_buffer))) + let value_bit_len = value_byte_chunk.value_bit_len; + let bit_len = len * value_bit_len; + // TODO: use bit_offset when splitting bit / bool values + assert!(bit_len % 8 == 0, "value byte buffer can only be split into whole bytes"); + let byte_len = bit_len / 8; + + let split_value_chunk = ValueByteChunk::new( + value_byte_chunk.data.split_to(byte_len), + len, + value_bit_len, + ); + value_byte_chunk.value_count -= len; + (Ok(split_value_chunk), Ok(value_byte_chunk)) } } -struct ArrowArrayReader<'a> { +type LevelBufferPtr = BufferPtr; + +trait ArrayConverter { + fn convert_value_chunks(&self, value_byte_chunks: Vec) -> Result; +} + +struct ArrowArrayReader<'a, C: ArrayConverter + 'a> { column_desc: ColumnDescPtr, data_type: ArrowType, - def_level_iter_factory: SplittableBatchingIteratorFactory<'a, Result>>, - rep_level_iter_factory: SplittableBatchingIteratorFactory<'a, Result>>, - value_iter_factory: SplittableBatchingIteratorFactory<'a, Result<(usize, ByteBufferPtr)>>, + def_level_iter_factory: SplittableBatchingIteratorFactory<'a, Result>, + rep_level_iter_factory: SplittableBatchingIteratorFactory<'a, Result>, + value_iter_factory: SplittableBatchingIteratorFactory<'a, Result>, last_def_levels: Option, last_rep_levels: Option, + array_converter: C, } -impl<'a> ArrowArrayReader<'a> { - fn try_new(column_chunk_iterator: P, column_desc: ColumnDescPtr, arrow_type: Option) -> Result { +impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> { + fn try_new(column_chunk_iterator: P, column_desc: ColumnDescPtr, array_converter: C, arrow_type: Option) -> Result { let data_type = match arrow_type { Some(t) => t, None => parquet_to_arrow_field(column_desc.as_ref())? @@ -332,6 +352,7 @@ impl<'a> ArrowArrayReader<'a> { value_iter_factory: SplittableBatchingIteratorFactory::new(value_iter), last_def_levels: None, last_rep_levels: None, + array_converter, }) } @@ -345,14 +366,14 @@ impl<'a> ArrowArrayReader<'a> { column_desc.max_rep_level() > 0 } - fn map_page_error(err: ParquetError) -> (Box>>, Box>>>, Box>>>) + fn map_page_error(err: ParquetError) -> (Box>>, Box>>, Box>>) { (Box::new(std::iter::once(Err(err.clone()))), Box::new(std::iter::once(Err(err.clone()))), Box::new(std::iter::once(Err(err)))) } // Split Result into Result<(Iterator, Iterator, Iterator)> // this method could fail, e.g. if the page encoding is not supported - fn map_page(page: Page, column_desc: &ColumnDescriptor) -> Result<(Box>>, Box>>>, Box>>>)> + fn map_page(page: Page, column_desc: &ColumnDescriptor) -> Result<(Box>>, Box>>, Box>>)> { use crate::encodings::levels::LevelDecoder; // process page (V1, V2, Dictionary) @@ -370,7 +391,7 @@ impl<'a> ArrowArrayReader<'a> { } => { let mut offset = 0; // create rep level decoder iterator - let rep_level_iter: Box>>> = if Self::rep_levels_available(&column_desc) { + let rep_level_iter: Box>> = if Self::rep_levels_available(&column_desc) { let rep_levels_byte_len = rep_levels_byte_len as usize; let mut rep_decoder = LevelDecoder::v2(column_desc.max_rep_level()); @@ -387,10 +408,10 @@ impl<'a> ArrowArrayReader<'a> { Box::new(std::iter::once(Err(ParquetError::General(format!("rep levels are not available"))))) }; // create def level decoder iterator - let def_level_iter: Box>>> = if Self::def_levels_available(&column_desc) { + let def_level_iter: Box>> = if Self::def_levels_available(&column_desc) { let def_levels_byte_len = def_levels_byte_len as usize; let mut def_decoder = - LevelDecoder::v2(column_desc.max_rep_level()); + LevelDecoder::v2(column_desc.max_def_level()); def_decoder.set_data_range( num_values as usize, &buf, @@ -405,10 +426,12 @@ impl<'a> ArrowArrayReader<'a> { }; // create value decoder iterator - let value_iter = vec![Ok((0, ByteBufferPtr::new(Vec::::new())))]; - + let values_buffer = buf.start_from(offset); + let value_iter = Self::get_values_decoder_iter( + values_buffer, num_values as usize, encoding, column_desc + )?; Ok(( - Box::new(value_iter.into_iter()), + value_iter, def_level_iter, rep_level_iter )) @@ -419,35 +442,48 @@ impl<'a> ArrowArrayReader<'a> { } } - fn convert_value_buffers(value_buffers: Vec<(usize, ByteBufferPtr)>) -> Result { - use arrow::datatypes::ArrowNativeType; - let data_len = value_buffers.len(); - let offset_size = std::mem::size_of::(); - let mut offsets = MutableBuffer::new((data_len + 1) * offset_size); - let values_byte_len = value_buffers.iter().map(|(_len, bytes)| bytes.len()).sum(); - let mut values = MutableBuffer::new(values_byte_len); - - let mut length_so_far = i32::default(); - offsets.push(length_so_far); + fn get_values_decoder_iter(values_buffer: ByteBufferPtr, num_values: usize, mut encoding: crate::basic::Encoding, column_desc: &ColumnDescriptor) -> Result>>> { + use crate::basic::Encoding; + if encoding == Encoding::PLAIN_DICTIONARY { + encoding = Encoding::RLE_DICTIONARY; + } - for (value_len, value_bytes) in value_buffers { - debug_assert_eq!( - value_len, 1, - "offset length value buffers can only contain bytes for a single value" - ); - length_so_far += ::from_usize(value_bytes.len()).unwrap(); - offsets.push(length_so_far); - values.extend_from_slice(value_bytes.data()); + match encoding { + Encoding::PLAIN => Self::get_plain_decoder_iterator(values_buffer, num_values, column_desc), + // Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { + // return Err(general_err!( + // "Cannot initialize this encoding through this function" + // )); + // } + // Encoding::RLE => Box::new(RleValueDecoder::new()), + // Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()), + // Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()), + // Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()), + e => return Err(nyi_err!("Encoding {} is not supported", e)), } - let array_data = arrow::array::ArrayData::builder(ArrowType::Utf8) - .len(data_len) - .add_buffer(offsets.into()) - .add_buffer(values.into()) - .build(); - Ok(array_data) } - fn build_level_array(level_buffers: Vec>) -> Int16Array { + fn get_plain_decoder_iterator(values_buffer: ByteBufferPtr, num_values: usize, column_desc: &ColumnDescriptor) -> Result>>> { + use crate::encodings::decoding::{FixedLenPlainDecoder, VariableLenPlainDecoder}; + use crate::basic::Type as PhysicalType; + + // parquet only supports a limited number of physical types + // later converters cast to a more specific arrow / logical type if necessary + let value_bit_len: usize = match column_desc.physical_type() { + PhysicalType::BOOLEAN => 1, + PhysicalType::INT32 | PhysicalType::FLOAT => 32, + PhysicalType::INT64 | PhysicalType::DOUBLE => 64, + PhysicalType::INT96 => 96, + PhysicalType::BYTE_ARRAY => { + return Ok(Box::new(VariableLenPlainDecoder::new(values_buffer, num_values))); + }, + PhysicalType::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize * 8, + }; + + Ok(Box::new(FixedLenPlainDecoder::new(values_buffer, num_values, value_bit_len))) + } + + fn build_level_array(level_buffers: Vec) -> Int16Array { let value_count = level_buffers.iter().map(|levels| levels.len()).sum(); let values_byte_len = value_count * std::mem::size_of::(); let mut value_buffer = MutableBuffer::new(values_byte_len); @@ -462,7 +498,7 @@ impl<'a> ArrowArrayReader<'a> { } } -impl ArrayReader for ArrowArrayReader<'static> { +impl ArrayReader for ArrowArrayReader<'static, C> { fn as_any(&self) -> &dyn Any { self } @@ -475,7 +511,7 @@ impl ArrayReader for ArrowArrayReader<'static> { if Self::rep_levels_available(&self.column_desc) { // read rep levels if available let rep_level_iter = self.rep_level_iter_factory.get_batch_iter(batch_size); - let rep_level_buffers: Vec> = rep_level_iter.collect::>()?; + let rep_level_buffers: Vec = rep_level_iter.collect::>()?; let rep_level_array = Self::build_level_array(rep_level_buffers); self.last_rep_levels = Some(rep_level_array); } @@ -489,7 +525,7 @@ impl ArrayReader for ArrowArrayReader<'static> { // if def levels are available - they determine how many values will be read let def_level_iter = self.def_level_iter_factory.get_batch_iter(batch_size); // decode def levels, return first error if any - let def_level_buffers: Vec> = def_level_iter.collect::>()?; + let def_level_buffers: Vec = def_level_iter.collect::>()?; let def_level_array = Self::build_level_array(def_level_buffers); let def_level_count = def_level_array.len(); // use eq_scalar to efficiently build null bitmap array from def levels @@ -505,9 +541,9 @@ impl ArrayReader for ArrowArrayReader<'static> { // collect and unwrap values into Vec, return first error if any // this will separate reading and decoding values from creating an arrow array // extra memory is allocated for the Vec, but the values still point to the page buffer - let values: Vec<(usize, ByteBufferPtr)> = value_iter.collect::>()?; + let values: Vec = value_iter.collect::>()?; // converter only creates a no-null / all value array data - let mut value_array_data = Self::convert_value_buffers(values)?; + let mut value_array_data = self.array_converter.convert_value_chunks(values)?; if let Some(null_bitmap_array) = null_bitmap_array { // Only if def levels are available - insert null values efficiently using MutableArrayData. @@ -552,6 +588,45 @@ impl ArrayReader for ArrowArrayReader<'static> { } } +struct StringArrayConverter {} + +impl StringArrayConverter { + fn new() -> Self { + Self {} + } +} + +impl ArrayConverter for StringArrayConverter { + fn convert_value_chunks(&self, value_byte_chunks: Vec) -> Result { + use arrow::datatypes::ArrowNativeType; + let data_len = value_byte_chunks.len(); + let offset_size = std::mem::size_of::(); + let mut offsets = MutableBuffer::new((data_len + 1) * offset_size); + let values_byte_len = value_byte_chunks.iter().map(|x| x.data.len()).sum(); + let mut values = MutableBuffer::new(values_byte_len); + + let mut length_so_far = i32::default(); + offsets.push(length_so_far); + + for value_chunk in value_byte_chunks { + debug_assert_eq!( + value_chunk.value_count, 1, + "offset length value buffers can only contain bytes for a single value" + ); + let value_bytes = value_chunk.data; + length_so_far += ::from_usize(value_bytes.len()).unwrap(); + offsets.push(length_so_far); + values.extend_from_slice(value_bytes.data()); + } + let array_data = arrow::array::ArrayData::builder(ArrowType::Utf8) + .len(data_len) + .add_buffer(offsets.into()) + .add_buffer(values.into()) + .build(); + Ok(array_data) + } +} + #[cfg(test)] mod tests { use super::*; @@ -631,18 +706,10 @@ mod tests { } let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages); - - // let converter = Utf8Converter::new(Utf8ArrayConverter {}); - // let mut array_reader = - // ComplexObjectArrayReader::::new( - // Box::new(page_iterator), - // column_desc, - // converter, - // None, - // ) - // .unwrap(); - - let mut array_reader = ArrowArrayReader::try_new(page_iterator, column_desc, None).unwrap(); + let converter = StringArrayConverter::new(); + let mut array_reader = ArrowArrayReader::try_new( + page_iterator, column_desc, converter, None + ).unwrap(); let mut accu_len: usize = 0; diff --git a/rust/parquet/src/encodings/decoding.rs b/rust/parquet/src/encodings/decoding.rs index b73ebf0285c6a..01f2e2d17542d 100644 --- a/rust/parquet/src/encodings/decoding.rs +++ b/rust/parquet/src/encodings/decoding.rs @@ -146,6 +146,115 @@ pub struct PlainDecoderDetails { pub(crate) bit_reader: Option, } +#[derive(Clone, Debug)] +pub(crate) struct ValueByteChunk { + pub data: ByteBufferPtr, + pub value_count: usize, + pub value_bit_len: usize, + pub bit_offset: u8, +} + +impl ValueByteChunk { + pub fn new(data: ByteBufferPtr, value_count: usize, value_bit_len: usize) -> Self { + Self { + data, + value_count, + value_bit_len, + bit_offset: 0, + } + } +} + +pub(crate) struct FixedLenPlainDecoder { + data: ByteBufferPtr, + num_values: usize, + value_bit_len: usize, +} + +impl FixedLenPlainDecoder { + pub(crate) fn new(data: ByteBufferPtr, num_values: usize, value_bit_len: usize) -> Self { + Self { + data, + num_values, + value_bit_len, + } + } +} + +impl Iterator for FixedLenPlainDecoder { + type Item = Result; + + fn next(&mut self) -> Option { + use crate::util::bit_util::ceil; + if self.num_values > 0 { + // calculate number of whole bytes to read + let byte_len = ceil((self.num_values * self.value_bit_len) as i64, 8) as usize; + if byte_len > self.data.len() { + return Some(Err(eof_err!("Not enough bytes to decode"))); + } + self.num_values = 0; + // a single value chunk is returned, containing all values + let value_byte_chunk = ValueByteChunk::new( + self.data.range(0, byte_len), + self.num_values, + self.value_bit_len, + ); + Some(Ok(value_byte_chunk)) + } + else { + None + } + } +} + +pub(crate) struct VariableLenPlainDecoder { + data: ByteBufferPtr, + num_values: usize, + position: usize, +} + +impl VariableLenPlainDecoder { + pub(crate) fn new(data: ByteBufferPtr, num_values: usize) -> Self { + Self { + data, + num_values, + position: 0, + } + } +} + +impl Iterator for VariableLenPlainDecoder { + type Item = Result; + + fn next(&mut self) -> Option { + const LEN_SIZE: usize = std::mem::size_of::(); + let data_len = self.data.len(); + if self.position < data_len && self.num_values > 0 { + let len: usize = + read_num_bytes!(u32, LEN_SIZE, self.data.data()[self.position..]) + as usize; + self.position += LEN_SIZE; + + if data_len < self.position + len { + return Some(Err(eof_err!("Not enough bytes to decode"))); + } + let value_bytes = self.data.range(self.position, len); + self.position += len; + self.num_values -= 1; + // variable length value chunks contain a single value each + let value_byte_chunk = ValueByteChunk::new( + value_bytes, + 1, + 0, + ); + Some(Ok(value_byte_chunk)) + } + else { + None + } + } +} + /// Plain decoding that supports all types. /// Values are encoded back to back. For native types, data is encoded as little endian. /// Floating point types are encoded in IEEE. diff --git a/rust/parquet/src/util/memory.rs b/rust/parquet/src/util/memory.rs index c4e10e144e49d..877f5e3e138fd 100644 --- a/rust/parquet/src/util/memory.rs +++ b/rust/parquet/src/util/memory.rs @@ -292,6 +292,7 @@ impl BufferPtr { } /// Returns slice of data in this buffer. + #[inline] pub fn data(&self) -> &[T] { &self.data[self.start..self.start + self.len] } @@ -319,6 +320,7 @@ impl BufferPtr { } /// Returns length of this buffer + #[inline] pub fn len(&self) -> usize { self.len }