Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Arrow take kernel within ListArrayReader #1490

Merged
merged 2 commits into from
Mar 29, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 13 additions & 227 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,17 @@ use std::sync::Arc;
use std::vec::Vec;

use arrow::array::{
new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray,
BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder, DecimalArray,
FixedSizeBinaryArray, FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder,
Int32Array, Int64Array, MapArray, NullArray, OffsetSizeTrait, PrimitiveArray,
PrimitiveBuilder, StringArray, StringBuilder, StructArray,
new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray,
BooleanBufferBuilder, DecimalArray, GenericListArray, Int16BufferBuilder, Int32Array,
Int64Array, MapArray, OffsetSizeTrait, PrimitiveArray, StructArray, UInt32Array,
};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::compute::take;
use arrow::datatypes::{
ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType,
Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type,
DurationMicrosecondType as ArrowDurationMicrosecondType,
DurationMillisecondType as ArrowDurationMillisecondType,
DurationNanosecondType as ArrowDurationNanosecondType,
DurationSecondType as ArrowDurationSecondType, Float32Type as ArrowFloat32Type,
Float64Type as ArrowFloat64Type, Int16Type as ArrowInt16Type,
Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type,
Time32MillisecondType as ArrowTime32MillisecondType,
Time32SecondType as ArrowTime32SecondType,
Time64MicrosecondType as ArrowTime64MicrosecondType,
Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit,
TimestampMicrosecondType as ArrowTimestampMicrosecondType,
TimestampMillisecondType as ArrowTimestampMillisecondType,
TimestampNanosecondType as ArrowTimestampNanosecondType,
TimestampSecondType as ArrowTimestampSecondType, ToByteSlice,
UInt16Type as ArrowUInt16Type, UInt32Type as ArrowUInt32Type,
UInt64Type as ArrowUInt64Type, UInt8Type as ArrowUInt8Type,
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type, ToByteSlice,
UInt32Type as ArrowUInt32Type, UInt64Type as ArrowUInt64Type,
};
use arrow::util::bit_util;

Expand Down Expand Up @@ -695,195 +680,6 @@ impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
}
}

/// Rebuilds a primitive array with the rows listed in `$indices` left out.
///
/// * `$arr` - array handle; it is downcast to `PrimitiveArray<$item_type>`.
/// * `$item_type` - element type parameter of the primitive array and its builder.
/// * `$indices` - collection of `usize` row positions to drop (order and
///   duplicates do not matter).
///
/// Expands to a block evaluating to `Ok(Arc::new(..))` around the rebuilt array.
/// A failed downcast makes the *enclosing function* return
/// `ParquetError::General`, and builder errors are propagated with `?`, so the
/// macro may only be used inside a function returning a compatible `Result`.
macro_rules! remove_primitive_array_indices {
    ($arr: expr, $item_type:ty, $indices:expr) => {{
        let array_data = match $arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() {
            Some(a) => a,
            _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
        };
        // Hash the positions once so every row costs a single lookup instead of
        // a linear scan over the whole index list.
        let skipped: std::collections::HashSet<usize> = $indices.iter().copied().collect();
        let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len());
        for i in (0..array_data.len()).filter(|i| !skipped.contains(i)) {
            // Surviving rows keep their null-ness.
            if array_data.is_null(i) {
                builder.append_null()?;
            } else {
                builder.append_value(array_data.value(i))?;
            }
        }
        Ok(Arc::new(builder.finish()))
    }};
}

/// Rebuilds an array of type `$array_type` with the rows listed in `$indices`
/// left out, using `$item_builder` to assemble the result.
///
/// * `$arr` - array handle; it is downcast to `$array_type`.
/// * `$array_type` - concrete array type to downcast to.
/// * `$item_builder` - builder type exposing `new(capacity)`, `append_null`,
///   `append_value` and `finish`.
/// * `$indices` - collection of `usize` row positions to drop.
///
/// Expands to a block evaluating to `Ok(Arc::new(..))` around the rebuilt array.
/// A failed downcast makes the *enclosing function* return
/// `ParquetError::General` (the message names `$array_type`), and builder
/// errors are propagated with `?`.
macro_rules! remove_array_indices_custom_builder {
    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{
        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
            Some(a) => a,
            // Report the type that was actually requested rather than a
            // hard-coded "PrimitiveArray".
            _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to {}", $arr, stringify!($array_type)))),
        };
        // Hash the positions once so every row costs a single lookup instead of
        // a linear scan over the whole index list.
        let skipped: std::collections::HashSet<usize> = $indices.iter().copied().collect();
        let mut builder = $item_builder::new(array_data.len());

        for i in (0..array_data.len()).filter(|i| !skipped.contains(i)) {
            // Surviving rows keep their null-ness.
            if array_data.is_null(i) {
                builder.append_null()?;
            } else {
                builder.append_value(array_data.value(i))?;
            }
        }
        Ok(Arc::new(builder.finish()))
    }};
}

