diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index 756dffcc494d..a6f079d0627b 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -24,32 +24,17 @@ use std::sync::Arc; use std::vec::Vec; use arrow::array::{ - new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray, - BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder, DecimalArray, - FixedSizeBinaryArray, FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder, - Int32Array, Int64Array, MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, - PrimitiveBuilder, StringArray, StringBuilder, StructArray, + new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray, + BooleanBufferBuilder, DecimalArray, GenericListArray, Int16BufferBuilder, Int32Array, + Int64Array, MapArray, OffsetSizeTrait, PrimitiveArray, StructArray, UInt32Array, }; use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::compute::take; use arrow::datatypes::{ ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType, - Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type, - DurationMicrosecondType as ArrowDurationMicrosecondType, - DurationMillisecondType as ArrowDurationMillisecondType, - DurationNanosecondType as ArrowDurationNanosecondType, - DurationSecondType as ArrowDurationSecondType, Float32Type as ArrowFloat32Type, - Float64Type as ArrowFloat64Type, Int16Type as ArrowInt16Type, - Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, - Time32MillisecondType as ArrowTime32MillisecondType, - Time32SecondType as ArrowTime32SecondType, - Time64MicrosecondType as ArrowTime64MicrosecondType, - Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit, - TimestampMicrosecondType as ArrowTimestampMicrosecondType, - TimestampMillisecondType as ArrowTimestampMillisecondType, - TimestampNanosecondType as ArrowTimestampNanosecondType, - TimestampSecondType as ArrowTimestampSecondType, ToByteSlice, - UInt16Type as ArrowUInt16Type, UInt32Type as ArrowUInt32Type, - UInt64Type as ArrowUInt64Type, UInt8Type as ArrowUInt8Type, + Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type, + Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type, ToByteSlice, + UInt32Type as ArrowUInt32Type, UInt64Type as ArrowUInt64Type, }; use arrow::util::bit_util; @@ -695,195 +680,6 @@ impl ListArrayReader { } } -macro_rules! remove_primitive_array_indices { - ($arr: expr, $item_type:ty, $indices:expr) => {{ - let array_data = match $arr.as_any().downcast_ref::>() { - Some(a) => a, - _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), - }; - let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len()); - for i in 0..array_data.len() { - if !$indices.contains(&i) { - if array_data.is_null(i) { - builder.append_null()?; - } else { - builder.append_value(array_data.value(i))?; - } - } - } - Ok(Arc::new(builder.finish())) - }}; -} - -macro_rules! remove_array_indices_custom_builder { - ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{ - let array_data = match $arr.as_any().downcast_ref::<$array_type>() { - Some(a) => a, - _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), - }; - let mut builder = $item_builder::new(array_data.len()); - - for i in 0..array_data.len() { - if !$indices.contains(&i) { - if array_data.is_null(i) { - builder.append_null()?; - } else { - builder.append_value(array_data.value(i))?; - } - } - } - Ok(Arc::new(builder.finish())) - }}; -} - -macro_rules! remove_fixed_size_binary_array_indices { - ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, $len:expr) => {{ - let array_data = match $arr.as_any().downcast_ref::<$array_type>() { - Some(a) => a, - _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), - }; - let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len); - for i in 0..array_data.len() { - if !$indices.contains(&i) { - if array_data.is_null(i) { - builder.append_null()?; - } else { - builder.append_value(array_data.value(i))?; - } - } - } - Ok(Arc::new(builder.finish())) - }}; -} - -fn remove_indices( - arr: ArrayRef, - item_type: ArrowType, - indices: Vec, -) -> Result { - match item_type { - ArrowType::UInt8 => remove_primitive_array_indices!(arr, ArrowUInt8Type, indices), - ArrowType::UInt16 => { - remove_primitive_array_indices!(arr, ArrowUInt16Type, indices) - } - ArrowType::UInt32 => { - remove_primitive_array_indices!(arr, ArrowUInt32Type, indices) - } - ArrowType::UInt64 => { - remove_primitive_array_indices!(arr, ArrowUInt64Type, indices) - } - ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, indices), - ArrowType::Int16 => remove_primitive_array_indices!(arr, ArrowInt16Type, indices), - ArrowType::Int32 => remove_primitive_array_indices!(arr, ArrowInt32Type, indices), - ArrowType::Int64 => remove_primitive_array_indices!(arr, ArrowInt64Type, indices), - ArrowType::Float32 => { - remove_primitive_array_indices!(arr, ArrowFloat32Type, indices) - } - ArrowType::Float64 => { - remove_primitive_array_indices!(arr, ArrowFloat64Type, indices) - } - ArrowType::Boolean => { - remove_array_indices_custom_builder!( - arr, - BooleanArray, - BooleanBuilder, - indices - ) - } - ArrowType::Date32 => { - remove_primitive_array_indices!(arr, ArrowDate32Type, indices) - } - ArrowType::Date64 => { - remove_primitive_array_indices!(arr, ArrowDate64Type, indices) - } - ArrowType::Time32(ArrowTimeUnit::Second) => { - remove_primitive_array_indices!(arr, ArrowTime32SecondType, indices) - } - ArrowType::Time32(ArrowTimeUnit::Millisecond) => { - remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, indices) - } - ArrowType::Time64(ArrowTimeUnit::Microsecond) => { - remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, indices) - } - ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { - remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, indices) - } - ArrowType::Duration(ArrowTimeUnit::Second) => { - remove_primitive_array_indices!(arr, ArrowDurationSecondType, indices) - } - ArrowType::Duration(ArrowTimeUnit::Millisecond) => { - remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, indices) - } - ArrowType::Duration(ArrowTimeUnit::Microsecond) => { - remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, indices) - } - ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { - remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, indices) - } - ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { - remove_primitive_array_indices!(arr, ArrowTimestampSecondType, indices) - } - ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { - remove_primitive_array_indices!(arr, ArrowTimestampMillisecondType, indices) - } - ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { - remove_primitive_array_indices!(arr, ArrowTimestampMicrosecondType, indices) - } - ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { - remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, indices) - } - ArrowType::Utf8 => { - remove_array_indices_custom_builder!(arr, StringArray, StringBuilder, indices) - } - ArrowType::Binary => { - remove_array_indices_custom_builder!(arr, BinaryArray, BinaryBuilder, indices) - } - ArrowType::FixedSizeBinary(size) => remove_fixed_size_binary_array_indices!( - arr, - FixedSizeBinaryArray, - FixedSizeBinaryBuilder, - indices, - size - ), - ArrowType::Struct(fields) => { - let struct_array = arr - .as_any() - .downcast_ref::() - .expect("Array should be a struct"); - - // Recursively call remove indices on each of the structs fields - let new_columns = fields - .into_iter() - .zip(struct_array.columns()) - .map(|(field, column)| { - let dt = field.data_type().clone(); - Ok((field, remove_indices(column.clone(), dt, indices.clone())?)) - }) - .collect::>>()?; - - if arr.data().null_count() == 0 { - // No nulls, nothing to do. - Ok(Arc::new(StructArray::from(new_columns))) - } else { - // Construct a new validity buffer by removing `indices` from the original validity - // map. - let mut valid = BooleanBufferBuilder::new(arr.len() - indices.len()); - for idx in 0..arr.len() { - if !indices.contains(&idx) { - valid.append(!arr.is_null(idx)); - } - } - Ok(Arc::new(StructArray::from((new_columns, valid.finish())))) - } - } - ArrowType::Null => Ok(Arc::new(NullArray::new(arr.len() - indices.len()))), - _ => Err(ParquetError::General(format!( - "ListArray of type List({:?}) is not supported by array_reader", - item_type - ))), - } -} - /// Implementation of ListArrayReader. Nested lists and lists of structs are not yet supported. impl ArrayReader for ListArrayReader { fn as_any(&self) -> &dyn Any { @@ -898,7 +694,6 @@ impl ArrayReader for ListArrayReader { fn next_batch(&mut self, batch_size: usize) -> Result { let next_batch_array = self.item_reader.next_batch(batch_size)?; - let item_type = self.item_reader.get_data_type().clone(); if next_batch_array.len() == 0 { return Ok(new_empty_array(&self.data_type)); @@ -929,21 +724,12 @@ impl ArrayReader for ListArrayReader { // If a Parquet schema's only leaf is the list, then n = 0. // If the list index is at empty definition, the child slot is null - let null_list_indices: Vec = def_levels - .iter() - .enumerate() - .filter_map(|(index, def)| { - if *def <= self.list_empty_def_level { - Some(index) - } else { - None - } - }) - .collect(); - let batch_values = match null_list_indices.len() { - 0 => next_batch_array.clone(), - _ => remove_indices(next_batch_array.clone(), item_type, null_list_indices)?, - }; + let non_null_list_indices = + def_levels.iter().enumerate().filter_map(|(index, def)| { + (*def > self.list_empty_def_level).then(|| index as u32) + }); + let indices = UInt32Array::from_iter_values(non_null_list_indices); + let batch_values = take(&*next_batch_array.clone(), &indices, None)?; // first item in each list has rep_level = 0, subsequent items have rep_level = 1 let mut offsets: Vec = Vec::new();