diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 2d4e57414ee6..7f0a0dd2a57c 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -17,6 +17,7 @@ use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; use crate::arrow::buffer::view_buffer::ViewBuffer; +use crate::arrow::decoder::DictIndexDecoder; use crate::arrow::record_reader::GenericRecordReader; use crate::arrow::schema::parquet_to_arrow_field; use crate::basic::{ConvertedType, Encoding}; @@ -25,6 +26,7 @@ use crate::column::reader::decoder::ColumnValueDecoder; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use arrow_array::ArrayRef; +use arrow_data::ByteView; use arrow_schema::DataType as ArrowType; use bytes::Bytes; use std::any::Any; @@ -210,6 +212,7 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { /// A generic decoder from uncompressed parquet value data to [`ViewBuffer`] pub enum ByteViewArrayDecoder { Plain(ByteViewArrayDecoderPlain), + Dictionary(ByteViewArrayDecoderDictionary), } impl ByteViewArrayDecoder { @@ -227,10 +230,14 @@ impl ByteViewArrayDecoder { num_values, validate_utf8, )), - Encoding::RLE_DICTIONARY - | Encoding::PLAIN_DICTIONARY - | Encoding::DELTA_LENGTH_BYTE_ARRAY - | Encoding::DELTA_BYTE_ARRAY => unimplemented!("stay tuned!"), + Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { + ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new( + data, num_levels, num_values, + )) + } + Encoding::DELTA_LENGTH_BYTE_ARRAY | Encoding::DELTA_BYTE_ARRAY => { + unimplemented!("stay tuned!") + } _ => { return Err(general_err!( "unsupported encoding for byte array: {}", @@ -247,17 +254,27 @@ impl ByteViewArrayDecoder { &mut self, out: &mut ViewBuffer, len: usize, - _dict: Option<&ViewBuffer>, + dict: Option<&ViewBuffer>, ) -> Result { match self { ByteViewArrayDecoder::Plain(d) => d.read(out, len), + ByteViewArrayDecoder::Dictionary(d) => { + let dict = dict + .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?; + d.read(out, dict, len) + } } } /// Skip `len` values - pub fn skip(&mut self, len: usize, _dict: Option<&ViewBuffer>) -> Result { + pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result { match self { ByteViewArrayDecoder::Plain(d) => d.skip(len), + ByteViewArrayDecoder::Dictionary(d) => { + let dict = dict + .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?; + d.skip(dict, len) + } } } } @@ -348,6 +365,90 @@ impl ByteViewArrayDecoderPlain { } } +pub struct ByteViewArrayDecoderDictionary { + decoder: DictIndexDecoder, +} + +impl ByteViewArrayDecoderDictionary { + fn new(data: Bytes, num_levels: usize, num_values: Option) -> Self { + Self { + decoder: DictIndexDecoder::new(data, num_levels, num_values), + } + } + + /// Reads the next indexes from self.decoder + /// the indexes are assumed to be indexes into `dict` + /// the output values are written to output + /// + /// Assumptions / Optimization + /// This function checks if dict.buffers() are the last buffers in `output`, and if so + /// reuses the dictionary page buffers directly without copying data + fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result { + if dict.is_empty() || len == 0 { + return Ok(0); + } + + // Check if the last few buffer of `output`` are the same as the `dict` buffer + // This is to avoid creating a new buffers if the same dictionary is used for multiple `read` + let need_to_create_new_buffer = { + if output.buffers.len() >= dict.buffers.len() { + let offset = output.buffers.len() - dict.buffers.len(); + output.buffers[offset..] + .iter() + .zip(dict.buffers.iter()) + .any(|(a, b)| !a.ptr_eq(b)) + } else { + true + } + }; + + if need_to_create_new_buffer { + for b in dict.buffers.iter() { + output.buffers.push(b.clone()); + } + } + + // Calculate the offset of the dictionary buffers in the output buffers + // For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers, + // then the base_buffer_idx is 5 - 2 = 3 + let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32; + + self.decoder.read(len, |keys| { + for k in keys { + let view = dict + .views + .get(*k as usize) + .ok_or_else(|| general_err!("invalid key={} for dictionary", *k))?; + let len = *view as u32; + if len <= 12 { + // directly append the view if it is inlined + // Safety: the view is from the dictionary, so it is valid + unsafe { + output.append_raw_view_unchecked(view); + } + } else { + // correct the buffer index and append the view + let mut view = ByteView::from(*view); + view.buffer_index += base_buffer_idx; + // Safety: the view is from the dictionary, + // we corrected the index value to point it to output buffer, so it is valid + unsafe { + output.append_raw_view_unchecked(&view.into()); + } + } + } + Ok(()) + }) + } + + fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result { + if dict.is_empty() { + return Ok(0); + } + self.decoder.skip(to_skip) + } +} + /// Check that `val` is a valid UTF-8 sequence pub fn check_valid_utf8(val: &[u8]) -> Result<()> { match std::str::from_utf8(val) { @@ -386,8 +487,11 @@ mod tests { .unwrap(); for (encoding, page) in pages { - if encoding != Encoding::PLAIN { - // skip non-plain encodings for now as they are not yet implemented + if encoding != Encoding::PLAIN + && encoding != Encoding::RLE_DICTIONARY + && encoding != Encoding::PLAIN_DICTIONARY + { + // skip unsupported encodings for now as they are not yet implemented continue; } let mut output = ViewBuffer::default(); @@ -399,7 +503,6 @@ mod tests { assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); assert_eq!(output.views.len(), 4); - assert_eq!(output.buffers.len(), 4); let valid = [false, false, true, true, false, true, true, false, false]; let valid_buffer = Buffer::from_iter(valid.iter().cloned()); diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index 01e7c4aad36b..1651aa2d75c9 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -33,6 +33,10 @@ pub struct ViewBuffer { } impl ViewBuffer { + pub fn is_empty(&self) -> bool { + self.views.is_empty() + } + #[allow(unused)] pub fn append_block(&mut self, block: Buffer) -> u32 { let block_id = self.buffers.len() as u32; @@ -56,6 +60,15 @@ impl ViewBuffer { self.views.push(view); } + /// Directly append a view to the view array. + /// This is used when we create a StringViewArray from a dictionary whose values are StringViewArray. + /// + /// # Safety + /// The `view` must be a valid view as per the ByteView spec. + pub unsafe fn append_raw_view_unchecked(&mut self, view: &u128) { + self.views.push(*view); + } + /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` #[allow(unused)] pub fn into_array(self, null_buffer: Option, data_type: &ArrowType) -> ArrayRef {