/// Rebuilds a fixed-width binary array with the rows listed in `$indices` left
/// out.
///
/// * `$arr` - array handle; it is downcast to `$array_type`.
/// * `$array_type` - concrete array type to downcast to.
/// * `$item_builder` - builder type exposing `new(capacity, width)`,
///   `append_null`, `append_value` and `finish`.
/// * `$indices` - collection of `usize` row positions to drop.
/// * `$len` - second argument handed to the builder's constructor (the call
///   site in this file passes the size carried by `FixedSizeBinary(size)`).
///
/// Expands to a block evaluating to `Ok(Arc::new(..))` around the rebuilt array.
/// A failed downcast makes the *enclosing function* return
/// `ParquetError::General` (the message names `$array_type`), and builder
/// errors are propagated with `?`.
macro_rules! remove_fixed_size_binary_array_indices {
    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, $len:expr) => {{
        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
            Some(a) => a,
            // Report the type that was actually requested rather than a
            // hard-coded "PrimitiveArray".
            _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to {}", $arr, stringify!($array_type)))),
        };
        // Hash the positions once so every row costs a single lookup instead of
        // a linear scan over the whole index list.
        let skipped: std::collections::HashSet<usize> = $indices.iter().copied().collect();
        // Honour the builder named by the caller instead of ignoring the argument.
        let mut builder = $item_builder::new(array_data.len(), $len);
        for i in (0..array_data.len()).filter(|i| !skipped.contains(i)) {
            // Surviving rows keep their null-ness.
            if array_data.is_null(i) {
                builder.append_null()?;
            } else {
                builder.append_value(array_data.value(i))?;
            }
        }
        Ok(Arc::new(builder.finish()))
    }};
}

