From d3537c87ea1df39052990c9a257093d4f059167f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 18 Apr 2022 23:57:10 +0100 Subject: [PATCH 01/11] Add support for nested list arrays (#993) --- parquet/src/arrow/array_reader.rs | 8 +- parquet/src/arrow/array_reader/builder.rs | 260 +++++++------- parquet/src/arrow/array_reader/list_array.rs | 336 +++++++++---------- parquet/src/arrow/array_reader/test_util.rs | 12 +- parquet/src/schema/visitor.rs | 23 +- 5 files changed, 317 insertions(+), 322 deletions(-) diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index f4139b2f654d..ba2e1f7f41d9 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -717,13 +717,7 @@ impl ArrayReader for StructArrayReader { .children .iter_mut() .map(|reader| reader.next_batch(batch_size)) - .try_fold( - Vec::new(), - |mut result, child_array| -> Result> { - result.push(child_array?); - Ok(result) - }, - )?; + .collect::>>()?; // check that array child data has same size let children_array_len = diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 2836c699c39e..6026492590b2 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -38,7 +38,7 @@ use crate::data_type::{ Int96Type, }; use crate::errors::ParquetError::ArrowError; -use crate::errors::{Result}; +use crate::errors::{ParquetError, Result}; use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr}; use crate::schema::visitor::TypeVisitor; @@ -129,9 +129,10 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext let null_mask_only = match cur_type.get_basic_info().repetition() { Repetition::REPEATED => { - new_context.def_level += 1; - new_context.rep_level += 1; - false + return Err(ArrowError(format!( + "Reading repeated primitive ({:?}) is not supported yet!", + cur_type.name() + ))); } Repetition::OPTIONAL => { new_context.def_level += 1; @@ -143,19 +144,12 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext }; let reader = self.build_for_primitive_type_inner( - cur_type.clone(), + cur_type, &new_context, null_mask_only, )?; - if cur_type.get_basic_info().repetition() == Repetition::REPEATED { - Err(ArrowError(format!( - "Reading repeated field ({:?}) is not supported yet!", - cur_type.name() - ))) - } else { - Ok(Some(reader)) - } + Ok(Some(reader)) } else { Ok(None) } @@ -173,30 +167,19 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext if cur_type.get_basic_info().has_repetition() { match cur_type.get_basic_info().repetition() { Repetition::REPEATED => { - new_context.def_level += 1; - new_context.rep_level += 1; + return Err(ArrowError(format!( + "Reading repeated struct ({:?}) is not supported yet!", + cur_type.name(), + ))) } Repetition::OPTIONAL => { new_context.def_level += 1; } - _ => (), + Repetition::REQUIRED => {} } } - if let Some(reader) = self.build_for_struct_type_inner(&cur_type, &new_context)? { - if cur_type.get_basic_info().has_repetition() - && cur_type.get_basic_info().repetition() == Repetition::REPEATED - { - Err(ArrowError(format!( - "Reading repeated field ({:?}) is not supported yet!", - cur_type.name(), - ))) - } else { - Ok(Some(reader)) - } - } else { - Ok(None) - } + self.build_for_struct_type_inner(&cur_type, &new_context) } /// Build array reader for map type. @@ -208,42 +191,61 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext // Add map type to context let mut new_context = context.clone(); new_context.path.append(vec![map_type.name().to_string()]); - if let Repetition::OPTIONAL = map_type.get_basic_info().repetition() { - new_context.def_level += 1; + + match map_type.get_basic_info().repetition() { + Repetition::REQUIRED => {} + Repetition::OPTIONAL => { + new_context.def_level += 1; + } + Repetition::REPEATED => { + return Err(ArrowError("Map cannot be repeated".to_string())) + } + } + + if map_type.get_fields().len() != 1 { + return Err(ArrowError(format!( + "Map field must have exactly one key_value child, found {}", + map_type.get_fields().len() + ))); } // Add map entry (key_value) to context - let map_key_value = map_type.get_fields().first().ok_or_else(|| { - ArrowError("Map field must have a key_value entry".to_string()) - })?; + let map_key_value = &map_type.get_fields()[0]; + if map_key_value.get_basic_info().repetition() != Repetition::REPEATED { + return Err(ArrowError( + "Child of map field must be repeated".to_string(), + )); + } + new_context .path .append(vec![map_key_value.name().to_string()]); + new_context.rep_level += 1; + new_context.def_level += 1; + + if map_key_value.get_fields().len() != 2 { + // According to the specification the values are optional (#1642) + return Err(ArrowError(format!( + "Child of map field must have two children, found {}", + map_key_value.get_fields().len() + ))); + } // Get key and value, and create context for each - let map_key = map_key_value - .get_fields() - .first() - .ok_or_else(|| ArrowError("Map entry must have a key".to_string()))?; - let map_value = map_key_value - .get_fields() - .get(1) - .ok_or_else(|| ArrowError("Map entry must have a value".to_string()))?; - - let key_reader = { - let mut key_context = new_context.clone(); - key_context.def_level += 1; - key_context.path.append(vec![map_key.name().to_string()]); - self.dispatch(map_key.clone(), &key_context)?.unwrap() - }; - let value_reader = { - let mut value_context = new_context.clone(); - if let Repetition::OPTIONAL = map_value.get_basic_info().repetition() { - value_context.def_level += 1; - } - self.dispatch(map_value.clone(), &value_context)?.unwrap() - }; + let map_key = &map_key_value.get_fields()[0]; + let map_value = &map_key_value.get_fields()[1]; + + if map_key.get_basic_info().repetition() != Repetition::REQUIRED { + return Err(ArrowError("Map keys must be required".to_string())); + } + + if map_value.get_basic_info().repetition() == Repetition::REPEATED { + return Err(ArrowError("Map values cannot be repeated".to_string())); + } + + let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap(); + let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap(); let arrow_type = self .arrow_schema @@ -295,96 +297,80 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext .first() .ok_or_else(|| ArrowError("List field must have a child.".to_string()))? .clone(); - let mut new_context = context.clone(); + // If the list can contain nulls + let nullable = match list_type.get_basic_info().repetition() { + Repetition::REQUIRED => false, + Repetition::OPTIONAL => true, + Repetition::REPEATED => { + return Err(general_err!("List type cannot be repeated")) + } + }; + + let mut new_context = context.clone(); new_context.path.append(vec![list_type.name().to_string()]); - // We need to know at what definition a list or its child is null - let list_null_def = new_context.def_level; - let mut list_empty_def = new_context.def_level; + + // The repeated field + new_context.rep_level += 1; + new_context.def_level += 1; // If the list's root is nullable - if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() { + if nullable { new_context.def_level += 1; - // current level is nullable, increment to get level for empty list slot - list_empty_def += 1; } - match list_child.get_basic_info().repetition() { - Repetition::REPEATED => { - new_context.def_level += 1; - new_context.rep_level += 1; - } - Repetition::OPTIONAL => { - new_context.def_level += 1; - } - _ => (), - } + match self.dispatch(item_type, &new_context) { + Ok(Some(item_reader)) => { + let item_type = item_reader.get_data_type().clone(); + + // a list is a group type with a single child. The list child's + // name comes from the child's field name. + // if the child's name is "list" and it has a child, then use this child + if list_child.name() == "list" && !list_child.get_fields().is_empty() { + list_child = list_child.get_fields().first().unwrap(); + } - let reader = self.dispatch(item_type.clone(), &new_context); - if let Ok(Some(item_reader)) = reader { - let item_reader_type = item_reader.get_data_type().clone(); - - match item_reader_type { - ArrowType::List(_) - | ArrowType::FixedSizeList(_, _) - | ArrowType::Dictionary(_, _) => Err(ArrowError(format!( - "reading List({:?}) into arrow not supported yet", - item_type - ))), - _ => { - // a list is a group type with a single child. The list child's - // name comes from the child's field name. - // if the child's name is "list" and it has a child, then use this child - if list_child.name() == "list" && !list_child.get_fields().is_empty() - { - list_child = list_child.get_fields().first().unwrap(); + let arrow_type = self + .arrow_schema + .field_with_name(list_type.name()) + .ok() + .map(|f| f.data_type().to_owned()) + .unwrap_or_else(|| { + ArrowType::List(Box::new(Field::new( + list_child.name(), + item_type.clone(), + list_child.is_optional(), + ))) + }); + + let list_array_reader: Box = match arrow_type { + ArrowType::List(_) => Box::new(ListArrayReader::::new( + item_reader, + arrow_type, + item_type, + new_context.def_level, + new_context.rep_level, + nullable, + )), + ArrowType::LargeList(_) => Box::new(ListArrayReader::::new( + item_reader, + arrow_type, + item_type, + new_context.def_level, + new_context.rep_level, + nullable, + )), + _ => { + return Err(ArrowError(format!( + "creating ListArrayReader with type {:?} should be unreachable", + arrow_type + ))) } - let arrow_type = self - .arrow_schema - .field_with_name(list_type.name()) - .ok() - .map(|f| f.data_type().to_owned()) - .unwrap_or_else(|| { - ArrowType::List(Box::new(Field::new( - list_child.name(), - item_reader_type.clone(), - list_child.is_optional(), - ))) - }); - - let list_array_reader: Box = match arrow_type { - ArrowType::List(_) => Box::new(ListArrayReader::::new( - item_reader, - arrow_type, - item_reader_type, - new_context.def_level, - new_context.rep_level, - list_null_def, - list_empty_def, - )), - ArrowType::LargeList(_) => Box::new(ListArrayReader::::new( - item_reader, - arrow_type, - item_reader_type, - new_context.def_level, - new_context.rep_level, - list_null_def, - list_empty_def, - )), - - _ => { - return Err(ArrowError(format!( - "creating ListArrayReader with type {:?} should be unreachable", - arrow_type - ))) - } - }; + }; - Ok(Some(list_array_reader)) - } + Ok(Some(list_array_reader)) } - } else { - reader + result => result, } } } @@ -637,10 +623,10 @@ impl<'a> ArrayReaderBuilder { let mut children_reader = Vec::with_capacity(cur_type.get_fields().len()); for child in cur_type.get_fields() { - let mut struct_context = context.clone(); if let Some(child_reader) = self.dispatch(child.clone(), context)? { // TODO: this results in calling get_arrow_field twice, it could be reused // from child_reader above, by making child_reader carry its `Field` + let mut struct_context = context.clone(); struct_context.path.append(vec![child.name().to_string()]); let field = match self.get_arrow_field(child, &struct_context) { Some(f) => f.clone(), diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 31c52c9c5bf9..611c2cbea8e8 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -16,17 +16,17 @@ // under the License. use crate::arrow::array_reader::ArrayReader; -use crate::errors::ParquetError::ArrowError; +use crate::errors::ParquetError; use crate::errors::Result; use arrow::array::{ - new_empty_array, ArrayData, ArrayRef, GenericListArray, - OffsetSizeTrait, UInt32Array, + new_empty_array, Array, ArrayData, ArrayRef, BooleanBufferBuilder, GenericListArray, + MutableArrayData, OffsetSizeTrait, }; -use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::buffer::Buffer; use arrow::datatypes::DataType as ArrowType; use arrow::datatypes::ToByteSlice; -use arrow::util::bit_util; use std::any::Any; +use std::cmp::Ordering; use std::marker::PhantomData; use std::sync::Arc; @@ -35,12 +35,12 @@ pub struct ListArrayReader { item_reader: Box, data_type: ArrowType, item_type: ArrowType, - list_def_level: i16, - list_rep_level: i16, - list_empty_def_level: i16, - list_null_def_level: i16, - def_level_buffer: Option, - rep_level_buffer: Option, + // The definition level at which this list is not null + def_level: i16, + // The repetition level that corresponds to a new value in this array + rep_level: i16, + // If this list is nullable + nullable: bool, _marker: PhantomData, } @@ -52,19 +52,15 @@ impl ListArrayReader { item_type: ArrowType, def_level: i16, rep_level: i16, - list_null_def_level: i16, - list_empty_def_level: i16, + nullable: bool, ) -> Self { Self { item_reader, data_type, item_type, - list_def_level: def_level, - list_rep_level: rep_level, - list_null_def_level, - list_empty_def_level, - def_level_buffer: None, - rep_level_buffer: None, + def_level, + rep_level, + nullable, _marker: PhantomData, } } @@ -88,97 +84,147 @@ impl ArrayReader for ListArrayReader { if next_batch_array.len() == 0 { return Ok(new_empty_array(&self.data_type)); } + let def_levels = self .item_reader .get_def_levels() - .ok_or_else(|| ArrowError("item_reader def levels are None.".to_string()))?; + .ok_or_else(|| general_err!("item_reader def levels are None."))?; + let rep_levels = self .item_reader .get_rep_levels() - .ok_or_else(|| ArrowError("item_reader rep levels are None.".to_string()))?; + .ok_or_else(|| general_err!("item_reader rep levels are None."))?; - if !((def_levels.len() == rep_levels.len()) - && (rep_levels.len() == next_batch_array.len())) - { - return Err(ArrowError( - format!("Expected item_reader def_levels {} and rep_levels {} to be same length as batch {}", def_levels.len(), rep_levels.len(), next_batch_array.len()), + if OffsetSize::from_usize(next_batch_array.len()).is_none() { + return Err(general_err!( + "offset of {} would overflow list array", + next_batch_array.len() )); } - // List definitions can be encoded as 4 values: - // - n + 0: the list slot is null - // - n + 1: the list slot is not null, but is empty (i.e. []) - // - n + 2: the list slot is not null, but its child is empty (i.e. [ null ]) - // - n + 3: the list slot is not null, and its child is not empty - // Where n is the max definition level of the list's parent. - // If a Parquet schema's only leaf is the list, then n = 0. - - // If the list index is at empty definition, the child slot is null - let non_null_list_indices = - def_levels.iter().enumerate().filter_map(|(index, def)| { - (*def > self.list_empty_def_level).then(|| index as u32) - }); - let indices = UInt32Array::from_iter_values(non_null_list_indices); - let batch_values = - arrow::compute::take(&*next_batch_array.clone(), &indices, None)?; - - // first item in each list has rep_level = 0, subsequent items have rep_level = 1 - let mut offsets: Vec = Vec::new(); - let mut cur_offset = OffsetSize::zero(); - def_levels.iter().zip(rep_levels).for_each(|(d, r)| { - if *r == 0 || d == &self.list_empty_def_level { - offsets.push(cur_offset); - } - if d > &self.list_empty_def_level { - cur_offset += OffsetSize::one(); - } - }); - - offsets.push(cur_offset); - - let num_bytes = bit_util::ceil(offsets.len(), 8); - // TODO: A useful optimization is to use the null count to fill with - // 0 or null, to reduce individual bits set in a loop. - // To favour dense data, set every slot to true, then unset - let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); - let null_slice = null_buf.as_slice_mut(); - let mut list_index = 0; - for i in 0..rep_levels.len() { - // If the level is lower than empty, then the slot is null. - // When a list is non-nullable, its empty level = null level, - // so this automatically factors that in. - if rep_levels[i] == 0 && def_levels[i] < self.list_empty_def_level { - bit_util::unset_bit(null_slice, list_index); + // A non-nullable list has a single definition level indicating if the list is empty + // + // A nullable list has two definition levels associated with it: + // + // The first identifies if the list is null + // The second identifies if the list is empty + + // Whilst nulls may have a non-zero slice in the offsets array, an empty slice + // must not, we must therefore filter out these empty slices + + // The offset of the current element being considered + let mut cur_offset = 0; + + // The offsets identifying the list start and end offsets + let mut list_offsets: Vec = + Vec::with_capacity(next_batch_array.len()); + + // The validity mask of the final list + let mut validity = self + .nullable + .then(|| BooleanBufferBuilder::new(next_batch_array.len())); + + // The position of the current slice of child data not corresponding to empty lists + let mut cur_start_offset = None; + + // The number of child values skipped due to empty lists + let mut skipped = 0; + + // Builder used to construct child data, skipping empty lists + let mut child_data_builder = MutableArrayData::new( + vec![next_batch_array.data()], + false, + next_batch_array.len(), + ); + + def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| { + match r.cmp(&self.rep_level) { + Ordering::Greater => { + // Repetition level greater than current => already handled by inner array + if *d < self.def_level { + return Err(general_err!( + "Encountered repetition level too large for definition level" + )); + } + } + Ordering::Equal => { + // New value in the current list + cur_offset += 1; + } + Ordering::Less => { + // Create new array slice + list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap()); + + if *d + 1 == self.def_level { + // Empty list + if let Some(start) = cur_start_offset.take() { + child_data_builder.extend( + 0, + start + skipped, + cur_offset + skipped, + ); + } + + if let Some(validity) = validity.as_mut() { + validity.append(true) + } + + skipped += 1; + } else { + cur_start_offset.get_or_insert(cur_offset); + cur_offset += 1; + + if let Some(validity) = validity.as_mut() { + validity.append(*d >= self.def_level) + } + } + } } - if rep_levels[i] == 0 { - list_index += 1; + Ok(()) + })?; + + list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap()); + + let child_data = if skipped == 0 { + // No empty lists - can reuse original array + next_batch_array.data().clone() + } else { + // One or more empty lists - must build new array + if let Some(start) = cur_start_offset.take() { + child_data_builder.extend(0, start + skipped, cur_offset + skipped) } + + child_data_builder.freeze() + }; + + if cur_offset != child_data.len() { + return Err(general_err!("Failed to reconstruct list from level data")); } - let value_offsets = Buffer::from(&offsets.to_byte_slice()); - let list_data = ArrayData::builder(self.get_data_type().clone()) - .len(offsets.len() - 1) + let value_offsets = Buffer::from(&list_offsets.to_byte_slice()); + + let mut data_builder = ArrayData::builder(self.get_data_type().clone()) + .len(list_offsets.len() - 1) .add_buffer(value_offsets) - .add_child_data(batch_values.data().clone()) - .null_bit_buffer(null_buf.into()) - .offset(next_batch_array.offset()); + .add_child_data(child_data); + + if let Some(mut builder) = validity { + assert_eq!(builder.len(), list_offsets.len() - 1); + data_builder = data_builder.null_bit_buffer(builder.finish()) + } - let list_data = unsafe { list_data.build_unchecked() }; + let list_data = unsafe { data_builder.build_unchecked() }; let result_array = GenericListArray::::from(list_data); Ok(Arc::new(result_array)) } fn get_def_levels(&self) -> Option<&[i16]> { - self.def_level_buffer - .as_ref() - .map(|buf| unsafe { buf.typed_data() }) + self.item_reader.get_def_levels() } fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_level_buffer - .as_ref() - .map(|buf| unsafe { buf.typed_data() }) + self.item_reader.get_rep_levels() } } @@ -193,18 +239,18 @@ mod tests { use crate::file::reader::{FileReader, SerializedFileReader}; use crate::schema::parser::parse_message_type; use crate::schema::types::SchemaDescriptor; - use arrow::array::{Array, LargeListArray, ListArray, PrimitiveArray}; - use arrow::datatypes::{Field, Int32Type as ArrowInt32}; + use arrow::array::{Array, PrimitiveArray}; + use arrow::datatypes::{Field, Int32Type as ArrowInt32, Int32Type}; use std::sync::Arc; - #[test] - fn test_list_array_reader() { - // [[1, null, 2], null, [3, 4]] + fn test_list_array() { + // [[1, null, 2], null, [], [3, 4]] let array = Arc::new(PrimitiveArray::::from(vec![ Some(1), None, Some(2), None, + None, Some(3), Some(4), ])); @@ -212,103 +258,49 @@ mod tests { let item_array_reader = InMemoryArrayReader::new( ArrowType::Int32, array, - Some(vec![3, 2, 3, 0, 3, 3]), - Some(vec![0, 1, 1, 0, 0, 1]), - ); - - let mut list_array_reader = ListArrayReader::::new( - Box::new(item_array_reader), - ArrowType::List(Box::new(Field::new("item", ArrowType::Int32, true))), - ArrowType::Int32, - 1, - 1, - 0, - 1, - ); - - let next_batch = list_array_reader.next_batch(1024).unwrap(); - let list_array = next_batch.as_any().downcast_ref::().unwrap(); - - assert_eq!(3, list_array.len()); - // This passes as I expect - assert_eq!(1, list_array.null_count()); - - assert_eq!( - list_array - .value(0) - .as_any() - .downcast_ref::>() - .unwrap(), - &PrimitiveArray::::from(vec![Some(1), None, Some(2)]) + Some(vec![3, 2, 3, 0, 1, 3, 3]), + Some(vec![0, 1, 1, 0, 0, 0, 1]), ); - assert!(list_array.is_null(1)); + let field = Box::new(Field::new("item", ArrowType::Int32, true)); + let data_type = match OffsetSize::is_large() { + true => ArrowType::LargeList(field), + false => ArrowType::List(field), + }; - assert_eq!( - list_array - .value(2) - .as_any() - .downcast_ref::>() - .unwrap(), - &PrimitiveArray::::from(vec![Some(3), Some(4)]) - ); - } - - #[test] - fn test_large_list_array_reader() { - // [[1, null, 2], null, [3, 4]] - let array = Arc::new(PrimitiveArray::::from(vec![ - Some(1), - None, - Some(2), - None, - Some(3), - Some(4), - ])); - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![3, 2, 3, 0, 3, 3]), - Some(vec![0, 1, 1, 0, 0, 1]), - ); - - let mut list_array_reader = ListArrayReader::::new( + let mut list_array_reader = ListArrayReader::::new( Box::new(item_array_reader), - ArrowType::LargeList(Box::new(Field::new("item", ArrowType::Int32, true))), + data_type, ArrowType::Int32, + 2, 1, - 1, - 0, - 1, + true, ); let next_batch = list_array_reader.next_batch(1024).unwrap(); let list_array = next_batch .as_any() - .downcast_ref::() + .downcast_ref::>() .unwrap(); - assert_eq!(3, list_array.len()); - - assert_eq!( - list_array - .value(0) - .as_any() - .downcast_ref::>() - .unwrap(), - &PrimitiveArray::::from(vec![Some(1), None, Some(2)]) - ); + let expected = + GenericListArray::::from_iter_primitive::(vec![ + Some(vec![Some(1), None, Some(2)]), + None, + Some(vec![]), + Some(vec![Some(3), Some(4)]), + ]); + assert_eq!(&expected, list_array) + } - assert!(list_array.is_null(1)); + #[test] + fn test_list_array_reader() { + test_list_array::(); + } - assert_eq!( - list_array - .value(2) - .as_any() - .downcast_ref::>() - .unwrap(), - &PrimitiveArray::::from(vec![Some(3), Some(4)]) - ); + #[test] + fn test_large_list_array_reader() { + test_list_array::() } #[test] diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs index f212d05b0712..94fee8eeee8f 100644 --- a/parquet/src/arrow/array_reader/test_util.rs +++ b/parquet/src/arrow/array_reader/test_util.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::ArrayRef; +use arrow::array::{Array, ArrayRef}; use arrow::datatypes::DataType as ArrowType; use std::any::Any; use std::sync::Arc; @@ -111,6 +111,16 @@ impl InMemoryArrayReader { def_levels: Option>, rep_levels: Option>, ) -> Self { + assert!(def_levels + .as_ref() + .map(|d| d.len() == array.len()) + .unwrap_or(true)); + + assert!(rep_levels + .as_ref() + .map(|r| r.len() == array.len()) + .unwrap_or(true)); + Self { data_type, array, diff --git a/parquet/src/schema/visitor.rs b/parquet/src/schema/visitor.rs index 8ed079fb4237..d1cd96fbb0a9 100644 --- a/parquet/src/schema/visitor.rs +++ b/parquet/src/schema/visitor.rs @@ -27,17 +27,30 @@ pub trait TypeVisitor { /// Default implementation when visiting a list. /// - /// It checks list type definition and calls `visit_list_with_item` with extracted + /// It checks list type definition and calls [`visit_list_with_item`] with extracted /// item type. /// /// To fully understand this algorithm, please refer to /// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md). + /// + /// For example, a standard list type looks like: + /// + /// ```ignore + /// required/optional group my_list (LIST) { + // repeated group list { + // required/optional binary element (UTF8); + // } + // } + /// ``` + /// + /// In such a case, [`visit_list_with_item`] will be called with `my_list` as the list + /// type, and `element` as the `item_type` + /// fn visit_list(&mut self, list_type: TypePtr, context: C) -> Result { match list_type.as_ref() { - Type::PrimitiveType { .. } => panic!( - "{:?} is a list type and can't be processed as primitive.", - list_type - ), + Type::PrimitiveType { .. } => { + panic!("{:?} is a list type and must be a group type", list_type) + } Type::GroupType { basic_info: _, fields, From bf32cb65543eb345ddb9784daf1b9dc494ff2d3a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 4 May 2022 13:35:54 +0100 Subject: [PATCH 02/11] More tests --- parquet/src/arrow/array_reader/list_array.rs | 241 +++++++++++++++++-- 1 file changed, 220 insertions(+), 21 deletions(-) diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 611c2cbea8e8..5f3fed2b6f86 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -239,58 +239,257 @@ mod tests { use crate::file::reader::{FileReader, SerializedFileReader}; use crate::schema::parser::parse_message_type; use crate::schema::types::SchemaDescriptor; - use arrow::array::{Array, PrimitiveArray}; + use arrow::array::{Array, ArrayDataBuilder, PrimitiveArray}; use arrow::datatypes::{Field, Int32Type as ArrowInt32, Int32Type}; use std::sync::Arc; - fn test_list_array() { - // [[1, null, 2], null, [], [3, 4]] + fn list_type( + data_type: ArrowType, + item_nullable: bool, + ) -> ArrowType { + let field = Box::new(Field::new("item", data_type, item_nullable)); + match OffsetSize::is_large() { + true => ArrowType::LargeList(field), + false => ArrowType::List(field), + } + } + + fn downcast( + array: &ArrayRef, + ) -> &'_ GenericListArray { + array + .as_any() + .downcast_ref::>() + .unwrap() + } + + fn to_offsets(values: Vec) -> Buffer { + Buffer::from_iter( + values + .into_iter() + .map(|x| OffsetSize::from_usize(x).unwrap()), + ) + } + + fn test_nested_list() { + // 3 lists, with first and third nullable + // [ + // [ + // [[1, null], null, [4], []], + // [], + // [[7]], + // [[]] + // ], + // null, + // [], + // [[[11]]] + // ] + + let l3_item_type = ArrowType::Int32; + let l3_type = list_type::(l3_item_type.clone(), true); + + let l2_item_type = l3_type.clone(); + let l2_type = list_type::(l2_item_type.clone(), true); + + let l1_item_type = l2_type.clone(); + let l1_type = list_type::(l1_item_type.clone(), false); + + let leaf = PrimitiveArray::::from_iter(vec![ + Some(1), + None, + Some(4), + Some(7), + Some(11), + ]); + + // [[1, null], null, [4], [], [7], [], [11]] + let offsets = to_offsets::(vec![0, 2, 2, 3, 3, 4, 4, 5]); + let l3 = ArrayDataBuilder::new(l3_type.clone()) + .len(7) + .add_buffer(offsets) + .add_child_data(leaf.data().clone()) + .null_bit_buffer(Buffer::from([0b01111101])) + .build() + .unwrap(); + + // [[[1, null], null, [4], []],[], [[7]], [[]], [[11]]] + let offsets = to_offsets::(vec![0, 4, 4, 5, 6, 7]); + let l2 = ArrayDataBuilder::new(l2_type.clone()) + .len(5) + .add_buffer(offsets) + .add_child_data(l3) + .build() + .unwrap(); + + let offsets = to_offsets::(vec![0, 4, 4, 4, 5]); + let l1 = ArrayDataBuilder::new(l1_type.clone()) + .len(4) + .add_buffer(offsets) + .add_child_data(l2) + .null_bit_buffer(Buffer::from([0b00001101])) + .build() + .unwrap(); + + let expected = GenericListArray::::from(l1); + + let values = Arc::new(PrimitiveArray::::from(vec![ + Some(1), + None, + None, + Some(4), + None, + None, + Some(7), + None, + None, + None, + Some(11), + ])); + + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + values, + Some(vec![6, 5, 3, 6, 4, 2, 6, 4, 0, 1, 6]), + Some(vec![0, 3, 2, 2, 2, 1, 1, 1, 0, 0, 0]), + ); + + let l3 = ListArrayReader::::new( + Box::new(item_array_reader), + l3_type, + l3_item_type, + 5, + 3, + true, + ); + + let l2 = ListArrayReader::::new( + Box::new(l3), + l2_type, + l2_item_type, + 3, + 2, + false, + ); + + let mut l1 = ListArrayReader::::new( + Box::new(l2), + l1_type, + l1_item_type, + 2, + 1, + true, + ); + + let actual = l1.next_batch(1024).unwrap(); + let actual = downcast::(&actual); + + assert_eq!(&expected, actual) + } + + fn test_required_list() { + // [[1, null, 2], [], [3, 4], [], [], [null, 1]] + let expected = + GenericListArray::::from_iter_primitive::(vec![ + Some(vec![Some(1), None, Some(2)]), + Some(vec![]), + Some(vec![Some(3), Some(4)]), + Some(vec![]), + Some(vec![]), + Some(vec![None, Some(1)]), + ]); + let array = Arc::new(PrimitiveArray::::from(vec![ Some(1), None, Some(2), None, - None, Some(3), Some(4), + None, + None, + None, + Some(1), ])); let item_array_reader = InMemoryArrayReader::new( ArrowType::Int32, array, - Some(vec![3, 2, 3, 0, 1, 3, 3]), - Some(vec![0, 1, 1, 0, 0, 0, 1]), + Some(vec![2, 1, 2, 0, 2, 2, 0, 0, 1, 2]), + Some(vec![0, 1, 1, 0, 0, 1, 0, 0, 0, 1]), ); - let field = Box::new(Field::new("item", ArrowType::Int32, true)); - let data_type = match OffsetSize::is_large() { - true => ArrowType::LargeList(field), - false => ArrowType::List(field), - }; - let mut list_array_reader = ListArrayReader::::new( Box::new(item_array_reader), - data_type, + list_type::(ArrowType::Int32, true), ArrowType::Int32, - 2, 1, - true, + 1, + false, ); - let next_batch = list_array_reader.next_batch(1024).unwrap(); - let list_array = next_batch - .as_any() - .downcast_ref::>() - .unwrap(); + let actual = list_array_reader.next_batch(1024).unwrap(); + let actual = downcast::(&actual); + + assert_eq!(&expected, actual) + } + fn test_nullable_list() { + // [[1, null, 2], null, [], [3, 4], [], [], null, [], [null, 1]] let expected = GenericListArray::::from_iter_primitive::(vec![ Some(vec![Some(1), None, Some(2)]), None, Some(vec![]), Some(vec![Some(3), Some(4)]), + Some(vec![]), + Some(vec![]), + None, + Some(vec![]), + Some(vec![None, Some(1)]), ]); - assert_eq!(&expected, list_array) + + let array = Arc::new(PrimitiveArray::::from(vec![ + Some(1), + None, + Some(2), + None, + None, + Some(3), + Some(4), + None, + None, + None, + None, + None, + Some(1), + ])); + + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + array, + Some(vec![3, 2, 3, 0, 1, 3, 3, 1, 1, 0, 1, 2, 3]), + Some(vec![0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]), + ); + + let mut list_array_reader = ListArrayReader::::new( + Box::new(item_array_reader), + list_type::(ArrowType::Int32, true), + ArrowType::Int32, + 2, + 1, + true, + ); + + let actual = list_array_reader.next_batch(1024).unwrap(); + let actual = downcast::(&actual); + + assert_eq!(&expected, actual) + } + + fn test_list_array() { + test_nullable_list::(); + test_required_list::(); + test_nested_list::(); } #[test] From 70c16c32027400b7f60ac70a27b786cbe4c8efa4 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 4 May 2022 16:50:47 +0100 Subject: [PATCH 03/11] Minor cleanup --- parquet/src/arrow/array_reader/builder.rs | 32 +++++++++++++---------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 6026492590b2..af2896350cad 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -292,33 +292,37 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext item_type: Arc, context: &'a ArrayReaderBuilderContext, ) -> Result>> { - let mut list_child = &list_type - .get_fields() - .first() - .ok_or_else(|| ArrowError("List field must have a child.".to_string()))? - .clone(); + let mut new_context = context.clone(); + new_context.path.append(vec![list_type.name().to_string()]); - // If the list can contain nulls + // If the list is nullable let nullable = match list_type.get_basic_info().repetition() { Repetition::REQUIRED => false, - Repetition::OPTIONAL => true, + Repetition::OPTIONAL => { + new_context.def_level += 1; + true + } Repetition::REPEATED => { return Err(general_err!("List type cannot be repeated")) } }; - let mut new_context = context.clone(); - new_context.path.append(vec![list_type.name().to_string()]); + if list_type.get_fields().len() != 1 { + return Err(ArrowError(format!( + "List field must have exactly one child, found {}", + list_type.get_fields().len() + ))); + } + let mut list_child = &list_type.get_fields()[0]; + + if list_child.get_basic_info().repetition() != Repetition::REPEATED { + return Err(ArrowError("List child must be repeated".to_string())); + } // The repeated field new_context.rep_level += 1; new_context.def_level += 1; - // If the list's root is nullable - if nullable { - new_context.def_level += 1; - } - match self.dispatch(item_type, &new_context) { Ok(Some(item_reader)) => { let item_type = item_reader.get_data_type().clone(); From e9acb226e98124f775a7b8b73c0b68d22550d858 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 4 May 2022 17:33:16 +0100 Subject: [PATCH 04/11] Filter nulls --- parquet/src/arrow/array_reader/list_array.rs | 62 +++++++++++--------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 5f3fed2b6f86..ce3b9cae6094 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -108,29 +108,33 @@ impl ArrayReader for ListArrayReader { // // The first identifies if the list is null // The second identifies if the list is empty + // + // The child data returned above is padded with a value for each not-fully defined level. + // Therefore null and empty lists will correspond to a value in the child array. + // + // Whilst nulls may have a non-zero slice in the offsets array, empty lists must + // be of zero length. As a result we MUST filter out values corresponding to empty + // lists, and for consistency we do the same for nulls. - // Whilst nulls may have a non-zero slice in the offsets array, an empty slice - // must not, we must therefore filter out these empty slices - - // The offset of the current element being considered - let mut cur_offset = 0; - - // The offsets identifying the list start and end offsets + // The output offsets for the computed ListArray let mut list_offsets: Vec = Vec::with_capacity(next_batch_array.len()); - // The validity mask of the final list + // The validity mask of the computed ListArray if nullable let mut validity = self .nullable .then(|| BooleanBufferBuilder::new(next_batch_array.len())); - // The position of the current slice of child data not corresponding to empty lists - let mut cur_start_offset = None; + // The offset into the filtered child data of the current level being considered + let mut cur_offset = 0; + + // Identifies the start of a run of values to copy from the source child data + let mut filter_start = None; - // The number of child values skipped due to empty lists + // The number of child values skipped due to empty lists or nulls let mut skipped = 0; - // Builder used to construct child data, skipping empty lists + // Builder used to construct the filtered child data, skipping empty lists and nulls let mut child_data_builder = MutableArrayData::new( vec![next_batch_array.data()], false, @@ -153,30 +157,32 @@ impl ArrayReader for ListArrayReader { } Ordering::Less => { // Create new array slice + // Already checked that this cannot overflow list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap()); - if *d + 1 == self.def_level { - // Empty list - if let Some(start) = cur_start_offset.take() { - child_data_builder.extend( - 0, - start + skipped, - cur_offset + skipped, - ); - } + if *d >= self.def_level { + // Fully defined value + + // Record current offset if it is None + filter_start.get_or_insert_with(|| cur_offset + skipped); + + cur_offset += 1; if let Some(validity) = validity.as_mut() { validity.append(true) } - - skipped += 1; } else { - cur_start_offset.get_or_insert(cur_offset); - cur_offset += 1; + // Flush the current slice of child values if any + if let Some(start) = filter_start.take() { + child_data_builder.extend(0, start, cur_offset + skipped); + } if let Some(validity) = validity.as_mut() { - validity.append(*d >= self.def_level) + // Valid if empty list + validity.append(*d + 1 == self.def_level) } + + skipped += 1; } } } @@ -190,8 +196,8 @@ impl ArrayReader for ListArrayReader { next_batch_array.data().clone() } else { // One or more empty lists - must build new array - if let Some(start) = cur_start_offset.take() { - child_data_builder.extend(0, start + skipped, cur_offset + skipped) + if let Some(start) = filter_start.take() { + child_data_builder.extend(0, start, cur_offset + skipped) } child_data_builder.freeze() From b3ca78eb518b6021c98c3727517381659a53ae40 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 4 May 2022 17:39:27 +0100 Subject: [PATCH 05/11] Update comments --- parquet/src/arrow/array_reader/list_array.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index ce3b9cae6094..4cef21fed3fd 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -192,10 +192,10 @@ impl ArrayReader for ListArrayReader { list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap()); let child_data = if skipped == 0 { - // No empty lists - can reuse original array + // No filtered values - can reuse original array next_batch_array.data().clone() } else { - // One or more empty lists - must build new array + // One or more filtered values - must build new array if let Some(start) = filter_start.take() { child_data_builder.extend(0, start, cur_offset + skipped) } From 1d00e7c00e546e5cdaec11ebce75cf4e01f45141 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 4 May 2022 17:53:14 +0100 Subject: [PATCH 06/11] Fix doc --- parquet/src/schema/visitor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet/src/schema/visitor.rs b/parquet/src/schema/visitor.rs index d1cd96fbb0a9..9d28fa5e8dcd 100644 --- a/parquet/src/schema/visitor.rs +++ b/parquet/src/schema/visitor.rs @@ -27,7 +27,7 @@ pub trait TypeVisitor { /// Default implementation when visiting a list. /// - /// It checks list type definition and calls [`visit_list_with_item`] with extracted + /// It checks list type definition and calls [`Self::visit_list_with_item`] with extracted /// item type. /// /// To fully understand this algorithm, please refer to @@ -35,7 +35,7 @@ pub trait TypeVisitor { /// /// For example, a standard list type looks like: /// - /// ```ignore + /// ```text /// required/optional group my_list (LIST) { // repeated group list { // required/optional binary element (UTF8); @@ -43,7 +43,7 @@ pub trait TypeVisitor { // } /// ``` /// - /// In such a case, [`visit_list_with_item`] will be called with `my_list` as the list + /// In such a case, [`Self::visit_list_with_item`] will be called with `my_list` as the list /// type, and `element` as the `item_type` /// fn visit_list(&mut self, list_type: TypePtr, context: C) -> Result { From fdae1d5f5a2bd8629a2039c39357974bf5e4d4a2 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 4 May 2022 18:23:21 +0100 Subject: [PATCH 07/11] Fix clippy --- parquet/src/arrow/array_reader/list_array.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 4cef21fed3fd..bde5d509164d 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -164,7 +164,7 @@ impl ArrayReader for ListArrayReader { // Fully defined value // Record current offset if it is None - filter_start.get_or_insert_with(|| cur_offset + skipped); + filter_start.get_or_insert(cur_offset + skipped); cur_offset += 1; From 1b99c49617d62f375c8a1c1fb23829f17921ff98 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Sun, 8 May 2022 15:39:46 +0100 Subject: [PATCH 08/11] Apply suggestions from code review Co-authored-by: Andrew Lamb --- parquet/src/arrow/array_reader/list_array.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index bde5d509164d..f8003157c11b 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -35,11 +35,11 @@ pub struct ListArrayReader { item_reader: Box, data_type: ArrowType, item_type: ArrowType, - // The definition level at which this list is not null + /// The definition level at which this list is not null def_level: i16, - // The repetition level that corresponds to a new value in this array + /// The repetition level that corresponds to a new value in this array rep_level: i16, - // If this list is nullable + /// If this list is nullable nullable: bool, _marker: PhantomData, } From 05c231190aef2c9054b9ebb7fbb59f4a1daf129f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 8 May 2022 16:57:52 +0100 Subject: [PATCH 09/11] More tests --- parquet/src/arrow/array_reader/list_array.rs | 46 +++++++++++++------ parquet/src/arrow/array_reader/test_util.rs | 47 ++++++++++++++++++-- 2 files changed, 75 insertions(+), 18 deletions(-) diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index f8003157c11b..eef21c7f2007 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -284,7 +284,8 @@ mod tests { // [[1, null], null, [4], []], // [], // [[7]], - // [[]] + // [[]], + // [[1, 2, 3], [4, null, 6], null] // ], // null, // [], @@ -305,29 +306,35 @@ mod tests { None, Some(4), Some(7), + Some(1), + Some(2), + Some(3), + Some(4), + None, + Some(6), Some(11), ]); - // [[1, null], null, [4], [], [7], [], [11]] - let offsets = to_offsets::(vec![0, 2, 2, 3, 3, 4, 4, 5]); + // [[1, null], null, [4], [], [7], [], [1, 2, 3], [4, null, 6], null, [11]] + let offsets = to_offsets::(vec![0, 2, 2, 3, 3, 4, 4, 7, 10, 10, 11]); let l3 = ArrayDataBuilder::new(l3_type.clone()) - .len(7) + .len(10) .add_buffer(offsets) .add_child_data(leaf.data().clone()) - .null_bit_buffer(Buffer::from([0b01111101])) + .null_bit_buffer(Buffer::from([0b11111101, 0b00000010])) .build() .unwrap(); - // [[[1, null], null, [4], []],[], [[7]], [[]], [[11]]] - let offsets = to_offsets::(vec![0, 4, 4, 5, 6, 7]); + // [[[1, null], null, [4], []], [], [[7]], [[]], [[1, 2, 3], [4, null, 6], null], [[11]]] + let offsets = to_offsets::(vec![0, 4, 4, 5, 6, 9, 10]); let l2 = ArrayDataBuilder::new(l2_type.clone()) - .len(5) + .len(6) .add_buffer(offsets) .add_child_data(l3) .build() .unwrap(); - let offsets = to_offsets::(vec![0, 4, 4, 4, 5]); + let offsets = to_offsets::(vec![0, 5, 5, 5, 6]); let l1 = ArrayDataBuilder::new(l1_type.clone()) .len(4) .add_buffer(offsets) @@ -347,6 +354,13 @@ mod tests { None, Some(7), None, + Some(1), + Some(2), + Some(3), + Some(4), + None, + Some(6), + None, None, None, Some(11), @@ -355,8 +369,8 @@ mod tests { let item_array_reader = InMemoryArrayReader::new( ArrowType::Int32, values, - Some(vec![6, 5, 3, 6, 4, 2, 6, 4, 0, 1, 6]), - Some(vec![0, 3, 2, 2, 2, 1, 1, 1, 0, 0, 0]), + Some(vec![6, 5, 3, 6, 4, 2, 6, 4, 6, 6, 6, 6, 5, 6, 3, 0, 1, 6]), + Some(vec![0, 3, 2, 2, 2, 1, 1, 1, 1, 3, 3, 2, 3, 3, 2, 0, 0, 0]), ); let l3 = ListArrayReader::::new( @@ -386,10 +400,14 @@ mod tests { true, ); - let actual = l1.next_batch(1024).unwrap(); - let actual = downcast::(&actual); + let expected_1 = expected.slice(0, 2); + let expected_2 = expected.slice(2, 2); - assert_eq!(&expected, actual) + let actual = l1.next_batch(2).unwrap(); + assert_eq!(expected_1.as_ref(), actual.as_ref()); + + let actual = l1.next_batch(1024).unwrap(); + assert_eq!(expected_2.as_ref(), actual.as_ref()); } fn test_required_list() { diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs index 94fee8eeee8f..44c9f7df2374 100644 --- a/parquet/src/arrow/array_reader/test_util.rs +++ b/parquet/src/arrow/array_reader/test_util.rs @@ -102,6 +102,8 @@ pub struct InMemoryArrayReader { array: ArrayRef, def_levels: Option>, rep_levels: Option>, + last_idx: usize, + cur_idx: usize, } impl InMemoryArrayReader { @@ -126,6 +128,8 @@ impl InMemoryArrayReader { array, def_levels, rep_levels, + cur_idx: 0, + last_idx: 0, } } } @@ -139,16 +143,51 @@ impl ArrayReader for InMemoryArrayReader { &self.data_type } - fn next_batch(&mut self, _batch_size: usize) -> Result { - Ok(self.array.clone()) + fn next_batch(&mut self, batch_size: usize) -> Result { + assert_ne!(batch_size, 0); + + // This replicates the logical normally performed by + // RecordReader to delimit semantic records + let (levels, records) = match &self.rep_levels { + Some(v) => { + let mut records = 0; + let mut levels = 0; + for v in &v[self.cur_idx..] { + if *v == 0 { + // Start of new record + records += 1; + if records == batch_size + 1 { + break; + } + } + levels += 1; + } + (levels, records - 1) + } + None => { + let records = batch_size.min(self.array.len() - self.cur_idx); + (records, records) + } + }; + + let array = self.array.slice(self.cur_idx, records); + + self.last_idx = self.cur_idx; + self.cur_idx += levels; + + Ok(array) } fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels.as_deref() + self.def_levels + .as_ref() + .map(|l| &l[self.last_idx..self.cur_idx]) } fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels.as_deref() + self.rep_levels + .as_ref() + .map(|l| &l[self.last_idx..self.cur_idx]) } } From f250ba7a96e88d0a99c00eb8a14b041517781dd3 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 8 May 2022 17:03:40 +0100 Subject: [PATCH 10/11] Add sanity check to ListArrayReader --- parquet/src/arrow/array_reader/list_array.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index eef21c7f2007..00b55b1549df 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -102,6 +102,12 @@ impl ArrayReader for ListArrayReader { )); } + if !rep_levels.is_empty() && rep_levels[0] != 0 { + // This implies either the source data was invalid, or the leaf column + // reader did not correctly delimit semantic records + return Err(general_err!("first repetition level of batch must be 0")); + } + // A non-nullable list has a single definition level indicating if the list is empty // // A nullable list has two definition levels associated with it: From 15b81441e0ae20ecf16697eaa44c40212e23c951 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 8 May 2022 18:21:28 +0100 Subject: [PATCH 11/11] Fix test_struct_array_reader --- parquet/src/arrow/array_reader.rs | 6 +-- parquet/src/arrow/array_reader/test_util.rs | 41 +++++++++------------ 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index ba2e1f7f41d9..1d8441cbd7f3 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -1532,7 +1532,7 @@ mod tests { ArrowType::Int32, array_1.clone(), Some(vec![0, 1, 2, 3, 1]), - Some(vec![1, 1, 1, 1, 1]), + Some(vec![0, 1, 1, 1, 1]), ); let array_2 = Arc::new(PrimitiveArray::::from(vec![5, 4, 3, 2, 1])); @@ -1540,7 +1540,7 @@ mod tests { ArrowType::Int32, array_2.clone(), Some(vec![0, 1, 3, 1, 2]), - Some(vec![1, 1, 1, 1, 1]), + Some(vec![0, 1, 1, 1, 1]), ); let struct_type = ArrowType::Struct(vec![ @@ -1570,7 +1570,7 @@ mod tests { struct_array_reader.get_def_levels() ); assert_eq!( - Some(vec![1, 1, 1, 1, 1].as_slice()), + Some(vec![0, 1, 1, 1, 1].as_slice()), struct_array_reader.get_rep_levels() ); } diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs index 44c9f7df2374..afee4659c3dd 100644 --- a/parquet/src/arrow/array_reader/test_util.rs +++ b/parquet/src/arrow/array_reader/test_util.rs @@ -145,37 +145,32 @@ impl ArrayReader for InMemoryArrayReader { fn next_batch(&mut self, batch_size: usize) -> Result { assert_ne!(batch_size, 0); - // This replicates the logical normally performed by // RecordReader to delimit semantic records - let (levels, records) = match &self.rep_levels { - Some(v) => { - let mut records = 0; - let mut levels = 0; - for v in &v[self.cur_idx..] { - if *v == 0 { - // Start of new record - records += 1; - if records == batch_size + 1 { - break; - } + let read = match &self.rep_levels { + Some(rep_levels) => { + let rep_levels = &rep_levels[self.cur_idx..]; + let mut levels_read = 0; + let mut records_read = 0; + while levels_read < rep_levels.len() && records_read < batch_size { + if rep_levels[levels_read] == 0 { + records_read += 1; // Start of new record } - levels += 1; + levels_read += 1; } - (levels, records - 1) - } - None => { - let records = batch_size.min(self.array.len() - self.cur_idx); - (records, records) + + // Find end of current record + while levels_read < rep_levels.len() && rep_levels[levels_read] != 0 { + levels_read += 1 + } + levels_read } + None => batch_size.min(self.array.len() - self.cur_idx), }; - let array = self.array.slice(self.cur_idx, records); - self.last_idx = self.cur_idx; - self.cur_idx += levels; - - Ok(array) + self.cur_idx += read; + Ok(self.array.slice(self.last_idx, read)) } fn get_def_levels(&self) -> Option<&[i16]> {