diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index f4139b2f654d..1d8441cbd7f3 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -717,13 +717,7 @@ impl ArrayReader for StructArrayReader { .children .iter_mut() .map(|reader| reader.next_batch(batch_size)) - .try_fold( - Vec::new(), - |mut result, child_array| -> Result> { - result.push(child_array?); - Ok(result) - }, - )?; + .collect::>>()?; // check that array child data has same size let children_array_len = @@ -1538,7 +1532,7 @@ mod tests { ArrowType::Int32, array_1.clone(), Some(vec![0, 1, 2, 3, 1]), - Some(vec![1, 1, 1, 1, 1]), + Some(vec![0, 1, 1, 1, 1]), ); let array_2 = Arc::new(PrimitiveArray::::from(vec![5, 4, 3, 2, 1])); @@ -1546,7 +1540,7 @@ mod tests { ArrowType::Int32, array_2.clone(), Some(vec![0, 1, 3, 1, 2]), - Some(vec![1, 1, 1, 1, 1]), + Some(vec![0, 1, 1, 1, 1]), ); let struct_type = ArrowType::Struct(vec![ @@ -1576,7 +1570,7 @@ mod tests { struct_array_reader.get_def_levels() ); assert_eq!( - Some(vec![1, 1, 1, 1, 1].as_slice()), + Some(vec![0, 1, 1, 1, 1].as_slice()), struct_array_reader.get_rep_levels() ); } diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 2836c699c39e..af2896350cad 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -38,7 +38,7 @@ use crate::data_type::{ Int96Type, }; use crate::errors::ParquetError::ArrowError; -use crate::errors::{Result}; +use crate::errors::{ParquetError, Result}; use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr}; use crate::schema::visitor::TypeVisitor; @@ -129,9 +129,10 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext let null_mask_only = match cur_type.get_basic_info().repetition() { Repetition::REPEATED => { - new_context.def_level += 1; - new_context.rep_level += 1; - false + return Err(ArrowError(format!( + "Reading 
repeated primitive ({:?}) is not supported yet!", + cur_type.name() + ))); } Repetition::OPTIONAL => { new_context.def_level += 1; @@ -143,19 +144,12 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext }; let reader = self.build_for_primitive_type_inner( - cur_type.clone(), + cur_type, &new_context, null_mask_only, )?; - if cur_type.get_basic_info().repetition() == Repetition::REPEATED { - Err(ArrowError(format!( - "Reading repeated field ({:?}) is not supported yet!", - cur_type.name() - ))) - } else { - Ok(Some(reader)) - } + Ok(Some(reader)) } else { Ok(None) } @@ -173,30 +167,19 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext if cur_type.get_basic_info().has_repetition() { match cur_type.get_basic_info().repetition() { Repetition::REPEATED => { - new_context.def_level += 1; - new_context.rep_level += 1; + return Err(ArrowError(format!( + "Reading repeated struct ({:?}) is not supported yet!", + cur_type.name(), + ))) } Repetition::OPTIONAL => { new_context.def_level += 1; } - _ => (), + Repetition::REQUIRED => {} } } - if let Some(reader) = self.build_for_struct_type_inner(&cur_type, &new_context)? { - if cur_type.get_basic_info().has_repetition() - && cur_type.get_basic_info().repetition() == Repetition::REPEATED - { - Err(ArrowError(format!( - "Reading repeated field ({:?}) is not supported yet!", - cur_type.name(), - ))) - } else { - Ok(Some(reader)) - } - } else { - Ok(None) - } + self.build_for_struct_type_inner(&cur_type, &new_context) } /// Build array reader for map type. 
@@ -208,42 +191,61 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext // Add map type to context let mut new_context = context.clone(); new_context.path.append(vec![map_type.name().to_string()]); - if let Repetition::OPTIONAL = map_type.get_basic_info().repetition() { - new_context.def_level += 1; + + match map_type.get_basic_info().repetition() { + Repetition::REQUIRED => {} + Repetition::OPTIONAL => { + new_context.def_level += 1; + } + Repetition::REPEATED => { + return Err(ArrowError("Map cannot be repeated".to_string())) + } + } + + if map_type.get_fields().len() != 1 { + return Err(ArrowError(format!( + "Map field must have exactly one key_value child, found {}", + map_type.get_fields().len() + ))); } // Add map entry (key_value) to context - let map_key_value = map_type.get_fields().first().ok_or_else(|| { - ArrowError("Map field must have a key_value entry".to_string()) - })?; + let map_key_value = &map_type.get_fields()[0]; + if map_key_value.get_basic_info().repetition() != Repetition::REPEATED { + return Err(ArrowError( + "Child of map field must be repeated".to_string(), + )); + } + new_context .path .append(vec![map_key_value.name().to_string()]); + new_context.rep_level += 1; + new_context.def_level += 1; + + if map_key_value.get_fields().len() != 2 { + // According to the specification the values are optional (#1642) + return Err(ArrowError(format!( + "Child of map field must have two children, found {}", + map_key_value.get_fields().len() + ))); + } // Get key and value, and create context for each - let map_key = map_key_value - .get_fields() - .first() - .ok_or_else(|| ArrowError("Map entry must have a key".to_string()))?; - let map_value = map_key_value - .get_fields() - .get(1) - .ok_or_else(|| ArrowError("Map entry must have a value".to_string()))?; - - let key_reader = { - let mut key_context = new_context.clone(); - key_context.def_level += 1; - key_context.path.append(vec![map_key.name().to_string()]); - self.dispatch(map_key.clone(), 
&key_context)?.unwrap() - }; - let value_reader = { - let mut value_context = new_context.clone(); - if let Repetition::OPTIONAL = map_value.get_basic_info().repetition() { - value_context.def_level += 1; - } - self.dispatch(map_value.clone(), &value_context)?.unwrap() - }; + let map_key = &map_key_value.get_fields()[0]; + let map_value = &map_key_value.get_fields()[1]; + + if map_key.get_basic_info().repetition() != Repetition::REQUIRED { + return Err(ArrowError("Map keys must be required".to_string())); + } + + if map_value.get_basic_info().repetition() == Repetition::REPEATED { + return Err(ArrowError("Map values cannot be repeated".to_string())); + } + + let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap(); + let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap(); let arrow_type = self .arrow_schema @@ -290,101 +292,89 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext item_type: Arc, context: &'a ArrayReaderBuilderContext, ) -> Result>> { - let mut list_child = &list_type - .get_fields() - .first() - .ok_or_else(|| ArrowError("List field must have a child.".to_string()))? 
- .clone(); let mut new_context = context.clone(); - new_context.path.append(vec![list_type.name().to_string()]); - // We need to know at what definition a list or its child is null - let list_null_def = new_context.def_level; - let mut list_empty_def = new_context.def_level; - - // If the list's root is nullable - if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() { - new_context.def_level += 1; - // current level is nullable, increment to get level for empty list slot - list_empty_def += 1; - } - match list_child.get_basic_info().repetition() { - Repetition::REPEATED => { - new_context.def_level += 1; - new_context.rep_level += 1; - } + // If the list is nullable + let nullable = match list_type.get_basic_info().repetition() { + Repetition::REQUIRED => false, Repetition::OPTIONAL => { new_context.def_level += 1; + true + } + Repetition::REPEATED => { + return Err(general_err!("List type cannot be repeated")) } - _ => (), + }; + + if list_type.get_fields().len() != 1 { + return Err(ArrowError(format!( + "List field must have exactly one child, found {}", + list_type.get_fields().len() + ))); } + let mut list_child = &list_type.get_fields()[0]; - let reader = self.dispatch(item_type.clone(), &new_context); - if let Ok(Some(item_reader)) = reader { - let item_reader_type = item_reader.get_data_type().clone(); - - match item_reader_type { - ArrowType::List(_) - | ArrowType::FixedSizeList(_, _) - | ArrowType::Dictionary(_, _) => Err(ArrowError(format!( - "reading List({:?}) into arrow not supported yet", - item_type - ))), - _ => { - // a list is a group type with a single child. The list child's - // name comes from the child's field name. 
- // if the child's name is "list" and it has a child, then use this child - if list_child.name() == "list" && !list_child.get_fields().is_empty() - { - list_child = list_child.get_fields().first().unwrap(); - } - let arrow_type = self - .arrow_schema - .field_with_name(list_type.name()) - .ok() - .map(|f| f.data_type().to_owned()) - .unwrap_or_else(|| { - ArrowType::List(Box::new(Field::new( - list_child.name(), - item_reader_type.clone(), - list_child.is_optional(), - ))) - }); - - let list_array_reader: Box = match arrow_type { - ArrowType::List(_) => Box::new(ListArrayReader::::new( - item_reader, - arrow_type, - item_reader_type, - new_context.def_level, - new_context.rep_level, - list_null_def, - list_empty_def, - )), - ArrowType::LargeList(_) => Box::new(ListArrayReader::::new( - item_reader, - arrow_type, - item_reader_type, - new_context.def_level, - new_context.rep_level, - list_null_def, - list_empty_def, - )), - - _ => { - return Err(ArrowError(format!( - "creating ListArrayReader with type {:?} should be unreachable", - arrow_type - ))) - } - }; + if list_child.get_basic_info().repetition() != Repetition::REPEATED { + return Err(ArrowError("List child must be repeated".to_string())); + } - Ok(Some(list_array_reader)) + // The repeated field + new_context.rep_level += 1; + new_context.def_level += 1; + + match self.dispatch(item_type, &new_context) { + Ok(Some(item_reader)) => { + let item_type = item_reader.get_data_type().clone(); + + // a list is a group type with a single child. The list child's + // name comes from the child's field name. 
+ // if the child's name is "list" and it has a child, then use this child + if list_child.name() == "list" && !list_child.get_fields().is_empty() { + list_child = list_child.get_fields().first().unwrap(); } + + let arrow_type = self + .arrow_schema + .field_with_name(list_type.name()) + .ok() + .map(|f| f.data_type().to_owned()) + .unwrap_or_else(|| { + ArrowType::List(Box::new(Field::new( + list_child.name(), + item_type.clone(), + list_child.is_optional(), + ))) + }); + + let list_array_reader: Box = match arrow_type { + ArrowType::List(_) => Box::new(ListArrayReader::::new( + item_reader, + arrow_type, + item_type, + new_context.def_level, + new_context.rep_level, + nullable, + )), + ArrowType::LargeList(_) => Box::new(ListArrayReader::::new( + item_reader, + arrow_type, + item_type, + new_context.def_level, + new_context.rep_level, + nullable, + )), + _ => { + return Err(ArrowError(format!( + "creating ListArrayReader with type {:?} should be unreachable", + arrow_type + ))) + } + }; + + Ok(Some(list_array_reader)) } - } else { - reader + result => result, } } } @@ -637,10 +627,10 @@ impl<'a> ArrayReaderBuilder { let mut children_reader = Vec::with_capacity(cur_type.get_fields().len()); for child in cur_type.get_fields() { - let mut struct_context = context.clone(); if let Some(child_reader) = self.dispatch(child.clone(), context)? { // TODO: this results in calling get_arrow_field twice, it could be reused // from child_reader above, by making child_reader carry its `Field` + let mut struct_context = context.clone(); struct_context.path.append(vec![child.name().to_string()]); let field = match self.get_arrow_field(child, &struct_context) { Some(f) => f.clone(), diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 31c52c9c5bf9..00b55b1549df 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -16,17 +16,17 @@ // under the License. 
use crate::arrow::array_reader::ArrayReader; -use crate::errors::ParquetError::ArrowError; +use crate::errors::ParquetError; use crate::errors::Result; use arrow::array::{ - new_empty_array, ArrayData, ArrayRef, GenericListArray, - OffsetSizeTrait, UInt32Array, + new_empty_array, Array, ArrayData, ArrayRef, BooleanBufferBuilder, GenericListArray, + MutableArrayData, OffsetSizeTrait, }; -use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::buffer::Buffer; use arrow::datatypes::DataType as ArrowType; use arrow::datatypes::ToByteSlice; -use arrow::util::bit_util; use std::any::Any; +use std::cmp::Ordering; use std::marker::PhantomData; use std::sync::Arc; @@ -35,12 +35,12 @@ pub struct ListArrayReader { item_reader: Box, data_type: ArrowType, item_type: ArrowType, - list_def_level: i16, - list_rep_level: i16, - list_empty_def_level: i16, - list_null_def_level: i16, - def_level_buffer: Option, - rep_level_buffer: Option, + /// The definition level at which this list is not null + def_level: i16, + /// The repetition level that corresponds to a new value in this array + rep_level: i16, + /// If this list is nullable + nullable: bool, _marker: PhantomData, } @@ -52,19 +52,15 @@ impl ListArrayReader { item_type: ArrowType, def_level: i16, rep_level: i16, - list_null_def_level: i16, - list_empty_def_level: i16, + nullable: bool, ) -> Self { Self { item_reader, data_type, item_type, - list_def_level: def_level, - list_rep_level: rep_level, - list_null_def_level, - list_empty_def_level, - def_level_buffer: None, - rep_level_buffer: None, + def_level, + rep_level, + nullable, _marker: PhantomData, } } @@ -88,97 +84,159 @@ impl ArrayReader for ListArrayReader { if next_batch_array.len() == 0 { return Ok(new_empty_array(&self.data_type)); } + let def_levels = self .item_reader .get_def_levels() - .ok_or_else(|| ArrowError("item_reader def levels are None.".to_string()))?; + .ok_or_else(|| general_err!("item_reader def levels are None."))?; + let rep_levels = self 
.item_reader .get_rep_levels() - .ok_or_else(|| ArrowError("item_reader rep levels are None.".to_string()))?; + .ok_or_else(|| general_err!("item_reader rep levels are None."))?; - if !((def_levels.len() == rep_levels.len()) - && (rep_levels.len() == next_batch_array.len())) - { - return Err(ArrowError( - format!("Expected item_reader def_levels {} and rep_levels {} to be same length as batch {}", def_levels.len(), rep_levels.len(), next_batch_array.len()), + if OffsetSize::from_usize(next_batch_array.len()).is_none() { + return Err(general_err!( + "offset of {} would overflow list array", + next_batch_array.len() )); } - // List definitions can be encoded as 4 values: - // - n + 0: the list slot is null - // - n + 1: the list slot is not null, but is empty (i.e. []) - // - n + 2: the list slot is not null, but its child is empty (i.e. [ null ]) - // - n + 3: the list slot is not null, and its child is not empty - // Where n is the max definition level of the list's parent. - // If a Parquet schema's only leaf is the list, then n = 0. 
- - // If the list index is at empty definition, the child slot is null - let non_null_list_indices = - def_levels.iter().enumerate().filter_map(|(index, def)| { - (*def > self.list_empty_def_level).then(|| index as u32) - }); - let indices = UInt32Array::from_iter_values(non_null_list_indices); - let batch_values = - arrow::compute::take(&*next_batch_array.clone(), &indices, None)?; - - // first item in each list has rep_level = 0, subsequent items have rep_level = 1 - let mut offsets: Vec = Vec::new(); - let mut cur_offset = OffsetSize::zero(); - def_levels.iter().zip(rep_levels).for_each(|(d, r)| { - if *r == 0 || d == &self.list_empty_def_level { - offsets.push(cur_offset); - } - if d > &self.list_empty_def_level { - cur_offset += OffsetSize::one(); - } - }); - - offsets.push(cur_offset); - - let num_bytes = bit_util::ceil(offsets.len(), 8); - // TODO: A useful optimization is to use the null count to fill with - // 0 or null, to reduce individual bits set in a loop. - // To favour dense data, set every slot to true, then unset - let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); - let null_slice = null_buf.as_slice_mut(); - let mut list_index = 0; - for i in 0..rep_levels.len() { - // If the level is lower than empty, then the slot is null. - // When a list is non-nullable, its empty level = null level, - // so this automatically factors that in. 
- if rep_levels[i] == 0 && def_levels[i] < self.list_empty_def_level { - bit_util::unset_bit(null_slice, list_index); + if !rep_levels.is_empty() && rep_levels[0] != 0 { + // This implies either the source data was invalid, or the leaf column + // reader did not correctly delimit semantic records + return Err(general_err!("first repetition level of batch must be 0")); + } + + // A non-nullable list has a single definition level indicating if the list is empty + // + // A nullable list has two definition levels associated with it: + // + // The first identifies if the list is null + // The second identifies if the list is empty + // + // The child data returned above is padded with a value for each not-fully defined level. + // Therefore null and empty lists will correspond to a value in the child array. + // + // Whilst nulls may have a non-zero slice in the offsets array, empty lists must + // be of zero length. As a result we MUST filter out values corresponding to empty + // lists, and for consistency we do the same for nulls. 
+ + // The output offsets for the computed ListArray + let mut list_offsets: Vec = + Vec::with_capacity(next_batch_array.len()); + + // The validity mask of the computed ListArray if nullable + let mut validity = self + .nullable + .then(|| BooleanBufferBuilder::new(next_batch_array.len())); + + // The offset into the filtered child data of the current level being considered + let mut cur_offset = 0; + + // Identifies the start of a run of values to copy from the source child data + let mut filter_start = None; + + // The number of child values skipped due to empty lists or nulls + let mut skipped = 0; + + // Builder used to construct the filtered child data, skipping empty lists and nulls + let mut child_data_builder = MutableArrayData::new( + vec![next_batch_array.data()], + false, + next_batch_array.len(), + ); + + def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| { + match r.cmp(&self.rep_level) { + Ordering::Greater => { + // Repetition level greater than current => already handled by inner array + if *d < self.def_level { + return Err(general_err!( + "Encountered repetition level too large for definition level" + )); + } + } + Ordering::Equal => { + // New value in the current list + cur_offset += 1; + } + Ordering::Less => { + // Create new array slice + // Already checked that this cannot overflow + list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap()); + + if *d >= self.def_level { + // Fully defined value + + // Record current offset if it is None + filter_start.get_or_insert(cur_offset + skipped); + + cur_offset += 1; + + if let Some(validity) = validity.as_mut() { + validity.append(true) + } + } else { + // Flush the current slice of child values if any + if let Some(start) = filter_start.take() { + child_data_builder.extend(0, start, cur_offset + skipped); + } + + if let Some(validity) = validity.as_mut() { + // Valid if empty list + validity.append(*d + 1 == self.def_level) + } + + skipped += 1; + } + } } - if rep_levels[i] == 0 { - 
list_index += 1; + Ok(()) + })?; + + list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap()); + + let child_data = if skipped == 0 { + // No filtered values - can reuse original array + next_batch_array.data().clone() + } else { + // One or more filtered values - must build new array + if let Some(start) = filter_start.take() { + child_data_builder.extend(0, start, cur_offset + skipped) } + + child_data_builder.freeze() + }; + + if cur_offset != child_data.len() { + return Err(general_err!("Failed to reconstruct list from level data")); } - let value_offsets = Buffer::from(&offsets.to_byte_slice()); - let list_data = ArrayData::builder(self.get_data_type().clone()) - .len(offsets.len() - 1) + let value_offsets = Buffer::from(&list_offsets.to_byte_slice()); + + let mut data_builder = ArrayData::builder(self.get_data_type().clone()) + .len(list_offsets.len() - 1) .add_buffer(value_offsets) - .add_child_data(batch_values.data().clone()) - .null_bit_buffer(null_buf.into()) - .offset(next_batch_array.offset()); + .add_child_data(child_data); - let list_data = unsafe { list_data.build_unchecked() }; + if let Some(mut builder) = validity { + assert_eq!(builder.len(), list_offsets.len() - 1); + data_builder = data_builder.null_bit_buffer(builder.finish()) + } + + let list_data = unsafe { data_builder.build_unchecked() }; let result_array = GenericListArray::::from(list_data); Ok(Arc::new(result_array)) } fn get_def_levels(&self) -> Option<&[i16]> { - self.def_level_buffer - .as_ref() - .map(|buf| unsafe { buf.typed_data() }) + self.item_reader.get_def_levels() } fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_level_buffer - .as_ref() - .map(|buf| unsafe { buf.typed_data() }) + self.item_reader.get_rep_levels() } } @@ -193,70 +251,183 @@ mod tests { use crate::file::reader::{FileReader, SerializedFileReader}; use crate::schema::parser::parse_message_type; use crate::schema::types::SchemaDescriptor; - use arrow::array::{Array, LargeListArray, ListArray, 
PrimitiveArray}; - use arrow::datatypes::{Field, Int32Type as ArrowInt32}; + use arrow::array::{Array, ArrayDataBuilder, PrimitiveArray}; + use arrow::datatypes::{Field, Int32Type as ArrowInt32, Int32Type}; use std::sync::Arc; - #[test] - fn test_list_array_reader() { - // [[1, null, 2], null, [3, 4]] - let array = Arc::new(PrimitiveArray::::from(vec![ + fn list_type( + data_type: ArrowType, + item_nullable: bool, + ) -> ArrowType { + let field = Box::new(Field::new("item", data_type, item_nullable)); + match OffsetSize::is_large() { + true => ArrowType::LargeList(field), + false => ArrowType::List(field), + } + } + + fn downcast( + array: &ArrayRef, + ) -> &'_ GenericListArray { + array + .as_any() + .downcast_ref::>() + .unwrap() + } + + fn to_offsets(values: Vec) -> Buffer { + Buffer::from_iter( + values + .into_iter() + .map(|x| OffsetSize::from_usize(x).unwrap()), + ) + } + + fn test_nested_list() { + // 3 lists, with first and third nullable + // [ + // [ + // [[1, null], null, [4], []], + // [], + // [[7]], + // [[]], + // [[1, 2, 3], [4, null, 6], null] + // ], + // null, + // [], + // [[[11]]] + // ] + + let l3_item_type = ArrowType::Int32; + let l3_type = list_type::(l3_item_type.clone(), true); + + let l2_item_type = l3_type.clone(); + let l2_type = list_type::(l2_item_type.clone(), true); + + let l1_item_type = l2_type.clone(); + let l1_type = list_type::(l1_item_type.clone(), false); + + let leaf = PrimitiveArray::::from_iter(vec![ Some(1), None, + Some(4), + Some(7), + Some(1), Some(2), + Some(3), + Some(4), None, + Some(6), + Some(11), + ]); + + // [[1, null], null, [4], [], [7], [], [1, 2, 3], [4, null, 6], null, [11]] + let offsets = to_offsets::(vec![0, 2, 2, 3, 3, 4, 4, 7, 10, 10, 11]); + let l3 = ArrayDataBuilder::new(l3_type.clone()) + .len(10) + .add_buffer(offsets) + .add_child_data(leaf.data().clone()) + .null_bit_buffer(Buffer::from([0b11111101, 0b00000010])) + .build() + .unwrap(); + + // [[[1, null], null, [4], []], [], [[7]], [[]], [[1, 
2, 3], [4, null, 6], null], [[11]]] + let offsets = to_offsets::(vec![0, 4, 4, 5, 6, 9, 10]); + let l2 = ArrayDataBuilder::new(l2_type.clone()) + .len(6) + .add_buffer(offsets) + .add_child_data(l3) + .build() + .unwrap(); + + let offsets = to_offsets::(vec![0, 5, 5, 5, 6]); + let l1 = ArrayDataBuilder::new(l1_type.clone()) + .len(4) + .add_buffer(offsets) + .add_child_data(l2) + .null_bit_buffer(Buffer::from([0b00001101])) + .build() + .unwrap(); + + let expected = GenericListArray::::from(l1); + + let values = Arc::new(PrimitiveArray::::from(vec![ + Some(1), + None, + None, + Some(4), + None, + None, + Some(7), + None, + Some(1), + Some(2), Some(3), Some(4), + None, + Some(6), + None, + None, + None, + Some(11), ])); let item_array_reader = InMemoryArrayReader::new( ArrowType::Int32, - array, - Some(vec![3, 2, 3, 0, 3, 3]), - Some(vec![0, 1, 1, 0, 0, 1]), + values, + Some(vec![6, 5, 3, 6, 4, 2, 6, 4, 6, 6, 6, 6, 5, 6, 3, 0, 1, 6]), + Some(vec![0, 3, 2, 2, 2, 1, 1, 1, 1, 3, 3, 2, 3, 3, 2, 0, 0, 0]), ); - let mut list_array_reader = ListArrayReader::::new( + let l3 = ListArrayReader::::new( Box::new(item_array_reader), - ArrowType::List(Box::new(Field::new("item", ArrowType::Int32, true))), - ArrowType::Int32, - 1, - 1, - 0, - 1, + l3_type, + l3_item_type, + 5, + 3, + true, ); - let next_batch = list_array_reader.next_batch(1024).unwrap(); - let list_array = next_batch.as_any().downcast_ref::().unwrap(); - - assert_eq!(3, list_array.len()); - // This passes as I expect - assert_eq!(1, list_array.null_count()); + let l2 = ListArrayReader::::new( + Box::new(l3), + l2_type, + l2_item_type, + 3, + 2, + false, + ); - assert_eq!( - list_array - .value(0) - .as_any() - .downcast_ref::>() - .unwrap(), - &PrimitiveArray::::from(vec![Some(1), None, Some(2)]) + let mut l1 = ListArrayReader::::new( + Box::new(l2), + l1_type, + l1_item_type, + 2, + 1, + true, ); - assert!(list_array.is_null(1)); + let expected_1 = expected.slice(0, 2); + let expected_2 = expected.slice(2, 2); - 
assert_eq!( - list_array - .value(2) - .as_any() - .downcast_ref::>() - .unwrap(), - &PrimitiveArray::::from(vec![Some(3), Some(4)]) - ); + let actual = l1.next_batch(2).unwrap(); + assert_eq!(expected_1.as_ref(), actual.as_ref()); + + let actual = l1.next_batch(1024).unwrap(); + assert_eq!(expected_2.as_ref(), actual.as_ref()); } - #[test] - fn test_large_list_array_reader() { - // [[1, null, 2], null, [3, 4]] + fn test_required_list() { + // [[1, null, 2], [], [3, 4], [], [], [null, 1]] + let expected = + GenericListArray::::from_iter_primitive::(vec![ + Some(vec![Some(1), None, Some(2)]), + Some(vec![]), + Some(vec![Some(3), Some(4)]), + Some(vec![]), + Some(vec![]), + Some(vec![None, Some(1)]), + ]); + let array = Arc::new(PrimitiveArray::::from(vec![ Some(1), None, @@ -264,51 +435,101 @@ mod tests { None, Some(3), Some(4), + None, + None, + None, + Some(1), ])); + let item_array_reader = InMemoryArrayReader::new( ArrowType::Int32, array, - Some(vec![3, 2, 3, 0, 3, 3]), - Some(vec![0, 1, 1, 0, 0, 1]), + Some(vec![2, 1, 2, 0, 2, 2, 0, 0, 1, 2]), + Some(vec![0, 1, 1, 0, 0, 1, 0, 0, 0, 1]), ); - let mut list_array_reader = ListArrayReader::::new( + let mut list_array_reader = ListArrayReader::::new( Box::new(item_array_reader), - ArrowType::LargeList(Box::new(Field::new("item", ArrowType::Int32, true))), + list_type::(ArrowType::Int32, true), ArrowType::Int32, 1, 1, - 0, - 1, + false, ); - let next_batch = list_array_reader.next_batch(1024).unwrap(); - let list_array = next_batch - .as_any() - .downcast_ref::() - .unwrap(); + let actual = list_array_reader.next_batch(1024).unwrap(); + let actual = downcast::(&actual); - assert_eq!(3, list_array.len()); + assert_eq!(&expected, actual) + } - assert_eq!( - list_array - .value(0) - .as_any() - .downcast_ref::>() - .unwrap(), - &PrimitiveArray::::from(vec![Some(1), None, Some(2)]) - ); + fn test_nullable_list() { + // [[1, null, 2], null, [], [3, 4], [], [], null, [], [null, 1]] + let expected = + 
GenericListArray::::from_iter_primitive::(vec![ + Some(vec![Some(1), None, Some(2)]), + None, + Some(vec![]), + Some(vec![Some(3), Some(4)]), + Some(vec![]), + Some(vec![]), + None, + Some(vec![]), + Some(vec![None, Some(1)]), + ]); - assert!(list_array.is_null(1)); + let array = Arc::new(PrimitiveArray::::from(vec![ + Some(1), + None, + Some(2), + None, + None, + Some(3), + Some(4), + None, + None, + None, + None, + None, + Some(1), + ])); - assert_eq!( - list_array - .value(2) - .as_any() - .downcast_ref::>() - .unwrap(), - &PrimitiveArray::::from(vec![Some(3), Some(4)]) + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + array, + Some(vec![3, 2, 3, 0, 1, 3, 3, 1, 1, 0, 1, 2, 3]), + Some(vec![0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]), + ); + + let mut list_array_reader = ListArrayReader::::new( + Box::new(item_array_reader), + list_type::(ArrowType::Int32, true), + ArrowType::Int32, + 2, + 1, + true, ); + + let actual = list_array_reader.next_batch(1024).unwrap(); + let actual = downcast::(&actual); + + assert_eq!(&expected, actual) + } + + fn test_list_array() { + test_nullable_list::(); + test_required_list::(); + test_nested_list::(); + } + + #[test] + fn test_list_array_reader() { + test_list_array::(); + } + + #[test] + fn test_large_list_array_reader() { + test_list_array::() } #[test] diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs index f212d05b0712..afee4659c3dd 100644 --- a/parquet/src/arrow/array_reader/test_util.rs +++ b/parquet/src/arrow/array_reader/test_util.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use arrow::array::ArrayRef; +use arrow::array::{Array, ArrayRef}; use arrow::datatypes::DataType as ArrowType; use std::any::Any; use std::sync::Arc; @@ -102,6 +102,8 @@ pub struct InMemoryArrayReader { array: ArrayRef, def_levels: Option>, rep_levels: Option>, + last_idx: usize, + cur_idx: usize, } impl InMemoryArrayReader { @@ -111,11 +113,23 @@ impl InMemoryArrayReader { def_levels: Option>, rep_levels: Option>, ) -> Self { + assert!(def_levels + .as_ref() + .map(|d| d.len() == array.len()) + .unwrap_or(true)); + + assert!(rep_levels + .as_ref() + .map(|r| r.len() == array.len()) + .unwrap_or(true)); + Self { data_type, array, def_levels, rep_levels, + cur_idx: 0, + last_idx: 0, } } } @@ -129,16 +143,46 @@ impl ArrayReader for InMemoryArrayReader { &self.data_type } - fn next_batch(&mut self, _batch_size: usize) -> Result { - Ok(self.array.clone()) + fn next_batch(&mut self, batch_size: usize) -> Result { + assert_ne!(batch_size, 0); + // This replicates the logic normally performed by + // RecordReader to delimit semantic records + let read = match &self.rep_levels { + Some(rep_levels) => { + let rep_levels = &rep_levels[self.cur_idx..]; + let mut levels_read = 0; + let mut records_read = 0; + while levels_read < rep_levels.len() && records_read < batch_size { + if rep_levels[levels_read] == 0 { + records_read += 1; // Start of new record + } + levels_read += 1; + } + + // Find end of current record + while levels_read < rep_levels.len() && rep_levels[levels_read] != 0 { + levels_read += 1 + } + levels_read + } + None => batch_size.min(self.array.len() - self.cur_idx), + }; + + self.last_idx = self.cur_idx; + self.cur_idx += read; + Ok(self.array.slice(self.last_idx, read)) } fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels.as_deref() + self.def_levels + .as_ref() + .map(|l| &l[self.last_idx..self.cur_idx]) } fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels.as_deref() + self.rep_levels + .as_ref() + .map(|l|
&l[self.last_idx..self.cur_idx]) } } diff --git a/parquet/src/schema/visitor.rs b/parquet/src/schema/visitor.rs index 8ed079fb4237..9d28fa5e8dcd 100644 --- a/parquet/src/schema/visitor.rs +++ b/parquet/src/schema/visitor.rs @@ -27,17 +27,30 @@ pub trait TypeVisitor { /// Default implementation when visiting a list. /// - /// It checks list type definition and calls `visit_list_with_item` with extracted + /// It checks list type definition and calls [`Self::visit_list_with_item`] with extracted /// item type. /// /// To fully understand this algorithm, please refer to /// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md). + /// + /// For example, a standard list type looks like: + /// + /// ```text + /// required/optional group my_list (LIST) { + /// repeated group list { + /// required/optional binary element (UTF8); + /// } + /// } + /// ``` + /// + /// In such a case, [`Self::visit_list_with_item`] will be called with `my_list` as the list + /// type, and `element` as the `item_type` + /// fn visit_list(&mut self, list_type: TypePtr, context: C) -> Result { match list_type.as_ref() { - Type::PrimitiveType { .. } => panic!( - "{:?} is a list type and can't be processed as primitive.", - list_type - ), + Type::PrimitiveType { .. } => { + panic!("{:?} is a list type and must be a group type", list_type) + } Type::GroupType { basic_info: _, fields,