/// Returns a copy of `arr` with the rows at the positions in `indices` removed.
///
/// `item_type` selects how `arr` is downcast and rebuilt: primitive, boolean,
/// string, binary and fixed-size binary arrays go through the sibling
/// `remove_*_indices` macros, struct arrays are processed field by field
/// (recursively) with their validity bitmap rebuilt, and null arrays are simply
/// shortened.
///
/// # Errors
///
/// Returns `ParquetError::General` when `item_type` is not one of the handled
/// types or when `arr` cannot be downcast to the array type implied by
/// `item_type`. Errors raised by the array builders are propagated.
fn remove_indices(
    arr: ArrayRef,
    item_type: ArrowType,
    indices: Vec<usize>,
) -> Result<ArrayRef> {
    match item_type {
        ArrowType::UInt8 => remove_primitive_array_indices!(arr, ArrowUInt8Type, indices),
        ArrowType::UInt16 => {
            remove_primitive_array_indices!(arr, ArrowUInt16Type, indices)
        }
        ArrowType::UInt32 => {
            remove_primitive_array_indices!(arr, ArrowUInt32Type, indices)
        }
        ArrowType::UInt64 => {
            remove_primitive_array_indices!(arr, ArrowUInt64Type, indices)
        }
        ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, indices),
        ArrowType::Int16 => remove_primitive_array_indices!(arr, ArrowInt16Type, indices),
        ArrowType::Int32 => remove_primitive_array_indices!(arr, ArrowInt32Type, indices),
        ArrowType::Int64 => remove_primitive_array_indices!(arr, ArrowInt64Type, indices),
        ArrowType::Float32 => {
            remove_primitive_array_indices!(arr, ArrowFloat32Type, indices)
        }
        ArrowType::Float64 => {
            remove_primitive_array_indices!(arr, ArrowFloat64Type, indices)
        }
        ArrowType::Boolean => {
            remove_array_indices_custom_builder!(
                arr,
                BooleanArray,
                BooleanBuilder,
                indices
            )
        }
        ArrowType::Date32 => {
            remove_primitive_array_indices!(arr, ArrowDate32Type, indices)
        }
        ArrowType::Date64 => {
            remove_primitive_array_indices!(arr, ArrowDate64Type, indices)
        }
        ArrowType::Time32(ArrowTimeUnit::Second) => {
            remove_primitive_array_indices!(arr, ArrowTime32SecondType, indices)
        }
        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
            remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, indices)
        }
        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
            remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, indices)
        }
        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
            remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, indices)
        }
        ArrowType::Duration(ArrowTimeUnit::Second) => {
            remove_primitive_array_indices!(arr, ArrowDurationSecondType, indices)
        }
        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
            remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, indices)
        }
        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
            remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, indices)
        }
        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
            remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, indices)
        }
        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
            remove_primitive_array_indices!(arr, ArrowTimestampSecondType, indices)
        }
        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
            remove_primitive_array_indices!(arr, ArrowTimestampMillisecondType, indices)
        }
        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
            remove_primitive_array_indices!(arr, ArrowTimestampMicrosecondType, indices)
        }
        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
            remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, indices)
        }
        ArrowType::Utf8 => {
            remove_array_indices_custom_builder!(arr, StringArray, StringBuilder, indices)
        }
        ArrowType::Binary => {
            remove_array_indices_custom_builder!(arr, BinaryArray, BinaryBuilder, indices)
        }
        ArrowType::FixedSizeBinary(size) => remove_fixed_size_binary_array_indices!(
            arr,
            FixedSizeBinaryArray,
            FixedSizeBinaryBuilder,
            indices,
            size
        ),
        ArrowType::Struct(fields) => {
            // Report a mismatched array as an error, like every other arm,
            // instead of panicking.
            let struct_array = arr
                .as_any()
                .downcast_ref::<StructArray>()
                .ok_or_else(|| {
                    ParquetError::General(format!(
                        "Error generating next batch for ListArray: {:?} cannot be downcast to StructArray",
                        arr
                    ))
                })?;

            // Recursively call remove indices on each of the structs fields
            let new_columns = fields
                .into_iter()
                .zip(struct_array.columns())
                .map(|(field, column)| {
                    let dt = field.data_type().clone();
                    Ok((field, remove_indices(column.clone(), dt, indices.clone())?))
                })
                .collect::<Result<Vec<_>>>()?;

            if arr.data().null_count() == 0 {
                // No nulls, nothing to do.
                Ok(Arc::new(StructArray::from(new_columns)))
            } else {
                // Construct a new validity buffer by removing `indices` from the
                // original validity map. The positions are hashed once so each
                // row costs a single lookup rather than a scan of `indices`.
                let removed: std::collections::HashSet<usize> =
                    indices.iter().copied().collect();
                let mut valid = BooleanBufferBuilder::new(arr.len() - indices.len());
                for idx in (0..arr.len()).filter(|idx| !removed.contains(idx)) {
                    valid.append(!arr.is_null(idx));
                }
                Ok(Arc::new(StructArray::from((new_columns, valid.finish()))))
            }
        }
        // A null array carries no values, so only its length changes.
        ArrowType::Null => Ok(Arc::new(NullArray::new(arr.len() - indices.len()))),
        _ => Err(ParquetError::General(format!(
            "ListArray of type List({:?}) is not supported by array_reader",
            item_type
        ))),
    }
}

/// Implementation of ListArrayReader. Nested lists and lists of structs are not yet supported.
impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
fn as_any(&self) -> &dyn Any {
Expand All @@ -898,7 +694,6 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {

fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
let next_batch_array = self.item_reader.next_batch(batch_size)?;
let item_type = self.item_reader.get_data_type().clone();

if next_batch_array.len() == 0 {
return Ok(new_empty_array(&self.data_type));
Expand Down Expand Up @@ -929,21 +724,12 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
// If a Parquet schema's only leaf is the list, then n = 0.

// If the list index is at empty definition, the child slot is null
let null_list_indices: Vec<usize> = def_levels
.iter()
.enumerate()
.filter_map(|(index, def)| {
if *def <= self.list_empty_def_level {
Some(index)
} else {
None
}
})
.collect();
let batch_values = match null_list_indices.len() {
0 => next_batch_array.clone(),
_ => remove_indices(next_batch_array.clone(), item_type, null_list_indices)?,
};
let non_null_list_indices =
def_levels.iter().enumerate().filter_map(|(index, def)| {
(*def > self.list_empty_def_level).then(|| index as u32)
});
let indices = UInt32Array::from_iter_values(non_null_list_indices);
let batch_values = take(&*next_batch_array.clone(), &indices, None)?;

// first item in each list has rep_level = 0, subsequent items have rep_level = 1
let mut offsets: Vec<OffsetSize> = Vec::new();
Expand Down