From eaf47b573127ec3b785c6a381d34b63b01e24d4f Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 8 May 2021 16:59:07 +0200 Subject: [PATCH 1/3] fix null struct and list inconsistencies in writer --- parquet/src/arrow/arrow_writer.rs | 52 ++-- parquet/src/arrow/levels.rs | 431 +++++++++++++----------------- 2 files changed, 217 insertions(+), 266 deletions(-) diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index df6ce98c6d04..1e35e4b5f583 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -91,7 +91,7 @@ impl ArrowWriter { let batch_level = LevelInfo::new_from_batch(batch); let mut row_group_writer = self.writer.next_row_group()?; for (array, field) in batch.columns().iter().zip(batch.schema().fields()) { - let mut levels = batch_level.calculate_array_levels(array, field, false); + let mut levels = batch_level.calculate_array_levels(array, field); // Reverse levels as we pop() them when writing arrays levels.reverse(); write_leaves(&mut row_group_writer, array, &mut levels)?; @@ -793,25 +793,29 @@ mod tests { let struct_field_g = Field::new( "g", DataType::List(Box::new(Field::new("item", DataType::Int16, true))), + false, + ); + let struct_field_h = Field::new( + "h", + DataType::List(Box::new(Field::new("item", DataType::Int16, false))), true, ); let struct_field_e = Field::new( "e", - DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()]), - true, + DataType::Struct(vec![ + struct_field_f.clone(), + struct_field_g.clone(), + struct_field_h.clone(), + ]), + false, ); let schema = Schema::new(vec![ Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, true), - // Note: when the below struct is set to non-nullable, this test fails, - // but the output data written is correct. - // Interestingly, pyarrow will read it correctly, but pyspark fails to. - // This might be a compatibility quirk between arrow and parquet. - // We have opened https://github.com/apache/arrow-rs/issues/245 to investigate Field::new( "c", DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]), - true, + false, ), ]); @@ -831,15 +835,23 @@ mod tests { // Construct a list array from the above two let g_list_data = ArrayData::builder(struct_field_g.data_type().clone()) .len(5) - .add_buffer(g_value_offsets) + .add_buffer(g_value_offsets.clone()) .add_child_data(g_value.data().clone()) - // .null_bit_buffer(Buffer::from(vec![0b00011011])) // TODO: add to test after resolving other issues .build(); let g = ListArray::from(g_list_data); + // The difference between g and h is that h has a null bitmap + let h_list_data = ArrayData::builder(struct_field_h.data_type().clone()) + .len(5) + .add_buffer(g_value_offsets) + .add_child_data(g_value.data().clone()) + .null_bit_buffer(Buffer::from(vec![0b00011011])) + .build(); + let h = ListArray::from(h_list_data); let e = StructArray::from(vec![ (struct_field_f, Arc::new(f) as ArrayRef), (struct_field_g, Arc::new(g) as ArrayRef), + (struct_field_h, Arc::new(h) as ArrayRef), ]); let c = StructArray::from(vec![ @@ -860,14 +872,10 @@ mod tests { #[test] fn arrow_writer_complex_mixed() { // This test was added while investigating https://github.com/apache/arrow-rs/issues/244. - // Only writing the "offest_field" column works when "some_nested_object" is non-null. - // This indicates that a non-null struct should not have a null child (with null values). - // One observation is that spark doesn't consider the parent struct's nullness, - // and so, we should investigate the impact of always treating structs as null. - // See https://github.com/apache/arrow-rs/issues/245. + // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245. // define schema - let offset_field = Field::new("offset", DataType::Int32, true); + let offset_field = Field::new("offset", DataType::Int32, false); let partition_field = Field::new("partition", DataType::Int64, true); let topic_field = Field::new("topic", DataType::Utf8, true); let schema = Schema::new(vec![Field::new( @@ -877,7 +885,7 @@ mod tests { partition_field.clone(), topic_field.clone(), ]), - true, + false, )]); // create some data @@ -970,14 +978,10 @@ mod tests { let schema = Schema::new(vec![field_a.clone()]); // create data - // When the null buffer of the struct is created, this test fails. - // It appears that the nullness of the struct is ignored when the - // struct is read back. - // See https://github.com/apache/arrow-rs/issues/245 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]); let b_data = ArrayDataBuilder::new(field_b.data_type().clone()) .len(6) - // .null_bit_buffer(Buffer::from(vec![0b00100111])) + .null_bit_buffer(Buffer::from(vec![0b00100111])) .add_child_data(c.data().clone()) .build(); let b = StructArray::from(b_data); @@ -989,7 +993,7 @@ mod tests { let a = StructArray::from(a_data); assert_eq!(a.null_count(), 0); - assert_eq!(a.column(0).null_count(), 0); + assert_eq!(a.column(0).null_count(), 2); // build a racord batch let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs index 2669581c8712..6d9b2375c3e7 100644 --- a/parquet/src/arrow/levels.rs +++ b/parquet/src/arrow/levels.rs @@ -65,10 +65,29 @@ pub(crate) struct LevelInfo { pub array_mask: Vec, /// The maximum definition at this level, 0 at the record batch pub max_definition: i16, - /// Whether this array or any of its parents is a list - pub is_list: bool, - /// Whether the current array is nullable (affects definition levels) - pub is_nullable: bool, + /// The type of array represented by this level info + pub level_type: LevelType, +} + +/// LevelType defines the type of level, and whether it is nullable or not +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub(crate) enum LevelType { + Root, + List(bool), + Struct(bool), + Primitive(bool), +} + +impl LevelType { + #[inline] + const fn level_increment(&self) -> i16 { + match self { + LevelType::Root => 0, + LevelType::List(is_nullable) + | LevelType::Struct(is_nullable) + | LevelType::Primitive(is_nullable) => *is_nullable as i16, + } + } } impl LevelInfo { @@ -87,10 +106,7 @@ impl LevelInfo { // all values at a batch-level are non-null array_mask: vec![true; num_rows], max_definition: 0, - is_list: false, - // a batch is treated as nullable even though it has no nulls, - // this is required to compute nested type levels correctly - is_nullable: false, + level_type: LevelType::Root, } } @@ -110,7 +126,6 @@ impl LevelInfo { &self, array: &ArrayRef, field: &Field, - is_parent_struct: bool, ) -> Vec { let (array_offsets, array_mask) = Self::get_array_offsets_and_masks(array); match array.data_type() { @@ -120,8 +135,8 @@ impl LevelInfo { array_offsets: self.array_offsets.clone(), array_mask, max_definition: self.max_definition.max(1), - is_list: self.is_list, - is_nullable: true, // always nullable as all values are nulls + // Null type is always nullable + level_type: LevelType::Primitive(true), }], DataType::Boolean | DataType::Int8 @@ -152,9 +167,7 @@ impl LevelInfo { vec![self.calculate_child_levels( array_offsets, array_mask, - false, - is_parent_struct, - field.is_nullable(), + LevelType::Primitive(field.is_nullable()), )] } DataType::List(list_field) | DataType::LargeList(list_field) => { @@ -162,10 +175,7 @@ impl LevelInfo { let list_level = self.calculate_child_levels( array_offsets, array_mask, - true, - // the list could come from a struct, but its children will all be false - is_parent_struct, - field.is_nullable(), + LevelType::List(field.is_nullable()), ); // Construct the child array of the list, and get its offset + mask @@ -207,13 +217,11 @@ impl LevelInfo { vec![list_level.calculate_child_levels( child_offsets, child_mask, - false, - false, // always false - list_field.is_nullable(), + LevelType::Primitive(list_field.is_nullable()), )] } DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => { - list_level.calculate_array_levels(&child_array, list_field, false) + list_level.calculate_array_levels(&child_array, list_field) } DataType::FixedSizeList(_, _) => unimplemented!(), DataType::Union(_) => unimplemented!(), @@ -228,10 +236,7 @@ impl LevelInfo { let struct_level = self.calculate_child_levels( array_offsets, array_mask, - false, - // struct's own parent could be a struct - is_parent_struct, - field.is_nullable(), + LevelType::Struct(field.is_nullable()), ); let mut struct_levels = vec![]; struct_array @@ -239,12 +244,8 @@ impl LevelInfo { .into_iter() .zip(struct_fields) .for_each(|(child_array, child_field)| { - let mut levels = struct_level.calculate_array_levels( - child_array, - child_field, - // this is the only place where this is always true - true, - ); + let mut levels = + struct_level.calculate_array_levels(child_array, child_field); struct_levels.append(&mut levels); }); struct_levels @@ -258,9 +259,7 @@ impl LevelInfo { vec![self.calculate_child_levels( array_offsets, array_mask, - false, - is_parent_struct, - field.is_nullable(), + LevelType::Primitive(field.is_nullable()), )] } } @@ -315,80 +314,40 @@ impl LevelInfo { // we use 64-bit offsets to also accommodate large arrays array_offsets: Vec, array_mask: Vec, - is_list: bool, - is_parent_struct: bool, - is_nullable: bool, + level_type: LevelType, ) -> Self { let min_len = *(array_offsets.last().unwrap()) as usize; let mut definition = Vec::with_capacity(min_len); let mut repetition = Vec::with_capacity(min_len); let mut merged_array_mask = Vec::with_capacity(min_len); - // determine the total level increment based on data types - let max_definition = match is_list { - false => { - // first exception, start of a batch, and not list - if self.max_definition == 0 { - 1 - } else if self.is_list { - // second exception, always increment after a list - self.max_definition + 1 - } else if is_parent_struct && !self.is_nullable { - // if the parent is a non-null struct, don't increment - self.max_definition - } else { - self.max_definition + is_nullable as i16 - } + let max_definition = match (self.level_type, level_type) { + (LevelType::Root, LevelType::Struct(is_nullable)) => { + // If the struct is non-nullable, its def level doesn't increment + is_nullable as i16 } - true => { - if is_parent_struct && !self.is_nullable { - self.max_definition + is_nullable as i16 - } else { - self.max_definition + 1 + is_nullable as i16 - } + (LevelType::Root, _) => 1, + (_, LevelType::Root) => { + unreachable!("Cannot have a root as a child") + } + (LevelType::List(_), _) => { + // Always add 1 (TDDO: document why) + self.max_definition + 1 + level_type.level_increment() + } + (LevelType::Struct(_), _) => { + self.max_definition + level_type.level_increment() + } + (_, LevelType::List(is_nullable)) => { + // if the child is a list, even if its parent is a root + self.max_definition + 1 + is_nullable as i16 + } + (LevelType::Primitive(_), _) => { + unreachable!("Cannot have a primitive parent for any type") } }; - match (self.is_list, is_list) { - (false, false) => { - self.definition - .iter() - .zip(array_mask.into_iter().zip(&self.array_mask)) - .for_each(|(def, (child_mask, parent_mask))| { - merged_array_mask.push(*parent_mask && child_mask); - match (parent_mask, child_mask) { - (true, true) => { - definition.push(max_definition); - } - (true, false) => { - // The child is only legally null if its array is nullable. - // Thus parent's max_definition is lower - definition.push(if *def <= self.max_definition { - *def - } else { - self.max_definition - }); - } - // if the parent was false, retain its definitions - (false, _) => { - definition.push(*def); - } - } - }); - - debug_assert_eq!(definition.len(), merged_array_mask.len()); - - Self { - definition, - repetition: self.repetition.clone(), // it's None - array_offsets, - array_mask: merged_array_mask, - max_definition, - is_list: false, - is_nullable, - } - } - (true, true) => { + match (self.level_type, level_type) { + (LevelType::List(_), LevelType::List(is_nullable)) => { // parent is a list or descendant of a list, and child is a list let reps = self.repetition.clone().unwrap(); // Calculate the 2 list hierarchy definitions in advance @@ -466,11 +425,10 @@ impl LevelInfo { array_offsets, array_mask: merged_array_mask, max_definition, - is_list: true, - is_nullable, + level_type, } } - (true, false) => { + (LevelType::List(_), _) => { // List and primitive (or struct). // The list can have more values than the primitive, indicating that there // are slots where the list is empty. We use a counter to track this behaviour. @@ -519,11 +477,10 @@ impl LevelInfo { array_offsets: self.array_offsets.clone(), array_mask: merged_array_mask, max_definition, - is_list: true, - is_nullable, + level_type, } } - (false, true) => { + (_, LevelType::List(is_nullable)) => { // Encountering a list for the first time. // Calculate the 2 list hierarchy definitions in advance @@ -545,11 +502,7 @@ impl LevelInfo { match (parent_mask, child_len) { (true, 0) => { // empty slot that is valid, i.e. {"parent": {"child": [] } } - definition.push(if child_mask { - l2 - } else { - self.max_definition - }); + definition.push(if child_mask { l3 } else { l2 }); repetition.push(0); merged_array_mask.push(child_mask); } @@ -593,8 +546,44 @@ impl LevelInfo { array_offsets, array_mask: merged_array_mask, max_definition, - is_list: true, - is_nullable, + level_type, + } + } + (_, _) => { + self.definition + .iter() + .zip(array_mask.into_iter().zip(&self.array_mask)) + .for_each(|(current_def, (child_mask, parent_mask))| { + merged_array_mask.push(*parent_mask && child_mask); + match (parent_mask, child_mask) { + (true, true) => { + definition.push(max_definition); + } + (true, false) => { + // The child is only legally null if its array is nullable. + // Thus parent's max_definition is lower + definition.push(if *current_def <= self.max_definition { + *current_def + } else { + self.max_definition + }); + } + // if the parent was false, retain its definitions + (false, _) => { + definition.push(*current_def); + } + } + }); + + debug_assert_eq!(definition.len(), merged_array_mask.len()); + + Self { + definition, + repetition: self.repetition.clone(), // it's None + array_offsets, + array_mask: merged_array_mask, + max_definition, + level_type, } } } @@ -647,14 +636,20 @@ impl LevelInfo { .into_iter() .map(|v| v as i64) .collect::>(); - let masks = offsets.windows(2).map(|w| w[1] > w[0]).collect(); - (offsets, masks) + let array_mask = match array.data().null_buffer() { + Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()), + None => vec![true; array.len()], + }; + (offsets, array_mask) } DataType::LargeList(_) => { let offsets = unsafe { array.data().buffers()[0].typed_data::() }.to_vec(); - let masks = offsets.windows(2).map(|w| w[1] > w[0]).collect(); - (offsets, masks) + let array_mask = match array.data().null_buffer() { + Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()), + None => vec![true; array.len()], + }; + (offsets, array_mask) } DataType::FixedSizeBinary(value_len) => { let array_mask = match array.data().null_buffer() { @@ -676,7 +671,14 @@ impl LevelInfo { /// Given a level's information, calculate the offsets required to index an array correctly. pub(crate) fn filter_array_indices(&self) -> Vec { // happy path if not dealing with lists - if !self.is_list { + let is_nullable = match self.level_type { + LevelType::Primitive(is_nullable) => is_nullable, + _ => panic!( + "Cannot filter indices on a non-primitive array, found {:?}", + self.level_type + ), + }; + if self.repetition.is_none() { return self .definition .iter() @@ -697,7 +699,7 @@ impl LevelInfo { if *def == self.max_definition { filtered.push(index); } - if *def >= self.max_definition - self.is_nullable as i16 { + if *def >= self.max_definition - is_nullable as i16 { index += 1; } }); @@ -746,8 +748,7 @@ mod tests { array_offsets: vec![0, 1, 2], // 2 records, root offsets always sequential array_mask: vec![true, true], // both lists defined max_definition: 0, - is_list: false, // root is never list - is_nullable: false, // root in example is non-nullable + level_type: LevelType::Root, }; // offset into array, each level1 has 2 values let array_offsets = vec![0, 2, 4]; @@ -757,9 +758,7 @@ mod tests { let levels = parent_levels.calculate_child_levels( array_offsets.clone(), array_mask, - true, - false, - false, + LevelType::List(false), ); // let expected_levels = LevelInfo { @@ -768,8 +767,7 @@ mod tests { array_offsets, array_mask: vec![true, true, true, true], max_definition: 1, - is_list: true, - is_nullable: false, + level_type: LevelType::List(false), }; // the separate asserts make it easier to see what's failing assert_eq!(&levels.definition, &expected_levels.definition); @@ -777,8 +775,7 @@ mod tests { assert_eq!(&levels.array_mask, &expected_levels.array_mask); assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.is_list, &expected_levels.is_list); - assert_eq!(&levels.is_nullable, &expected_levels.is_nullable); + assert_eq!(&levels.level_type, &expected_levels.level_type); // this assert is to help if there are more variables added to the struct assert_eq!(&levels, &expected_levels); @@ -789,9 +786,7 @@ mod tests { let levels = parent_levels.calculate_child_levels( array_offsets.clone(), array_mask, - true, - false, - false, + LevelType::List(false), ); let expected_levels = LevelInfo { definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2], @@ -799,16 +794,14 @@ mod tests { array_offsets, array_mask: vec![true; 10], max_definition: 2, - is_list: true, - is_nullable: false, + level_type: LevelType::List(false), }; assert_eq!(&levels.definition, &expected_levels.definition); assert_eq!(&levels.repetition, &expected_levels.repetition); assert_eq!(&levels.array_mask, &expected_levels.array_mask); assert_eq!(&levels.max_definition, &expected_levels.max_definition); assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); - assert_eq!(&levels.is_list, &expected_levels.is_list); - assert_eq!(&levels.is_nullable, &expected_levels.is_nullable); + assert_eq!(&levels.level_type, &expected_levels.level_type); assert_eq!(&levels, &expected_levels); } @@ -821,8 +814,7 @@ mod tests { array_offsets: (0..=10).collect(), array_mask: vec![true; 10], max_definition: 0, - is_list: false, - is_nullable: false, + level_type: LevelType::Root, }; let array_offsets: Vec = (0..=10).collect(); let array_mask = vec![true; 10]; @@ -830,9 +822,7 @@ mod tests { let levels = parent_levels.calculate_child_levels( array_offsets.clone(), array_mask.clone(), - false, - false, - false, + LevelType::Primitive(false), ); let expected_levels = LevelInfo { definition: vec![1; 10], @@ -840,8 +830,7 @@ mod tests { array_offsets, array_mask, max_definition: 1, - is_list: false, - is_nullable: false, + level_type: LevelType::Primitive(false), }; assert_eq!(&levels, &expected_levels); } @@ -855,8 +844,7 @@ mod tests { array_offsets: (0..=5).collect(), array_mask: vec![true, true, true, true, true], max_definition: 0, - is_list: false, - is_nullable: false, + level_type: LevelType::Root, }; let array_offsets: Vec = (0..=5).collect(); let array_mask = vec![true, false, true, true, false]; @@ -864,9 +852,7 @@ mod tests { let levels = parent_levels.calculate_child_levels( array_offsets.clone(), array_mask.clone(), - false, - false, - true, + LevelType::Primitive(true), ); let expected_levels = LevelInfo { definition: vec![1, 0, 1, 1, 0], @@ -874,8 +860,7 @@ mod tests { array_offsets, array_mask, max_definition: 1, - is_list: false, - is_nullable: true, + level_type: LevelType::Primitive(true), }; assert_eq!(&levels, &expected_levels); } @@ -890,8 +875,7 @@ mod tests { array_offsets: vec![0, 1, 2, 3, 4, 5], array_mask: vec![true, true, true, true, true], max_definition: 0, - is_list: false, - is_nullable: false, + level_type: LevelType::Root, }; let array_offsets = vec![0, 2, 2, 4, 8, 11]; let array_mask = vec![true, false, true, true, true]; @@ -899,9 +883,7 @@ mod tests { let levels = parent_levels.calculate_child_levels( array_offsets.clone(), array_mask, - true, - false, - true, + LevelType::List(true), ); // array: [[0, 0], _1_, [2, 2], [3, 3, 3, 3], [4, 4, 4]] // all values are defined as we do not have nulls on the root (batch) @@ -912,22 +894,24 @@ mod tests { // 3: 0, 1, 1, 1 // 4: 0, 1, 1 let expected_levels = LevelInfo { - definition: vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2], + // The levels are normally 2 because we: + // - Calculate the level at the list + // - Calculate the level at the list's child + // We do not do this in these tests, thus the levels are 1 less. + definition: vec![1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1], repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]), array_offsets, array_mask: vec![ true, true, false, true, true, true, true, true, true, true, true, true, ], - max_definition: 2, - is_list: true, - is_nullable: true, + max_definition: 1, + level_type: LevelType::List(true), }; assert_eq!(&levels.definition, &expected_levels.definition); assert_eq!(&levels.repetition, &expected_levels.repetition); assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.is_list, &expected_levels.is_list); - assert_eq!(&levels.is_nullable, &expected_levels.is_nullable); + assert_eq!(&levels.level_type, &expected_levels.level_type); assert_eq!(&levels, &expected_levels); } @@ -952,8 +936,7 @@ mod tests { array_offsets: vec![0, 1, 2, 3, 4, 5], array_mask: vec![false, true, false, true, true], max_definition: 1, - is_list: false, - is_nullable: true, + level_type: LevelType::Struct(true), }; let array_offsets = vec![0, 2, 2, 4, 8, 11]; let array_mask = vec![true, false, true, true, true]; @@ -961,9 +944,7 @@ mod tests { let levels = parent_levels.calculate_child_levels( array_offsets.clone(), array_mask, - true, - false, - true, + LevelType::List(true), ); let expected_levels = LevelInfo { // 0 1 [2] are 0 (not defined at level 1) @@ -971,23 +952,21 @@ mod tests { // 2 3 [4] are 0 // 4 5 6 7 [8] are 1 (defined at level 1 only) // 8 9 10 [11] are 2 (defined at both levels) - definition: vec![0, 0, 1, 0, 0, 3, 3, 3, 3, 3, 3, 3], + definition: vec![0, 0, 1, 0, 0, 2, 2, 2, 2, 2, 2, 2], repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]), array_offsets, array_mask: vec![ false, false, false, false, false, true, true, true, true, true, true, true, ], - max_definition: 3, - is_nullable: true, - is_list: true, + max_definition: 2, + level_type: LevelType::List(true), }; assert_eq!(&levels.definition, &expected_levels.definition); assert_eq!(&levels.repetition, &expected_levels.repetition); assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.is_list, &expected_levels.is_list); - assert_eq!(&levels.is_nullable, &expected_levels.is_nullable); + assert_eq!(&levels.level_type, &expected_levels.level_type); assert_eq!(&levels, &expected_levels); // nested lists (using previous test) @@ -999,9 +978,7 @@ mod tests { let levels = nested_parent_levels.calculate_child_levels( array_offsets.clone(), array_mask, - true, - false, - true, + LevelType::List(true), ); let expected_levels = LevelInfo { // (def: 0) 0 1 [2] are 0 (take parent) @@ -1028,7 +1005,7 @@ mod tests { // 3: [[108, 109], [110, 111], [112, 113], [114, 115]] // 4: [[116, 117], [118, 119], [120, 121]] definition: vec![ - 0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, + 0, 0, 0, 0, 1, 0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, ], repetition: Some(vec![ 0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2, @@ -1039,17 +1016,15 @@ mod tests { true, true, true, true, true, true, true, true, true, true, true, true, true, ], - max_definition: 5, - is_nullable: true, - is_list: true, + max_definition: 4, + level_type: LevelType::List(true), }; assert_eq!(&levels.definition, &expected_levels.definition); assert_eq!(&levels.repetition, &expected_levels.repetition); assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); assert_eq!(&levels.array_mask, &expected_levels.array_mask); assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.is_list, &expected_levels.is_list); - assert_eq!(&levels.is_nullable, &expected_levels.is_nullable); + assert_eq!(&levels.level_type, &expected_levels.level_type); assert_eq!(&levels, &expected_levels); } @@ -1067,8 +1042,7 @@ mod tests { array_offsets: vec![0, 1, 2, 3, 4], array_mask: vec![true, true, true, true], max_definition: 1, - is_list: false, - is_nullable: false, + level_type: LevelType::Struct(true), }; // 0: null ([], but mask is false, so it's not just an empty list) // 1: [1, 2, 3] @@ -1080,29 +1054,25 @@ mod tests { let levels = parent_levels.calculate_child_levels( array_offsets.clone(), array_mask, - true, - false, - true, + LevelType::List(true), ); // 0: [null], level 1 is defined, but not 2 // 1: [1, 2, 3] // 2: [4, 5] // 3: [6, 7] let expected_levels = LevelInfo { - definition: vec![2, 3, 3, 3, 3, 3, 3, 3], + definition: vec![1, 2, 2, 2, 2, 2, 2, 2], repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]), array_offsets, array_mask: vec![false, true, true, true, true, true, true, true], - max_definition: 3, - is_list: true, - is_nullable: true, + max_definition: 2, + level_type: LevelType::List(true), }; assert_eq!(&levels.definition, &expected_levels.definition); assert_eq!(&levels.repetition, &expected_levels.repetition); assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.is_list, &expected_levels.is_list); - assert_eq!(&levels.is_nullable, &expected_levels.is_nullable); + assert_eq!(&levels.level_type, &expected_levels.level_type); assert_eq!(&levels, &expected_levels); // nested lists (using previous test) @@ -1121,9 +1091,7 @@ mod tests { let levels = nested_parent_levels.calculate_child_levels( array_offsets.clone(), array_mask, - true, - false, - true, + LevelType::List(true), ); // We have 7 array values, and at least 15 primitives (from array_offsets) // 0: (-)[null], parent was null, no value populated here @@ -1137,24 +1105,22 @@ mod tests { // 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]} // 3: {"struct": [ [], [211, 212, 213, 214, 215] ]} let expected_levels = LevelInfo { - definition: vec![2, 5, 5, 5, 3, 5, 5, 5, 5, 5, 5, 5, 3, 5, 5, 5, 5, 5], + definition: vec![1, 4, 4, 4, 2, 4, 4, 4, 4, 4, 4, 4, 2, 4, 4, 4, 4, 4], repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]), array_mask: vec![ false, true, true, true, false, true, true, true, true, true, true, true, true, true, true, true, true, true, ], array_offsets, - is_list: true, - is_nullable: true, - max_definition: 5, + max_definition: 4, + level_type: LevelType::List(true), }; assert_eq!(&levels.definition, &expected_levels.definition); assert_eq!(&levels.repetition, &expected_levels.repetition); assert_eq!(&levels.array_offsets, &expected_levels.array_offsets); assert_eq!(&levels.array_mask, &expected_levels.array_mask); assert_eq!(&levels.max_definition, &expected_levels.max_definition); - assert_eq!(&levels.is_list, &expected_levels.is_list); - assert_eq!(&levels.is_nullable, &expected_levels.is_nullable); + assert_eq!(&levels.level_type, &expected_levels.level_type); assert_eq!(&levels, &expected_levels); } @@ -1174,8 +1140,7 @@ mod tests { array_offsets: (0..=6).collect(), array_mask: vec![true, true, true, true, false, true], max_definition: 1, - is_list: false, - is_nullable: true, + level_type: LevelType::Struct(true), }; // b's offset and mask let b_offsets: Vec = (0..=6).collect(); @@ -1187,11 +1152,13 @@ mod tests { array_offsets: (0..=6).collect(), array_mask: vec![true, true, true, false, false, true], max_definition: 2, - is_list: false, - is_nullable: true, + level_type: LevelType::Struct(true), }; - let b_levels = - a_levels.calculate_child_levels(b_offsets.clone(), b_mask, false, true, true); + let b_levels = a_levels.calculate_child_levels( + b_offsets.clone(), + b_mask, + LevelType::Struct(true), + ); assert_eq!(&b_expected_levels, &b_levels); // c's offset and mask @@ -1204,11 +1171,10 @@ mod tests { array_offsets: c_offsets.clone(), array_mask: vec![true, false, true, false, false, true], max_definition: 3, - is_list: false, - is_nullable: true, + level_type: LevelType::Struct(true), }; let c_levels = - b_levels.calculate_child_levels(c_offsets, c_mask, false, true, true); + b_levels.calculate_child_levels(c_offsets, c_mask, LevelType::Struct(true)); assert_eq!(&c_expected_levels, &c_levels); } @@ -1243,8 +1209,7 @@ mod tests { array_offsets: (0..=5).collect(), array_mask: vec![true, true, true, true, true], max_definition: 0, - is_list: false, - is_nullable: false, + level_type: LevelType::Root, }; let batch_level = LevelInfo::new_from_batch(&batch); @@ -1257,8 +1222,7 @@ mod tests { .iter() .zip(batch.schema().fields()) .for_each(|(array, field)| { - let mut array_levels = - batch_level.calculate_array_levels(array, field, false); + let mut array_levels = batch_level.calculate_array_levels(array, field); levels.append(&mut array_levels); }); assert_eq!(levels.len(), 1); @@ -1273,16 +1237,14 @@ mod tests { true, true, true, false, true, true, true, true, true, true, true, ], max_definition: 3, - is_list: true, - is_nullable: true, + level_type: LevelType::Primitive(true), }; assert_eq!(&list_level.definition, &expected_level.definition); assert_eq!(&list_level.repetition, &expected_level.repetition); assert_eq!(&list_level.array_offsets, &expected_level.array_offsets); assert_eq!(&list_level.array_mask, &expected_level.array_mask); assert_eq!(&list_level.max_definition, &expected_level.max_definition); - assert_eq!(&list_level.is_list, &expected_level.is_list); - assert_eq!(&list_level.is_nullable, &expected_level.is_nullable); + assert_eq!(&list_level.level_type, &expected_level.level_type); assert_eq!(list_level, &expected_level); } @@ -1290,8 +1252,6 @@ mod tests { fn mixed_struct_list() { // this tests the level generation from the equivalent arrow_writer_complex test - // TODO: Investigate failure if struct is null. See https://github.com/apache/arrow-rs/issues/245 - // define schema let struct_field_d = Field::new("d", DataType::Float64, true); let struct_field_f = Field::new("f", DataType::Float32, true); @@ -1360,8 +1320,7 @@ mod tests { array_offsets: (0..=5).collect(), array_mask: vec![true, true, true, true, true], max_definition: 0, - is_list: false, - is_nullable: false, + level_type: LevelType::Root, }; let batch_level = LevelInfo::new_from_batch(&batch); @@ -1374,8 +1333,7 @@ mod tests { .iter() .zip(batch.schema().fields()) .for_each(|(array, field)| { - let mut array_levels = - batch_level.calculate_array_levels(array, field, false); + let mut array_levels = batch_level.calculate_array_levels(array, field); levels.append(&mut array_levels); }); assert_eq!(levels.len(), 5); @@ -1389,8 +1347,7 @@ mod tests { array_offsets: vec![0, 1, 2, 3, 4, 5], array_mask: vec![true, true, true, true, true], max_definition: 1, - is_list: false, - is_nullable: false, + level_type: LevelType::Primitive(false), }; assert_eq!(list_level, &expected_level); @@ -1403,8 +1360,7 @@ mod tests { array_offsets: vec![0, 1, 2, 3, 4, 5], array_mask: vec![true, false, false, true, true], max_definition: 1, - is_list: false, - is_nullable: true, + level_type: LevelType::Primitive(true), }; assert_eq!(list_level, &expected_level); @@ -1417,8 +1373,7 @@ mod tests { array_offsets: vec![0, 1, 2, 3, 4, 5], array_mask: vec![false, false, false, true, false], max_definition: 2, - is_list: false, - is_nullable: true, + level_type: LevelType::Primitive(true), }; assert_eq!(list_level, &expected_level); @@ -1431,8 +1386,7 @@ mod tests { array_offsets: vec![0, 1, 2, 3, 4, 5], array_mask: vec![true, false, true, false, true], max_definition: 3, - is_list: false, - is_nullable: true, + level_type: LevelType::Primitive(true), }; assert_eq!(list_level, &expected_level); } @@ -1445,8 +1399,7 @@ mod tests { array_offsets: vec![0, 3, 3, 6], array_mask: vec![true, true, true, false, true, true, true], max_definition: 3, - is_list: true, - is_nullable: true, + level_type: LevelType::Primitive(true), }; let expected = vec![0, 1, 2, 3, 4, 5]; @@ -1476,11 +1429,8 @@ mod tests { .unwrap(); let batch_level = LevelInfo::new_from_batch(&batch); - let struct_null_level = batch_level.calculate_array_levels( - batch.column(0), - batch.schema().field(0), - false, - ); + let struct_null_level = + batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0)); // create second batch // define schema @@ -1503,11 +1453,8 @@ mod tests { .unwrap(); let batch_level = LevelInfo::new_from_batch(&batch); - let struct_non_null_level = batch_level.calculate_array_levels( - batch.column(0), - batch.schema().field(0), - false, - ); + let struct_non_null_level = + batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0)); // The 2 levels should not be the same if struct_non_null_level == struct_null_level { From 2894b3f230a41e801d749211c93dccfc5fe1f057 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 8 May 2021 16:59:39 +0200 Subject: [PATCH 2/3] fix list reader null and empty slot calculation --- parquet/src/arrow/array_reader.rs | 95 ++++++++++++++++--------------- 1 file changed, 48 insertions(+), 47 deletions(-) diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index f209b8baabf0..f54e446192b1 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -615,6 +615,8 @@ pub struct ListArrayReader { item_type: ArrowType, list_def_level: i16, list_rep_level: i16, + list_empty_def_level: i16, + list_null_def_level: i16, def_level_buffer: Option, rep_level_buffer: Option, _marker: PhantomData, @@ -628,6 +630,8 @@ impl ListArrayReader { item_type: ArrowType, def_level: i16, rep_level: i16, + list_null_def_level: i16, + list_empty_def_level: i16, ) -> Self { Self { item_reader, @@ -635,6 +639,8 @@ impl ListArrayReader { item_type, list_def_level: def_level, list_rep_level: rep_level, + list_null_def_level, + list_empty_def_level, def_level_buffer: None, rep_level_buffer: None, _marker: PhantomData, @@ -843,61 +849,49 @@ impl ArrayReader for ListArrayReader { // Where n is the max definition level of the list's parent. // If a Parquet schema's only leaf is the list, then n = 0. - // TODO: ARROW-10391 - add a test case with a non-nullable child, check if max is 3 - let list_field_type = match self.get_data_type() { - ArrowType::List(field) - | ArrowType::FixedSizeList(field, _) - | ArrowType::LargeList(field) => field, - _ => { - // Panic: this is safe as we only write lists from list datatypes - unreachable!() - } - }; - let max_list_def_range = if list_field_type.is_nullable() { 3 } else { 2 }; - let max_list_definition = *(def_levels.iter().max().unwrap()); - // TODO: ARROW-10391 - Find a reliable way of validating deeply-nested lists - // debug_assert!( - // max_list_definition >= max_list_def_range, - // "Lift definition max less than range" - // ); - let list_null_def = max_list_definition - max_list_def_range; - let list_empty_def = max_list_definition - 1; - let mut null_list_indices: Vec = Vec::new(); - for i in 0..def_levels.len() { - if def_levels[i] == list_null_def { - null_list_indices.push(i); - } - } + // If the list index is at empty definition, the child slot is null + let null_list_indices: Vec = def_levels + .iter() + .enumerate() + .filter_map(|(index, def)| { + if *def <= self.list_empty_def_level { + Some(index) + } else { + None + } + }) + .collect(); let batch_values = match null_list_indices.len() { 0 => next_batch_array.clone(), _ => remove_indices(next_batch_array.clone(), item_type, null_list_indices)?, }; - // null list has def_level = 0 - // empty list has def_level = 1 - // null item in a list has def_level = 2 - // non-null item has def_level = 3 // first item in each list has rep_level = 0, subsequent items have rep_level = 1 - let mut offsets: Vec = Vec::new(); let mut cur_offset = OffsetSize::zero(); - for i in 0..rep_levels.len() { - if rep_levels[i] == 0 { - offsets.push(cur_offset) + def_levels.iter().zip(rep_levels).for_each(|(d, r)| { + if *r == 0 || d == &self.list_empty_def_level { + offsets.push(cur_offset); } - if def_levels[i] >= list_empty_def { + if d > &self.list_empty_def_level { cur_offset += OffsetSize::one(); } - } + }); offsets.push(cur_offset); let num_bytes = bit_util::ceil(offsets.len(), 8); - let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + // TODO: A useful optimization is to use the null count to fill with + // 0 or null, to reduce individual bits set in a loop. + // To favour dense data, set every slot to true, then unset + let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); let null_slice = null_buf.as_slice_mut(); let mut list_index = 0; for i in 0..rep_levels.len() { - if rep_levels[i] == 0 && def_levels[i] != 0 { - bit_util::set_bit(null_slice, list_index); + // If the level is lower than empty, then the slot is null. + // When a list is non-nullable, its empty level = null level, + // so this automatically factors that in. + if rep_levels[i] == 0 && def_levels[i] < self.list_empty_def_level { + bit_util::unset_bit(null_slice, list_index); } if rep_levels[i] == 0 { list_index += 1; @@ -1282,16 +1276,15 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext let mut new_context = context.clone(); new_context.path.append(vec![list_type.name().to_string()]); + // We need to know at what definition a list or its child is null + let list_null_def = new_context.def_level; + let mut list_empty_def = new_context.def_level; - match list_type.get_basic_info().repetition() { - Repetition::REPEATED => { - new_context.def_level += 1; - new_context.rep_level += 1; - } - Repetition::OPTIONAL => { - new_context.def_level += 1; - } - _ => (), + // If the list's root is nullable + if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() { + new_context.def_level += 1; + // current level is nullable, increment to get level for empty list slot + list_empty_def += 1; } match list_child.get_basic_info().repetition() { @@ -1350,6 +1343,8 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext item_reader_type, new_context.def_level, new_context.rep_level, + list_null_def, + list_empty_def, )), ArrowType::LargeList(_) => Box::new(ListArrayReader::::new( item_reader, @@ -1357,6 +1352,8 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext item_reader_type, new_context.def_level, new_context.rep_level, + list_null_def, + list_empty_def, )), _ => { @@ -2468,6 +2465,8 @@ mod tests { ArrowType::Int32, 1, 1, + 0, + 1, ); let next_batch = list_array_reader.next_batch(1024).unwrap(); @@ -2522,6 +2521,8 @@ mod tests { ArrowType::Int32, 1, 1, + 0, + 1, ); let next_batch = list_array_reader.next_batch(1024).unwrap(); From a9fb2466d4a36a4375cf49b7844a20a905895d20 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Tue, 11 May 2021 05:14:24 +0200 Subject: [PATCH 3/3] remove stray TODOs --- parquet/src/arrow/arrow_writer.rs | 2 +- parquet/src/arrow/levels.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index 1e35e4b5f583..be278eda5360 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -1366,7 +1366,7 @@ mod tests { let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new( "item", DataType::Int32, - true, // TODO: why does this fail when false? Is it related to logical nulls? + false, )))) .len(5) .add_buffer(a_value_offsets) diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs index 6d9b2375c3e7..be567260e1d8 100644 --- a/parquet/src/arrow/levels.rs +++ b/parquet/src/arrow/levels.rs @@ -331,7 +331,6 @@ impl LevelInfo { unreachable!("Cannot have a root as a child") } (LevelType::List(_), _) => { - // Always add 1 (TDDO: document why) self.max_definition + 1 + level_type.level_increment() } (LevelType::Struct(_), _) => {