diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index 69ebce6950f5..bfd1593249b7 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -703,7 +703,6 @@ mod tests { let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap(); - // I think this setup is incorrect because this should pass assert_eq!(batch.column(0).data().null_count(), 1); let file = get_temp_file("test_arrow_writer_list.parquet", &[]); @@ -712,6 +711,47 @@ mod tests { writer.close().unwrap(); } + #[test] + fn arrow_writer_list_non_null() { + // define schema + let schema = Schema::new(vec![Field::new( + "a", + DataType::List(Box::new(Field::new("item", DataType::Int32, false))), + false, + )]); + + // create some data + let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + + // Construct a buffer for value offsets, for the nested array: + // [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]] + let a_value_offsets = + arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice()); + + // Construct a list array from the above two + let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new( + "item", + DataType::Int32, + false, + )))) + .len(5) + .add_buffer(a_value_offsets) + .add_child_data(a_values.data().clone()) + .build(); + let a = ListArray::from(a_list_data); + + // build a record batch + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap(); + + assert_eq!(batch.column(0).data().null_count(), 0); + + let file = get_temp_file("test_arrow_writer_list_non_null.parquet", &[]); + let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } + #[test] fn arrow_writer_binary() { let string_field = Field::new("a", DataType::Utf8, false); diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs index 2e95039c16b9..0af0f9efa6c1 100644 --- a/parquet/src/arrow/levels.rs +++ b/parquet/src/arrow/levels.rs @@ -86,9 +86,11 @@ impl LevelType { const fn level_increment(&self) -> i16 { match self { LevelType::Root => 0, - LevelType::List(is_nullable) - | LevelType::Struct(is_nullable) - | LevelType::Primitive(is_nullable) => *is_nullable as i16, + // List repetition adds a constant 1 + LevelType::List(is_nullable) => 1 + *is_nullable as i16, + LevelType::Struct(is_nullable) | LevelType::Primitive(is_nullable) => { + *is_nullable as i16 + } } } } @@ -334,37 +336,27 @@ impl LevelInfo { let mut merged_array_mask = Vec::with_capacity(min_len); let max_definition = match (self.level_type, level_type) { - (LevelType::Root, LevelType::Struct(is_nullable)) => { - // If the struct is non-nullable, its def level doesn't increment - is_nullable as i16 - } - (LevelType::Root, _) => 1, + // Handle the illegal cases (_, LevelType::Root) => { unreachable!("Cannot have a root as a child") } - (LevelType::List(_), _) => { - self.max_definition + 1 + level_type.level_increment() - } - (LevelType::Struct(_), _) => { - self.max_definition + level_type.level_increment() - } - (_, LevelType::List(is_nullable)) => { - // if the child is a list, even if its parent is a root - self.max_definition + 1 + is_nullable as i16 - } (LevelType::Primitive(_), _) => { unreachable!("Cannot have a primitive parent for any type") } + // The general case + (_, _) => self.max_definition + level_type.level_increment(), }; match (self.level_type, level_type) { (LevelType::List(_), LevelType::List(is_nullable)) => { - // parent is a list or descendant of a list, and child is a list + // Parent is a list or descendant of a list, and child is a list let reps = self.repetition.clone().unwrap(); - // Calculate the 2 list hierarchy definitions in advance - // List is not empty, but null - let l2 = max_definition - is_nullable as i16; - // List is not empty, and not null + + // List is null, and not empty + let l1 = max_definition - is_nullable as i16; + // List is not null, but is empty + let l2 = max_definition - 1; + // List is not null, and not empty let l3 = max_definition; let mut nulls_seen = 0; @@ -399,7 +391,9 @@ impl LevelInfo { let merged_mask = parent_mask && child_mask; if child_len == 0 { - definition.push(parent_def); + // Empty slot, i.e. {"parent": {"child": [] } } + // Nullness takes priority over emptiness + definition.push(if child_mask { l2 } else { l1 }); repetition.push(parent_rep); merged_array_mask.push(merged_mask); } else { @@ -419,7 +413,7 @@ impl LevelInfo { } else if child_mask { l3 } else { - l2 + l1 }); repetition.push(rep); merged_array_mask.push(merged_mask); @@ -506,9 +500,11 @@ impl LevelInfo { // Encountering a list for the first time. // Calculate the 2 list hierarchy definitions in advance - // List is not empty, but null (if nullable) - let l2 = max_definition - is_nullable as i16; - // List is not empty, and not null + // List is null, and not empty + let l1 = max_definition - 1 - is_nullable as i16; + // List is not null, but is empty + let l2 = max_definition - 1; + // List is not null, and not empty let l3 = max_definition; self.definition @@ -523,20 +519,24 @@ impl LevelInfo { match (parent_mask, child_len) { (true, 0) => { - // empty slot that is valid, i.e. {"parent": {"child": [] } } - definition.push(if child_mask { l3 } else { l2 }); + // Empty slot, i.e. {"parent": {"child": [] } } + // Nullness takes priority over emptiness + definition.push(if child_mask { l2 } else { l1 }); repetition.push(0); merged_array_mask.push(child_mask); } (false, 0) => { + // Inherit the parent definition as parent was null definition.push(*def); repetition.push(0); merged_array_mask.push(child_mask); } (true, _) => { (child_from..child_to).for_each(|child_index| { - definition.push(if child_mask { l3 } else { l2 }); - // mark the first child slot as 0, and the next as 1 + // l1 and l3 make sense as list is not empty, + // but we reflect that it's either null or not + definition.push(if child_mask { l3 } else { l1 }); + // Mark the first child slot as 0, and the next as 1 repetition.push(if child_index == child_from { 0 } else { @@ -547,6 +547,7 @@ impl LevelInfo { } (false, _) => { (child_from..child_to).for_each(|child_index| { + // Inherit the parent definition as parent was null definition.push(*def); // mark the first child slot as 0, and the next as 1 repetition.push(if child_index == child_from { @@ -867,11 +868,12 @@ mod tests { LevelType::Primitive(false), ); let expected_levels = LevelInfo { - definition: vec![1; 10], + // As it is non-null, definitions can be omitted + definition: vec![0; 10], repetition: None, array_offsets, array_mask, - max_definition: 1, + max_definition: 0, level_type: LevelType::Primitive(false), offset: 0, length: 10, @@ -948,13 +950,13 @@ mod tests { // - Calculate the level at the list // - Calculate the level at the list's child // We do not do this in these tests, thus the levels are 1 less. - definition: vec![1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1], + definition: vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2], repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]), array_offsets, array_mask: vec![ true, true, false, true, true, true, true, true, true, true, true, true, ], - max_definition: 1, + max_definition: 2, level_type: LevelType::List(true), offset: 0, length: 11, // the child has 11 slots @@ -1006,14 +1008,14 @@ mod tests { // 2 3 [4] are 0 // 4 5 6 7 [8] are 1 (defined at level 1 only) // 8 9 10 [11] are 2 (defined at both levels) - definition: vec![0, 0, 1, 0, 0, 2, 2, 2, 2, 2, 2, 2], + definition: vec![0, 0, 1, 0, 0, 3, 3, 3, 3, 3, 3, 3], repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]), array_offsets, array_mask: vec![ false, false, false, false, false, true, true, true, true, true, true, true, ], - max_definition: 2, + max_definition: 3, level_type: LevelType::List(true), offset: 0, length: 11, @@ -1061,7 +1063,7 @@ mod tests { // 3: [[108, 109], [110, 111], [112, 113], [114, 115]] // 4: [[116, 117], [118, 119], [120, 121]] definition: vec![ - 0, 0, 0, 0, 1, 0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, + 0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, ], repetition: Some(vec![ 0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2, @@ -1072,7 +1074,7 @@ mod tests { true, true, true, true, true, true, true, true, true, true, true, true, true, ], - max_definition: 4, + max_definition: 5, level_type: LevelType::List(true), offset: 0, length: 22, @@ -1121,11 +1123,11 @@ mod tests { // 2: [4, 5] // 3: [6, 7] let expected_levels = LevelInfo { - definition: vec![1, 2, 2, 2, 2, 2, 2, 2], + definition: vec![1, 3, 3, 3, 3, 3, 3, 3], repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]), array_offsets, array_mask: vec![false, true, true, true, true, true, true, true], - max_definition: 2, + max_definition: 3, level_type: LevelType::List(true), offset: 0, length: 8, @@ -1167,14 +1169,14 @@ mod tests { // 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]} // 3: {"struct": [ [], [211, 212, 213, 214, 215] ]} let expected_levels = LevelInfo { - definition: vec![1, 4, 4, 4, 2, 4, 4, 4, 4, 4, 4, 4, 2, 4, 4, 4, 4, 4], + definition: vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5], repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]), array_mask: vec![ false, true, true, true, false, true, true, true, true, true, true, true, true, true, true, true, true, true, ], array_offsets, - max_definition: 4, + max_definition: 5, level_type: LevelType::List(true), offset: 0, length: 16, @@ -1416,11 +1418,11 @@ mod tests { let list_level = levels.get(0).unwrap(); let expected_level = LevelInfo { - definition: vec![1, 1, 1, 1, 1], + definition: vec![0, 0, 0, 0, 0], repetition: None, array_offsets: vec![0, 1, 2, 3, 4, 5], array_mask: vec![true, true, true, true, true], - max_definition: 1, + max_definition: 0, level_type: LevelType::Primitive(false), offset: 0, length: 5,