From 684c259a470ef66403fdd5403cd6be58a3e23828 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Sat, 22 Jan 2022 16:08:33 +0100 Subject: [PATCH] DRY parquet module (#785) --- src/io/parquet/read/binary/basic.rs | 284 +++++++----------- src/io/parquet/read/binary/dictionary.rs | 110 ++----- src/io/parquet/read/binary/mod.rs | 24 +- src/io/parquet/read/binary/nested.rs | 28 +- src/io/parquet/read/binary/utils.rs | 69 ++++- src/io/parquet/read/boolean/basic.rs | 44 +-- src/io/parquet/read/fixed_size_binary.rs | 269 ----------------- src/io/parquet/read/fixed_size_binary/mod.rs | 213 +++++++++++++ .../parquet/read/fixed_size_binary/utils.rs | 52 ++++ src/io/parquet/read/primitive/basic.rs | 131 +++----- src/io/parquet/read/primitive/dictionary.rs | 73 +---- src/io/parquet/read/utils.rs | 131 +++++++- 12 files changed, 672 insertions(+), 756 deletions(-) delete mode 100644 src/io/parquet/read/fixed_size_binary.rs create mode 100644 src/io/parquet/read/fixed_size_binary/mod.rs create mode 100644 src/io/parquet/read/fixed_size_binary/utils.rs diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index 37e04de07d6..32c22adb106 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -6,11 +6,35 @@ use parquet2::{ use crate::{ array::Offset, - bitmap::{utils::BitmapIter, MutableBitmap}, + bitmap::MutableBitmap, error::Result, + io::parquet::read::utils::{extend_from_decoder, Pushable}, }; -use super::super::utils; +use super::{super::utils, utils::Binary}; + +#[inline] +fn values_iter<'a>( + indices_buffer: &'a [u8], + dict: &'a BinaryPageDict, + additional: usize, +) -> impl Iterator + 'a { + let dict_values = dict.values(); + let dict_offsets = dict.offsets(); + + // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), + // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). + let bit_width = indices_buffer[0]; + let indices_buffer = &indices_buffer[1..]; + + let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); + indices.map(move |index| { + let index = index as usize; + let dict_offset_i = dict_offsets[index] as usize; + let dict_offset_ip1 = dict_offsets[index + 1] as usize; + &dict_values[dict_offset_i..dict_offset_ip1] + }) +} /// Assumptions: No rep levels #[allow(clippy::too_many_arguments)] @@ -19,61 +43,22 @@ fn read_dict_buffer( indices_buffer: &[u8], additional: usize, dict: &BinaryPageDict, - offsets: &mut Vec, - values: &mut Vec, + values: &mut Binary, validity: &mut MutableBitmap, ) { - let length = (offsets.len() - 1) + additional; - let dict_values = dict.values(); - let dict_offsets = dict.offsets(); - let mut last_offset = *offsets.as_mut_slice().last().unwrap(); + let length = values.len() + additional; - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; + let values_iterator = values_iter(indices_buffer, dict, additional); - let mut indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, length); + let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); - let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); - - for run in validity_iterator { - match run { - hybrid_rle::HybridEncoded::Bitpacked(packed) => { - let remaining = length - (offsets.len() - 1); - let len = std::cmp::min(packed.len() * 8, remaining); - for is_valid in BitmapIter::new(packed, 0, len) { - if is_valid { - let index = indices.next().unwrap() as usize; - let dict_offset_i = dict_offsets[index] as usize; - let dict_offset_ip1 = dict_offsets[index + 1] as usize; - let length = dict_offset_ip1 - dict_offset_i; - last_offset += O::from_usize(length).unwrap(); - values.extend_from_slice(&dict_values[dict_offset_i..dict_offset_ip1]); - }; - offsets.push(last_offset); - } - validity.extend_from_slice(packed, 0, len); - } - hybrid_rle::HybridEncoded::Rle(value, additional) => { - let is_set = value[0] == 1; - validity.extend_constant(additional, is_set); - if is_set { - (0..additional).for_each(|_| { - let index = indices.next().unwrap() as usize; - let dict_offset_i = dict_offsets[index] as usize; - let dict_offset_ip1 = dict_offsets[index + 1] as usize; - let length = dict_offset_ip1 - dict_offset_i; - last_offset += O::from_usize(length).unwrap(); - offsets.push(last_offset); - values.extend_from_slice(&dict_values[dict_offset_i..dict_offset_ip1]); - }) - } else { - offsets.resize(offsets.len() + additional, last_offset); - } - } - } - } + extend_from_decoder( + validity, + &mut validity_iterator, + length, + values, + values_iterator, + ); } #[allow(clippy::too_many_arguments)] @@ -81,80 +66,73 @@ fn read_dict_required( indices_buffer: &[u8], additional: usize, dict: &BinaryPageDict, - offsets: &mut Vec, - values: &mut Vec, + values: &mut Binary, validity: &mut MutableBitmap, ) { - let dict_values = dict.values(); - let dict_offsets = dict.offsets(); - let mut last_offset = *offsets.as_mut_slice().last().unwrap(); + let values_iterator = values_iter(indices_buffer, dict, additional); - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; + for value in values_iterator { + values.push(value); + } + validity.extend_constant(additional, true); +} - let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); +struct Offsets<'a, O: Offset>(pub &'a mut Vec); - for index in indices { - let index = index as usize; - let dict_offset_i = dict_offsets[index] as usize; - let dict_offset_ip1 = dict_offsets[index + 1] as usize; - let length = dict_offset_ip1 - dict_offset_i; - last_offset += O::from_usize(length).unwrap(); - offsets.push(last_offset); - values.extend_from_slice(&dict_values[dict_offset_i..dict_offset_ip1]); +impl<'a, O: Offset> Pushable for Offsets<'a, O> { + #[inline] + fn reserve(&mut self, additional: usize) { + self.0.reserve(additional) + } + + #[inline] + fn push(&mut self, value: O) { + self.0.push(value) + } + + #[inline] + fn push_null(&mut self) { + self.0.push(*self.0.last().unwrap()) + } + + #[inline] + fn extend_constant(&mut self, additional: usize, value: O) { + self.0.extend_constant(additional, value) } - validity.extend_constant(additional, true); } fn read_delta_optional( validity_buffer: &[u8], values_buffer: &[u8], additional: usize, - offsets: &mut Vec, - values: &mut Vec, + values: &mut Binary, validity: &mut MutableBitmap, ) { - let length = (offsets.len() - 1) + additional; - let mut last_offset = *offsets.as_mut_slice().last().unwrap(); + let length = values.len() + additional; + + let Binary { + offsets, + values, + last_offset, + } = values; // values_buffer: first 4 bytes are len, remaining is values let mut values_iterator = delta_length_byte_array::Decoder::new(values_buffer); + let offsets_iterator = values_iterator.by_ref().map(|x| { + *last_offset += O::from_usize(x as usize).unwrap(); + *last_offset + }); - let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); + let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); // offsets: - for run in validity_iterator { - match run { - hybrid_rle::HybridEncoded::Bitpacked(packed) => { - // the pack may contain more items than needed. - let remaining = length - (offsets.len() - 1); - let len = std::cmp::min(packed.len() * 8, remaining); - for is_valid in BitmapIter::new(packed, 0, len) { - if is_valid { - let value = values_iterator.next().unwrap() as usize; - last_offset += O::from_usize(value).unwrap(); - } - offsets.push(last_offset); - } - validity.extend_from_slice(packed, 0, len); - } - hybrid_rle::HybridEncoded::Rle(value, additional) => { - let is_set = value[0] == 1; - validity.extend_constant(additional, is_set); - if is_set { - (0..additional).for_each(|_| { - let value = values_iterator.next().unwrap() as usize; - last_offset += O::from_usize(value).unwrap(); - offsets.push(last_offset); - }) - } else { - offsets.resize(offsets.len() + additional, last_offset); - } - } - } - } + extend_from_decoder( + validity, + &mut validity_iterator, + length, + &mut Offsets::(offsets), + offsets_iterator, + ); // values: let new_values = values_iterator.into_values(); @@ -165,78 +143,46 @@ fn read_plain_optional( validity_buffer: &[u8], values_buffer: &[u8], additional: usize, - offsets: &mut Vec, - values: &mut Vec, + values: &mut Binary, validity: &mut MutableBitmap, ) { - let length = (offsets.len() - 1) + additional; - let mut last_offset = *offsets.as_mut_slice().last().unwrap(); + let length = values.len() + additional; // values_buffer: first 4 bytes are len, remaining is values - let mut values_iterator = utils::BinaryIter::new(values_buffer); + let values_iterator = utils::BinaryIter::new(values_buffer); - let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); + let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); - for run in validity_iterator { - match run { - hybrid_rle::HybridEncoded::Bitpacked(packed) => { - // the pack may contain more items than needed. - let remaining = length - (offsets.len() - 1); - let len = std::cmp::min(packed.len() * 8, remaining); - for is_valid in BitmapIter::new(packed, 0, len) { - if is_valid { - let value = values_iterator.next().unwrap(); - last_offset += O::from_usize(value.len()).unwrap(); - values.extend_from_slice(value); - } - offsets.push(last_offset); - } - validity.extend_from_slice(packed, 0, len); - } - hybrid_rle::HybridEncoded::Rle(value, additional) => { - let is_set = value[0] == 1; - validity.extend_constant(additional, is_set); - if is_set { - (0..additional).for_each(|_| { - let value = values_iterator.next().unwrap(); - last_offset += O::from_usize(value.len()).unwrap(); - offsets.push(last_offset); - values.extend_from_slice(value) - }) - } else { - offsets.resize(offsets.len() + additional, last_offset); - } - } - } - } + extend_from_decoder( + validity, + &mut validity_iterator, + length, + values, + values_iterator, + ) } pub(super) fn read_plain_required( buffer: &[u8], additional: usize, - offsets: &mut Vec, - values: &mut Vec, + values: &mut Binary, ) { - let mut last_offset = *offsets.as_mut_slice().last().unwrap(); - let values_iterator = utils::BinaryIter::new(buffer); // each value occupies 4 bytes + len declared in 4 bytes => reserve accordingly. - values.reserve(buffer.len() - 4 * additional); - let a = values.capacity(); + values.offsets.reserve(additional); + values.values.reserve(buffer.len() - 4 * additional); + let a = values.values.capacity(); for value in values_iterator { - last_offset += O::from_usize(value.len()).unwrap(); - values.extend_from_slice(value); - offsets.push(last_offset); + values.push(value); } - debug_assert_eq!(a, values.capacity()); + debug_assert_eq!(a, values.values.capacity()); } pub(super) fn extend_from_page( page: &DataPage, descriptor: &ColumnDescriptor, - offsets: &mut Vec, - values: &mut Vec, + values: &mut Binary, validity: &mut MutableBitmap, ) -> Result<()> { let additional = page.num_values(); @@ -253,7 +199,6 @@ pub(super) fn extend_from_page( values_buffer, additional, dict.as_any().downcast_ref().unwrap(), - offsets, values, validity, ) @@ -263,29 +208,18 @@ pub(super) fn extend_from_page( values_buffer, additional, dict.as_any().downcast_ref().unwrap(), - offsets, values, validity, ) } - (Encoding::DeltaLengthByteArray, None, true) => read_delta_optional::( - validity_buffer, - values_buffer, - additional, - offsets, - values, - validity, - ), - (Encoding::Plain, _, true) => read_plain_optional::( - validity_buffer, - values_buffer, - additional, - offsets, - values, - validity, - ), + (Encoding::DeltaLengthByteArray, None, true) => { + read_delta_optional::(validity_buffer, values_buffer, additional, values, validity) + } + (Encoding::Plain, _, true) => { + read_plain_optional::(validity_buffer, values_buffer, additional, values, validity) + } (Encoding::Plain, _, false) => { - read_plain_required::(page.buffer(), page.num_values(), offsets, values) + read_plain_required::(page.buffer(), page.num_values(), values) } _ => { return Err(utils::not_implemented( diff --git a/src/io/parquet/read/binary/dictionary.rs b/src/io/parquet/read/binary/dictionary.rs index 434036716a4..d13955f1782 100644 --- a/src/io/parquet/read/binary/dictionary.rs +++ b/src/io/parquet/read/binary/dictionary.rs @@ -1,91 +1,28 @@ use std::sync::Arc; use parquet2::{ - encoding::{hybrid_rle, Encoding}, + encoding::Encoding, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::{BinaryPageDict, DataPage}, FallibleStreamingIterator, }; -use super::super::utils as other_utils; +use super::{super::utils as other_utils, utils::Binary}; use crate::{ array::{ Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, PrimitiveArray, Utf8Array, }, - bitmap::{utils::BitmapIter, MutableBitmap}, + bitmap::MutableBitmap, datatypes::DataType, error::{ArrowError, Result}, + io::parquet::read::utils::read_dict_optional, }; -#[allow(clippy::too_many_arguments)] -fn read_dict_optional( - validity_buffer: &[u8], - indices_buffer: &[u8], - additional: usize, - dict: &BinaryPageDict, - indices: &mut Vec, - offsets: &mut Vec, - values: &mut Vec, - validity: &mut MutableBitmap, -) where - K: DictionaryKey, - O: Offset, -{ - let length = indices.len() + additional; - values.extend_from_slice(dict.values()); - offsets.extend( - dict.offsets() - .iter() - .map(|x| O::from_usize(*x as usize).unwrap()), - ); - - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; - - let mut new_indices = - hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); - - let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); - - for run in validity_iterator { - match run { - hybrid_rle::HybridEncoded::Bitpacked(packed) => { - let remaining = length - indices.len(); - let len = std::cmp::min(packed.len() * 8, remaining); - for is_valid in BitmapIter::new(packed, 0, len) { - let value = if is_valid { - K::from_u32(new_indices.next().unwrap()).unwrap() - } else { - K::default() - }; - indices.push(value); - } - validity.extend_from_slice(packed, 0, len); - } - hybrid_rle::HybridEncoded::Rle(value, additional) => { - let is_set = value[0] == 1; - validity.extend_constant(additional, is_set); - if is_set { - (0..additional).for_each(|_| { - let index = K::from_u32(new_indices.next().unwrap()).unwrap(); - indices.push(index) - }) - } else { - indices.resize(indices.len() + additional, *indices.last().unwrap()); - } - } - } - } -} - fn extend_from_page( page: &DataPage, descriptor: &ColumnDescriptor, indices: &mut Vec, - offsets: &mut Vec, - values: &mut Vec, + values: &mut Binary, validity: &mut MutableBitmap, ) -> Result<()> where @@ -101,14 +38,20 @@ where match (&page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { + let dict = dict.as_any().downcast_ref::().unwrap(); + + values.values.extend_from_slice(dict.values()); + values.offsets.extend( + dict.offsets() + .iter() + .map(|x| O::from_usize(*x as usize).unwrap()), + ); + read_dict_optional( validity_buffer, values_buffer, additional, - dict.as_any().downcast_ref().unwrap(), indices, - offsets, - values, validity, ) } @@ -118,7 +61,7 @@ where is_optional, page.dictionary_page().is_some(), version, - "primitive", + "binary", )) } } @@ -138,38 +81,37 @@ where { let capacity = metadata.num_values() as usize; let mut indices = Vec::::with_capacity(capacity); - let mut values = Vec::::with_capacity(0); - let mut offsets = Vec::::with_capacity(1 + capacity); + let mut values = Binary::::with_capacity(capacity); + values.offsets.clear(); let mut validity = MutableBitmap::with_capacity(capacity); while let Some(page) = iter.next()? { extend_from_page( page, metadata.descriptor(), &mut indices, - &mut offsets, &mut values, &mut validity, )? } - if offsets.is_empty() { + if values.offsets.is_empty() { // the array is empty and thus we need to push the first offset ourselves. - offsets.push(O::zero()); + values.offsets.push(O::zero()); }; let keys = PrimitiveArray::from_data(K::PRIMITIVE.into(), indices.into(), validity.into()); let data_type = DictionaryArray::::get_child(&data_type).clone(); - use crate::datatypes::PhysicalType::*; + use crate::datatypes::PhysicalType; let values = match data_type.to_physical_type() { - Binary | LargeBinary => Arc::new(BinaryArray::from_data( + PhysicalType::Binary | PhysicalType::LargeBinary => Arc::new(BinaryArray::from_data( data_type, - offsets.into(), - values.into(), + values.offsets.into(), + values.values.into(), None, )) as Arc, - Utf8 | LargeUtf8 => Arc::new(Utf8Array::from_data( + PhysicalType::Utf8 | PhysicalType::LargeUtf8 => Arc::new(Utf8Array::from_data( data_type, - offsets.into(), - values.into(), + values.offsets.into(), + values.values.into(), None, )), _ => unreachable!(), diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs index 179c831aa29..a0422f1a92d 100644 --- a/src/io/parquet/read/binary/mod.rs +++ b/src/io/parquet/read/binary/mod.rs @@ -16,6 +16,8 @@ mod utils; pub use dictionary::iter_to_array as iter_to_dict_array; +use self::utils::Binary; + use super::nested_utils::Nested; pub fn iter_to_array( @@ -30,22 +32,14 @@ where I: FallibleStreamingIterator, { let capacity = metadata.num_values() as usize; - let mut values = Vec::::with_capacity(0); - let mut offsets = Vec::::with_capacity(1 + capacity); - offsets.push(O::default()); + let mut values = Binary::::with_capacity(capacity); let mut validity = MutableBitmap::with_capacity(capacity); let is_nullable = nested.pop().unwrap().is_nullable(); if nested.is_empty() { while let Some(page) = iter.next()? { - basic::extend_from_page( - page, - metadata.descriptor(), - &mut offsets, - &mut values, - &mut validity, - )? + basic::extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)? } } else { while let Some(page) = iter.next()? { @@ -54,13 +48,12 @@ where metadata.descriptor(), is_nullable, nested, - &mut offsets, &mut values, &mut validity, )? } } - Ok(utils::finish_array(data_type, offsets, values, validity)) + Ok(utils::finish_array(data_type, values, validity)) } pub async fn stream_to_array( @@ -75,9 +68,7 @@ where I: Stream>, { let capacity = metadata.num_values() as usize; - let mut values = Vec::::with_capacity(0); - let mut offsets = Vec::::with_capacity(1 + capacity); - offsets.push(O::default()); + let mut values = Binary::::with_capacity(capacity); let mut validity = MutableBitmap::with_capacity(capacity); pin_mut!(pages); // needed for iteration @@ -86,11 +77,10 @@ where basic::extend_from_page( page.as_ref().map_err(|x| x.clone())?, metadata.descriptor(), - &mut offsets, &mut values, &mut validity, )? } - Ok(finish_array(data_type.clone(), offsets, values, validity)) + Ok(finish_array(data_type.clone(), values, validity)) } diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index 3cb9d1eeb58..b927e7fa617 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/binary/nested.rs @@ -5,9 +5,9 @@ use parquet2::{ read::levels::get_bit_width, }; -use super::super::nested_utils::*; use super::super::utils; use super::basic::read_plain_required; +use super::{super::nested_utils::*, utils::Binary}; use crate::{array::Offset, bitmap::MutableBitmap, error::Result}; @@ -15,8 +15,7 @@ fn read_values<'a, O, D, G>( def_levels: D, max_def: u32, mut new_values: G, - offsets: &mut Vec, - values: &mut Vec, + values: &mut Binary, validity: &mut MutableBitmap, ) where O: Offset, @@ -26,11 +25,10 @@ fn read_values<'a, O, D, G>( def_levels.for_each(|def| { if def == max_def { let v = new_values.next().unwrap(); - values.extend_from_slice(v); - offsets.push(*offsets.last().unwrap() + O::from_usize(v.len()).unwrap()); + values.push(v); validity.push(true); } else if def == max_def - 1 { - offsets.push(*offsets.last().unwrap()); + values.push(&[]); validity.push(false); } }); @@ -46,8 +44,7 @@ fn read( def_level_encoding: (&Encoding, i16), is_nullable: bool, nested: &mut Vec>, - offsets: &mut Vec, - values: &mut Vec, + values: &mut Binary, validity: &mut MutableBitmap, ) { let max_rep_level = rep_level_encoding.1 as u32; @@ -64,16 +61,9 @@ fn read( additional, ); let new_values = utils::BinaryIter::new(values_buffer); - read_values( - def_levels, - max_def_level, - new_values, - offsets, - values, - validity, - ) + read_values(def_levels, max_def_level, new_values, values, validity) } else { - read_plain_required(values_buffer, additional, offsets, values) + read_plain_required(values_buffer, additional, values) } let def_levels = @@ -97,8 +87,7 @@ pub(super) fn extend_from_page( descriptor: &ColumnDescriptor, is_nullable: bool, nested: &mut Vec>, - offsets: &mut Vec, - values: &mut Vec, + values: &mut Binary, validity: &mut MutableBitmap, ) -> Result<()> { let additional = page.num_values(); @@ -121,7 +110,6 @@ pub(super) fn extend_from_page( ), is_nullable, nested, - offsets, values, validity, ), diff --git a/src/io/parquet/read/binary/utils.rs b/src/io/parquet/read/binary/utils.rs index 4b1cea1f3ab..0554a38a118 100644 --- a/src/io/parquet/read/binary/utils.rs +++ b/src/io/parquet/read/binary/utils.rs @@ -2,27 +2,84 @@ use crate::{ array::{Array, BinaryArray, Offset, Utf8Array}, bitmap::MutableBitmap, datatypes::DataType, + io::parquet::read::utils::Pushable, }; pub(super) fn finish_array( data_type: DataType, - offsets: Vec, - values: Vec, + values: Binary, validity: MutableBitmap, ) -> Box { match data_type { DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data( data_type, - offsets.into(), - values.into(), + values.offsets.into(), + values.values.into(), validity.into(), )), DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data( data_type, - offsets.into(), - values.into(), + values.offsets.into(), + values.values.into(), validity.into(), )), _ => unreachable!(), } } + +/// [`Pushable`] for variable length binary data. +#[derive(Debug)] +pub struct Binary { + pub offsets: Vec, + pub values: Vec, + pub last_offset: O, +} + +impl Binary { + #[inline] + pub fn with_capacity(capacity: usize) -> Self { + let mut offsets = Vec::with_capacity(1 + capacity); + offsets.push(O::default()); + Self { + offsets, + values: vec![], + last_offset: O::default(), + } + } + + #[inline] + pub fn push(&mut self, v: &[u8]) { + self.values.extend(v); + self.last_offset += O::from_usize(v.len()).unwrap(); + self.offsets.push(self.last_offset) + } + + #[inline] + pub fn extend_constant(&mut self, additional: usize) { + self.offsets + .resize(self.offsets.len() + additional, self.last_offset); + } + + #[inline] + pub fn len(&self) -> usize { + self.offsets.len() - 1 + } +} + +impl Pushable<&[u8]> for Binary { + #[inline] + fn reserve(&mut self, additional: usize) { + self.offsets.reserve(additional) + } + + #[inline] + fn push(&mut self, value: &[u8]) { + self.push(value) + } + + #[inline] + fn extend_constant(&mut self, additional: usize, value: &[u8]) { + assert_eq!(value.len(), 0); + self.extend_constant(additional) + } +} diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index e52f42594b1..dc1aeee7b66 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -3,6 +3,7 @@ use crate::{ bitmap::{utils::BitmapIter, MutableBitmap}, datatypes::DataType, error::{ArrowError, Result}, + io::parquet::read::utils::extend_from_decoder, }; use super::super::utils; @@ -26,45 +27,22 @@ fn read_optional( values: &mut MutableBitmap, validity: &mut MutableBitmap, ) { - let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); - // in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap. // note that `values_buffer` contains only non-null values. // thus, at this point, it is not known how many values this buffer contains // values_len is the upper bound. The actual number depends on how many nulls there is. let values_len = values_buffer.len() * 8; - let mut values_iterator = BitmapIter::new(values_buffer, 0, values_len); + let values_iterator = BitmapIter::new(values_buffer, 0, values_len); - for run in validity_iterator { - match run { - hybrid_rle::HybridEncoded::Bitpacked(packed_validity) => { - // the pack may contain more items than needed. - let remaining = length - values.len(); - let len = std::cmp::min(packed_validity.len() * 8, remaining); - for is_valid in BitmapIter::new(packed_validity, 0, len) { - let value = if is_valid { - values_iterator.next().unwrap() - } else { - false - }; - values.push(value); - } - validity.extend_from_slice(packed_validity, 0, len); - } - hybrid_rle::HybridEncoded::Rle(value, additional) => { - let is_set = value[0] == 1; - validity.extend_constant(additional, is_set); - if is_set { - (0..additional).for_each(|_| { - let value = values_iterator.next().unwrap(); - values.push(value) - }) - } else { - values.extend_constant(additional, false) - } - } - } - } + let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); + + extend_from_decoder( + validity, + &mut validity_iterator, + length, + values, + values_iterator, + ) } pub async fn stream_to_array(pages: I, metadata: &ColumnChunkMetaData) -> Result diff --git a/src/io/parquet/read/fixed_size_binary.rs b/src/io/parquet/read/fixed_size_binary.rs deleted file mode 100644 index 39c850e8d49..00000000000 --- a/src/io/parquet/read/fixed_size_binary.rs +++ /dev/null @@ -1,269 +0,0 @@ -use futures::{pin_mut, Stream, StreamExt}; -use parquet2::{ - encoding::{hybrid_rle, Encoding}, - page::{DataPage, FixedLenByteArrayPageDict}, - FallibleStreamingIterator, -}; - -use super::{ColumnChunkMetaData, ColumnDescriptor}; -use crate::{ - array::FixedSizeBinaryArray, - bitmap::{utils::BitmapIter, MutableBitmap}, - datatypes::DataType, - error::{ArrowError, Result}, -}; - -use super::utils; - -/// Assumptions: No rep levels -#[allow(clippy::too_many_arguments)] -pub(crate) fn read_dict_buffer( - validity_buffer: &[u8], - indices_buffer: &[u8], - additional: usize, - size: usize, - dict: &FixedLenByteArrayPageDict, - values: &mut Vec, - validity: &mut MutableBitmap, -) { - let length = values.len() + additional * size; - let dict_values = dict.values(); - - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; - - let mut indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, length); - - let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); - - for run in validity_iterator { - match run { - hybrid_rle::HybridEncoded::Bitpacked(packed) => { - let remaining = (length - values.len()) / size; - let len = std::cmp::min(packed.len() * 8, remaining); - for is_valid in BitmapIter::new(packed, 0, len) { - validity.push(is_valid); - if is_valid { - let index = indices.next().unwrap() as usize; - values.extend_from_slice(&dict_values[index * size..(index + 1) * size]); - } else { - values.resize(values.len() + size, 0); - } - } - } - hybrid_rle::HybridEncoded::Rle(value, additional) => { - let is_set = value[0] == 1; - validity.extend_constant(additional, is_set); - if is_set { - (0..additional).for_each(|_| { - let index = indices.next().unwrap() as usize; - values.extend_from_slice(&dict_values[index * size..(index + 1) * size]); - }) - } else { - values.resize(values.len() + additional * size, 0); - } - } - } - } -} - -/// Assumptions: No rep levels -pub(crate) fn read_dict_required( - indices_buffer: &[u8], - additional: usize, - size: usize, - dict: &FixedLenByteArrayPageDict, - values: &mut Vec, - validity: &mut MutableBitmap, -) { - let dict_values = dict.values(); - - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; - - let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); - - for index in indices { - let index = index as usize; - values.extend_from_slice(&dict_values[index * size..(index + 1) * size]); - } - validity.extend_constant(additional * size, true); -} - -pub(crate) fn read_optional( - validity_buffer: &[u8], - values_buffer: &[u8], - additional: usize, - size: usize, - values: &mut Vec, - validity: &mut MutableBitmap, -) { - let length = values.len() + additional * size; - - assert_eq!(values_buffer.len() % size, 0); - let mut values_iterator = values_buffer.chunks_exact(size); - - let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); - - for run in validity_iterator { - match run { - hybrid_rle::HybridEncoded::Bitpacked(packed) => { - // the pack may contain more items than needed. - let remaining = (length - values.len()) / size; - let len = std::cmp::min(packed.len() * 8, remaining); - for is_valid in BitmapIter::new(packed, 0, len) { - validity.push(is_valid); - if is_valid { - let value = values_iterator.next().unwrap(); - values.extend_from_slice(value); - } else { - values.resize(values.len() + size, 0); - } - } - } - hybrid_rle::HybridEncoded::Rle(value, additional) => { - let is_set = value[0] == 1; - validity.extend_constant(additional, is_set); - if is_set { - (0..additional).for_each(|_| { - let value = values_iterator.next().unwrap(); - values.extend_from_slice(value) - }) - } else { - values.resize(values.len() + additional * size, 0); - } - } - } - } -} - -pub(crate) fn read_required(buffer: &[u8], additional: usize, size: usize, values: &mut Vec) { - assert_eq!(buffer.len(), additional * size); - values.extend_from_slice(buffer); -} - -pub fn iter_to_array( - mut iter: I, - data_type: DataType, - metadata: &ColumnChunkMetaData, -) -> Result -where - ArrowError: From, - I: FallibleStreamingIterator, -{ - let size = FixedSizeBinaryArray::get_size(&data_type); - - let capacity = metadata.num_values() as usize; - let mut values = Vec::::with_capacity(capacity * size); - let mut validity = MutableBitmap::with_capacity(capacity); - while let Some(page) = iter.next()? { - extend_from_page( - page, - size, - metadata.descriptor(), - &mut values, - &mut validity, - )? - } - - Ok(FixedSizeBinaryArray::from_data( - data_type, - values.into(), - validity.into(), - )) -} - -pub async fn stream_to_array( - pages: I, - data_type: DataType, - metadata: &ColumnChunkMetaData, -) -> Result -where - ArrowError: From, - E: Clone, - I: Stream>, -{ - let size = FixedSizeBinaryArray::get_size(&data_type); - - let capacity = metadata.num_values() as usize; - let mut values = Vec::::with_capacity(capacity * size); - let mut validity = MutableBitmap::with_capacity(capacity); - - pin_mut!(pages); // needed for iteration - - while let Some(page) = pages.next().await { - extend_from_page( - page.as_ref().map_err(|x| x.clone())?, - size, - metadata.descriptor(), - &mut values, - &mut validity, - )? - } - - Ok(FixedSizeBinaryArray::from_data( - data_type, - values.into(), - validity.into(), - )) -} - -pub(crate) fn extend_from_page( - page: &DataPage, - size: usize, - descriptor: &ColumnDescriptor, - values: &mut Vec, - validity: &mut MutableBitmap, -) -> Result<()> { - let additional = page.num_values(); - assert_eq!(descriptor.max_rep_level(), 0); - assert!(descriptor.max_def_level() <= 1); - let is_optional = descriptor.max_def_level() == 1; - - let (_, validity_buffer, values_buffer, version) = utils::split_buffer(page, descriptor); - - match (page.encoding(), page.dictionary_page(), is_optional) { - (Encoding::PlainDictionary, Some(dict), true) => read_dict_buffer( - validity_buffer, - values_buffer, - additional, - size, - dict.as_any().downcast_ref().unwrap(), - values, - validity, - ), - (Encoding::PlainDictionary, Some(dict), false) => read_dict_required( - values_buffer, - additional, - size, - dict.as_any().downcast_ref().unwrap(), - values, - validity, - ), - (Encoding::Plain, _, true) => read_optional( - validity_buffer, - values_buffer, - additional, - size, - values, - validity, - ), - // it can happen that there is a dictionary but the encoding is plain because - // it falled back. - (Encoding::Plain, _, false) => read_required(page.buffer(), additional, size, values), - _ => { - return Err(utils::not_implemented( - &page.encoding(), - is_optional, - page.dictionary_page().is_some(), - version, - "FixedSizeBinary", - )) - } - } - Ok(()) -} diff --git a/src/io/parquet/read/fixed_size_binary/mod.rs b/src/io/parquet/read/fixed_size_binary/mod.rs new file mode 100644 index 00000000000..2e1a93f991a --- /dev/null +++ b/src/io/parquet/read/fixed_size_binary/mod.rs @@ -0,0 +1,213 @@ +mod utils; + +use futures::{pin_mut, Stream, StreamExt}; +use parquet2::{ + encoding::{hybrid_rle, Encoding}, + page::{DataPage, FixedLenByteArrayPageDict}, + FallibleStreamingIterator, +}; + +use self::utils::FixedSizeBinary; + +use super::{utils::extend_from_decoder, ColumnChunkMetaData, ColumnDescriptor}; +use crate::{ + array::FixedSizeBinaryArray, + bitmap::MutableBitmap, + datatypes::DataType, + error::{ArrowError, Result}, +}; + +use super::utils as a_utils; + +#[inline] +fn values_iter<'a>( + indices_buffer: &'a [u8], + dict_values: &'a [u8], + size: usize, + additional: usize, +) -> impl Iterator + 'a { + // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), + // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). + let bit_width = indices_buffer[0]; + let indices_buffer = &indices_buffer[1..]; + + let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); + indices.map(move |index| { + let index = index as usize; + &dict_values[index * size..(index + 1) * size] + }) +} + +/// Assumptions: No rep levels +#[allow(clippy::too_many_arguments)] +pub(crate) fn read_dict_buffer( + validity_buffer: &[u8], + indices_buffer: &[u8], + additional: usize, + dict: &FixedLenByteArrayPageDict, + values: &mut FixedSizeBinary, + validity: &mut MutableBitmap, +) { + let values_iterator = values_iter(indices_buffer, dict.values(), values.size, additional); + + let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); + + extend_from_decoder( + validity, + &mut validity_iterator, + additional, + values, + values_iterator, + ) +} + +/// Assumptions: No rep levels +pub(crate) fn read_dict_required( + indices_buffer: &[u8], + additional: usize, + dict: &FixedLenByteArrayPageDict, + values: &mut FixedSizeBinary, + validity: &mut MutableBitmap, +) { + let size = values.size; + + let values_iter = values_iter(indices_buffer, dict.values(), values.size, additional); + + for value in values_iter { + values.push(value); + } + validity.extend_constant(additional * size, true); +} + +pub(crate) fn read_optional( + validity_buffer: &[u8], + values_buffer: &[u8], + additional: usize, + values: &mut FixedSizeBinary, + validity: &mut MutableBitmap, +) { + assert_eq!(values_buffer.len() % values.size, 0); + let values_iterator = values_buffer.chunks_exact(values.size); + + let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); + + extend_from_decoder( + validity, + &mut validity_iterator, + additional, + values, + values_iterator, + ) +} + +pub(crate) fn read_required(buffer: &[u8], additional: usize, values: &mut FixedSizeBinary) { + assert_eq!(buffer.len(), additional * values.size); + values.values.extend_from_slice(buffer); +} + +pub fn iter_to_array( + mut iter: I, + data_type: DataType, + metadata: &ColumnChunkMetaData, +) -> Result +where + ArrowError: From, + I: FallibleStreamingIterator, +{ + let size = FixedSizeBinaryArray::get_size(&data_type); + + let capacity = metadata.num_values() as usize; + let mut values = FixedSizeBinary::with_capacity(capacity, size); + let mut validity = MutableBitmap::with_capacity(capacity); + while let Some(page) = iter.next()? { + extend_from_page(page, metadata.descriptor(), &mut values, &mut validity)? + } + + Ok(FixedSizeBinaryArray::from_data( + data_type, + values.values.into(), + validity.into(), + )) +} + +pub async fn stream_to_array( + pages: I, + data_type: DataType, + metadata: &ColumnChunkMetaData, +) -> Result +where + ArrowError: From, + E: Clone, + I: Stream>, +{ + let size = FixedSizeBinaryArray::get_size(&data_type); + + let capacity = metadata.num_values() as usize; + let mut values = FixedSizeBinary::with_capacity(capacity, size); + let mut validity = MutableBitmap::with_capacity(capacity); + + pin_mut!(pages); // needed for iteration + + while let Some(page) = pages.next().await { + extend_from_page( + page.as_ref().map_err(|x| x.clone())?, + metadata.descriptor(), + &mut values, + &mut validity, + )? + } + + Ok(FixedSizeBinaryArray::from_data( + data_type, + values.values.into(), + validity.into(), + )) +} + +pub(crate) fn extend_from_page( + page: &DataPage, + descriptor: &ColumnDescriptor, + values: &mut FixedSizeBinary, + validity: &mut MutableBitmap, +) -> Result<()> { + let additional = page.num_values(); + assert_eq!(descriptor.max_rep_level(), 0); + assert!(descriptor.max_def_level() <= 1); + let is_optional = descriptor.max_def_level() == 1; + + let (_, validity_buffer, values_buffer, version) = a_utils::split_buffer(page, descriptor); + + match (page.encoding(), page.dictionary_page(), is_optional) { + (Encoding::PlainDictionary, Some(dict), true) => read_dict_buffer( + validity_buffer, + values_buffer, + additional, + dict.as_any().downcast_ref().unwrap(), + values, + validity, + ), + (Encoding::PlainDictionary, Some(dict), false) => read_dict_required( + values_buffer, + additional, + dict.as_any().downcast_ref().unwrap(), + values, + validity, + ), + (Encoding::Plain, _, true) => { + read_optional(validity_buffer, values_buffer, additional, values, validity) + } + // it can happen that there is a dictionary but the encoding is plain because + // it falled back. + (Encoding::Plain, _, false) => read_required(page.buffer(), additional, values), + _ => { + return Err(a_utils::not_implemented( + &page.encoding(), + is_optional, + page.dictionary_page().is_some(), + version, + "FixedSizeBinary", + )) + } + } + Ok(()) +} diff --git a/src/io/parquet/read/fixed_size_binary/utils.rs b/src/io/parquet/read/fixed_size_binary/utils.rs new file mode 100644 index 00000000000..1e35aaba0d1 --- /dev/null +++ b/src/io/parquet/read/fixed_size_binary/utils.rs @@ -0,0 +1,52 @@ +use crate::io::parquet::read::utils::Pushable; + +/// A [`Pushable`] for fixed sized binary data +#[derive(Debug)] +pub struct FixedSizeBinary { + pub values: Vec, + pub size: usize, +} + +impl FixedSizeBinary { + #[inline] + pub fn with_capacity(capacity: usize, size: usize) -> Self { + Self { + values: Vec::with_capacity(capacity * size), + size, + } + } + + #[inline] + pub fn push(&mut self, value: &[u8]) { + self.values.extend(value); + } + + #[inline] + pub fn extend_constant(&mut self, additional: usize) { + self.values + .resize(self.values.len() + additional * self.size, 0); + } +} + +impl Pushable<&[u8]> for FixedSizeBinary { + #[inline] + fn reserve(&mut self, additional: usize) { + self.values.reserve(additional * self.size) + } + + #[inline] + fn push(&mut self, value: &[u8]) { + self.values.extend(value); + } + + #[inline] + fn push_null(&mut self) { + self.values.extend(std::iter::repeat(0).take(self.size)) + } + + #[inline] + fn extend_constant(&mut self, additional: usize, value: &[u8]) { + assert_eq!(value.len(), 0); + self.extend_constant(additional) + } +} diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index e661e0173cc..7cebf343b10 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -8,11 +8,31 @@ use super::super::utils as other_utils; use super::utils::chunks; use super::ColumnDescriptor; use crate::{ - bitmap::{utils::BitmapIter, MutableBitmap}, - error::Result, + bitmap::MutableBitmap, error::Result, io::parquet::read::utils::extend_from_decoder, types::NativeType as ArrowNativeType, }; +#[inline] +fn values_iter<'a, T, A, F>( + indices_buffer: &'a [u8], + dict_values: &'a [T], + additional: usize, + op: F, +) -> impl Iterator + 'a +where + T: NativeType, + A: ArrowNativeType, + F: 'a + Fn(T) -> A, +{ + // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), + // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). + let bit_width = indices_buffer[0]; + let indices_buffer = &indices_buffer[1..]; + + let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); + indices.map(move |index| op(dict_values[index as usize])) +} + fn read_dict_buffer_optional( validity_buffer: &[u8], indices_buffer: &[u8], @@ -27,48 +47,18 @@ fn read_dict_buffer_optional( F: Fn(T) -> A, { let length = additional + values.len(); - let dict_values = dict.values(); - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; + let values_iterator = values_iter(indices_buffer, dict.values(), additional, op); - let mut indices = - hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); - - let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); - - for run in validity_iterator { - match run { - hybrid_rle::HybridEncoded::Bitpacked(packed) => { - let remaining = length - values.len(); - let len = std::cmp::min(packed.len() * 8, remaining); - for is_valid in BitmapIter::new(packed, 0, len) { - let value = if is_valid { - op(dict_values[indices.next().unwrap() as usize]) - } else { - A::default() - }; - values.push(value); - } - validity.extend_from_slice(packed, 0, len); - } - hybrid_rle::HybridEncoded::Rle(value, additional) => { - let is_set = value[0] == 1; - validity.extend_constant(additional, is_set); - if is_set { - (0..additional).for_each(|_| { - let index = indices.next().unwrap() as usize; - let value = op(dict_values[index]); - values.push(value) - }) - } else { - values.resize(values.len() + additional, A::default()); - } - } - } - } + let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); + + extend_from_decoder( + validity, + &mut validity_iterator, + length, + values, + values_iterator, + ) } fn read_dict_buffer_required( @@ -83,16 +73,9 @@ fn read_dict_buffer_required( A: ArrowNativeType, F: Fn(T) -> A, { - let dict_values = dict.values(); - - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; - - let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); + let values_iterator = values_iter(indices_buffer, dict.values(), additional, op); - values.extend(indices.map(|index| op(dict_values[index as usize]))); + values.extend(values_iterator); validity.extend_constant(additional, true); } @@ -109,41 +92,17 @@ fn read_nullable( A: ArrowNativeType, F: Fn(T) -> A, { - let length = additional + values.len(); - let mut chunks = chunks(values_buffer); - - let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); - - for run in validity_iterator { - match run { - hybrid_rle::HybridEncoded::Bitpacked(packed) => { - // the pack may contain more items than needed. - let remaining = length - values.len(); - let len = std::cmp::min(packed.len() * 8, remaining); - for is_valid in BitmapIter::new(packed, 0, len) { - let value = if is_valid { - op(chunks.next().unwrap()) - } else { - A::default() - }; - values.push(value); - } - validity.extend_from_slice(packed, 0, len); - } - hybrid_rle::HybridEncoded::Rle(value, additional) => { - let is_set = value[0] == 1; - validity.extend_constant(additional, is_set); - if is_set { - (0..additional).for_each(|_| { - let value = op(chunks.next().unwrap()); - values.push(value) - }) - } else { - values.resize(values.len() + additional, A::default()); - } - } - } - } + let values_iterator = chunks(values_buffer).map(op); + + let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); + + extend_from_decoder( + validity, + &mut validity_iterator, + additional, + values, + values_iterator, + ) } fn read_required(values_buffer: &[u8], additional: usize, values: &mut Vec, op: F) diff --git a/src/io/parquet/read/primitive/dictionary.rs b/src/io/parquet/read/primitive/dictionary.rs index 80c8c123e01..5c570588caf 100644 --- a/src/io/parquet/read/primitive/dictionary.rs +++ b/src/io/parquet/read/primitive/dictionary.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use parquet2::{ - encoding::{hybrid_rle, Encoding}, + encoding::Encoding, page::{DataPage, PrimitivePageDict}, types::NativeType, FallibleStreamingIterator, @@ -11,72 +11,13 @@ use super::super::utils; use super::{ColumnChunkMetaData, ColumnDescriptor}; use crate::{ array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray}, - bitmap::{utils::BitmapIter, MutableBitmap}, + bitmap::MutableBitmap, datatypes::DataType, error::{ArrowError, Result}, + io::parquet::read::utils::read_dict_optional, types::NativeType as ArrowNativeType, }; -#[allow(clippy::too_many_arguments)] -fn read_dict_optional( - validity_buffer: &[u8], - indices_buffer: &[u8], - additional: usize, - dict: &PrimitivePageDict, - indices: &mut Vec, - values: &mut Vec, - validity: &mut MutableBitmap, - op: F, -) where - K: DictionaryKey, - T: NativeType, - A: ArrowNativeType, - F: Fn(T) -> A, -{ - let dict_values = dict.values(); - values.extend(dict_values.iter().map(|x| op(*x))); - - // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), - // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). - let bit_width = indices_buffer[0]; - let indices_buffer = &indices_buffer[1..]; - - let mut new_indices = - hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); - - let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); - - for run in validity_iterator { - match run { - hybrid_rle::HybridEncoded::Bitpacked(packed) => { - let remaining = additional - indices.len(); - let len = std::cmp::min(packed.len() * 8, remaining); - for is_valid in BitmapIter::new(packed, 0, len) { - let value = if is_valid { - K::from_u32(new_indices.next().unwrap()).unwrap() - } else { - K::default() - }; - indices.push(value); - } - validity.extend_from_slice(packed, 0, len); - } - hybrid_rle::HybridEncoded::Rle(value, additional) => { - let is_set = value[0] == 1; - validity.extend_constant(additional, is_set); - if is_set { - (0..additional).for_each(|_| { - let index = K::from_u32(new_indices.next().unwrap()).unwrap(); - indices.push(index) - }) - } else { - values.resize(values.len() + additional, A::default()); - } - } - } - } -} - fn extend_from_page( page: &DataPage, descriptor: &ColumnDescriptor, @@ -100,15 +41,17 @@ where match (&page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { + let dict = dict + .as_any() + .downcast_ref::>() + .unwrap(); + values.extend(dict.values().iter().map(|x| op(*x))); read_dict_optional( validity_buffer, values_buffer, additional, - dict.as_any().downcast_ref().unwrap(), indices, - values, validity, - op, ) } _ => { diff --git a/src/io/parquet/read/utils.rs b/src/io/parquet/read/utils.rs index 85b40a95a6b..25aa171cbc3 100644 --- a/src/io/parquet/read/utils.rs +++ b/src/io/parquet/read/utils.rs @@ -1,9 +1,12 @@ use std::convert::TryInto; -use parquet2::encoding::Encoding; +use parquet2::encoding::{hybrid_rle, Encoding}; use parquet2::metadata::ColumnDescriptor; use parquet2::page::{split_buffer as _split_buffer, DataPage, DataPageHeader}; +use crate::array::DictionaryKey; +use crate::bitmap::utils::BitmapIter; +use crate::bitmap::MutableBitmap; use crate::error::ArrowError; pub struct BinaryIter<'a> { @@ -59,3 +62,129 @@ pub fn split_buffer<'a>( }; (rep_levels, validity_buffer, values_buffer, version) } + +/// A private trait representing structs that can receive elements. +pub(super) trait Pushable { + fn reserve(&mut self, additional: usize); + fn push(&mut self, value: T); + #[inline] + fn push_null(&mut self) { + self.push(T::default()) + } + fn extend_constant(&mut self, additional: usize, value: T); +} + +impl Pushable for MutableBitmap { + #[inline] + fn reserve(&mut self, additional: usize) { + self.reserve(additional) + } + + #[inline] + fn push(&mut self, value: bool) { + self.push(value) + } + + #[inline] + fn extend_constant(&mut self, additional: usize, value: bool) { + self.extend_constant(additional, value) + } +} + +impl Pushable for Vec { + #[inline] + fn reserve(&mut self, additional: usize) { + self.reserve(additional) + } + + #[inline] + fn push(&mut self, value: A) { + self.push(value) + } + + #[inline] + fn extend_constant(&mut self, additional: usize, value: A) { + self.resize(self.len() + additional, value); + } +} + +/// Extends a [`Pushable`] from an iterator of non-null values and an hybrid-rle decoder +#[inline] +pub(super) fn extend_from_decoder<'a, T: Default, C: Pushable, I: Iterator>( + validity: &mut MutableBitmap, + decoder: &mut hybrid_rle::Decoder<'a>, + page_length: usize, // data page length + values: &mut C, + mut values_iter: I, +) { + let remaining = page_length; + for run in decoder { + match run { + hybrid_rle::HybridEncoded::Bitpacked(pack) => { + // compute the length of the pack + let pack_size = pack.len() * 8; + let pack_remaining = page_length; + let length = std::cmp::min(pack_size, pack_remaining); + + let additional = remaining.min(length); + + // extend validity + validity.extend_from_slice(pack, 0, additional); + + // extend values + let iter = BitmapIter::new(pack, 0, additional); + for is_valid in iter { + if is_valid { + values.push(values_iter.next().unwrap()) + } else { + values.push_null() + }; + } + } + hybrid_rle::HybridEncoded::Rle(value, length) => { + let is_set = value[0] == 1; + + // extend validity + let length = length; + let additional = remaining.min(length); + validity.extend_constant(additional, is_set); + + // extend values + if is_set { + (0..additional).for_each(|_| values.push(values_iter.next().unwrap())); + } else { + values.extend_constant(additional, T::default()); + } + } + } + } +} + +pub(super) fn read_dict_optional( + validity_buffer: &[u8], + indices_buffer: &[u8], + additional: usize, + indices: &mut Vec, + validity: &mut MutableBitmap, +) where + K: DictionaryKey, +{ + // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32), + // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width). + let bit_width = indices_buffer[0]; + let indices_buffer = &indices_buffer[1..]; + + let new_indices = + hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); + let indices_iter = new_indices.map(|x| K::from_u32(x).unwrap()); + + let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); + + extend_from_decoder( + validity, + &mut validity_iterator, + additional, + indices, + indices_iter, + ) +}