Fix null struct and list roundtrip #270

Merged · 3 commits · May 11, 2021
95 changes: 48 additions & 47 deletions parquet/src/arrow/array_reader.rs
@@ -615,6 +615,8 @@ pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
item_type: ArrowType,
list_def_level: i16,
list_rep_level: i16,
list_empty_def_level: i16,
list_null_def_level: i16,
def_level_buffer: Option<Buffer>,
rep_level_buffer: Option<Buffer>,
_marker: PhantomData<OffsetSize>,
@@ -628,13 +630,17 @@ impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
item_type: ArrowType,
def_level: i16,
rep_level: i16,
list_null_def_level: i16,
list_empty_def_level: i16,
) -> Self {
Self {
item_reader,
data_type,
item_type,
list_def_level: def_level,
list_rep_level: rep_level,
list_null_def_level,
list_empty_def_level,
def_level_buffer: None,
rep_level_buffer: None,
_marker: PhantomData,
@@ -843,61 +849,49 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
// Where n is the max definition level of the list's parent.
// If a Parquet schema's only leaf is the list, then n = 0.

// TODO: ARROW-10391 - add a test case with a non-nullable child, check if max is 3
let list_field_type = match self.get_data_type() {
ArrowType::List(field)
| ArrowType::FixedSizeList(field, _)
| ArrowType::LargeList(field) => field,
_ => {
// Panic: this is safe as we only write lists from list datatypes
unreachable!()
}
};
let max_list_def_range = if list_field_type.is_nullable() { 3 } else { 2 };
let max_list_definition = *(def_levels.iter().max().unwrap());
// TODO: ARROW-10391 - Find a reliable way of validating deeply-nested lists
// debug_assert!(
// max_list_definition >= max_list_def_range,
// "Lift definition max less than range"
// );
let list_null_def = max_list_definition - max_list_def_range;
let list_empty_def = max_list_definition - 1;
let mut null_list_indices: Vec<usize> = Vec::new();
for i in 0..def_levels.len() {
if def_levels[i] == list_null_def {
null_list_indices.push(i);
}
}
// If the list index is at empty definition, the child slot is null
let null_list_indices: Vec<usize> = def_levels
.iter()
.enumerate()
.filter_map(|(index, def)| {
if *def <= self.list_empty_def_level {
Some(index)
} else {
None
}
})
.collect();
let batch_values = match null_list_indices.len() {
0 => next_batch_array.clone(),
_ => remove_indices(next_batch_array.clone(), item_type, null_list_indices)?,
};

// null list has def_level = 0
// empty list has def_level = 1
// null item in a list has def_level = 2
// non-null item has def_level = 3
// first item in each list has rep_level = 0, subsequent items have rep_level = 1

let mut offsets: Vec<OffsetSize> = Vec::new();
let mut cur_offset = OffsetSize::zero();
for i in 0..rep_levels.len() {
if rep_levels[i] == 0 {
offsets.push(cur_offset)
def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
if *r == 0 || d == &self.list_empty_def_level {
offsets.push(cur_offset);
}
if def_levels[i] >= list_empty_def {
if d > &self.list_empty_def_level {
cur_offset += OffsetSize::one();
}
}
});
offsets.push(cur_offset);

let num_bytes = bit_util::ceil(offsets.len(), 8);
let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
// TODO: A useful optimization is to use the null count to fill with
// 0 or null, to reduce individual bits set in a loop.
// To favour dense data, set every slot to true, then unset
let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
let null_slice = null_buf.as_slice_mut();
let mut list_index = 0;
for i in 0..rep_levels.len() {
if rep_levels[i] == 0 && def_levels[i] != 0 {
bit_util::set_bit(null_slice, list_index);
// If the level is lower than empty, then the slot is null.
// When a list is non-nullable, its empty level = null level,
// so this automatically factors that in.
if rep_levels[i] == 0 && def_levels[i] < self.list_empty_def_level {
bit_util::unset_bit(null_slice, list_index);
}
if rep_levels[i] == 0 {
list_index += 1;
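For intuition, here is a minimal standalone sketch (illustrative only, not the reader's actual code) of how the logic above derives list offsets and validity from definition/repetition levels, using the level table from the comment above (null list = 0, empty list = 1, null item = 2, non-null item = 3, so `list_empty_def_level` = 1):

```rust
// Reconstruct offsets and list validity for a nullable List<Int32>.
// Assumes null child slots have already been handled separately, as
// ListArrayReader::next_batch does via remove_indices.
fn reconstruct(defs: &[i16], reps: &[i16], empty_def: i16) -> (Vec<i32>, Vec<bool>) {
    let (mut offsets, mut validity) = (Vec::new(), Vec::new());
    let mut cur = 0i32;
    for (d, r) in defs.iter().zip(reps) {
        if *r == 0 || *d == empty_def {
            // rep_level 0 (or an empty slot) starts a new list slot.
            offsets.push(cur);
        }
        if *r == 0 {
            // Below the empty level, the list slot itself is null.
            validity.push(*d >= empty_def);
        }
        if *d > empty_def {
            // Only levels above "empty" carry a child value.
            cur += 1;
        }
    }
    offsets.push(cur);
    (offsets, validity)
}

fn main() {
    // Logical value: [[1, 2], [], None, [3]]
    let (offsets, validity) = reconstruct(&[3, 3, 1, 0, 3], &[0, 1, 0, 0, 0], 1);
    assert_eq!(offsets, vec![0, 2, 2, 2, 3]);
    assert_eq!(validity, vec![true, true, false, true]);
}
```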
@@ -1282,16 +1276,15 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
let mut new_context = context.clone();

new_context.path.append(vec![list_type.name().to_string()]);
// We need to know at what definition a list or its child is null
let list_null_def = new_context.def_level;
let mut list_empty_def = new_context.def_level;

match list_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
}
_ => (),
// If the list's root is nullable
if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
new_context.def_level += 1;
// current level is nullable, increment to get level for empty list slot
list_empty_def += 1;
}

match list_child.get_basic_info().repetition() {
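As a worked example (assuming the standard three-level LIST encoding), consider `optional group my_list (LIST) { repeated group list { optional int32 item } }` with the builder entering at `def_level = 0`:

```rust
fn main() {
    // Hypothetical trace of the level bookkeeping above for
    //   optional group my_list (LIST) { repeated group list { optional int32 item } }
    let mut def_level = 0i16;       // context level before entering the list
    let list_null_def = def_level;  // 0: level of a null list slot
    def_level += 1;                 // the list field itself is OPTIONAL
    let list_empty_def = def_level; // 1: level of an empty list slot
    def_level += 1;                 // the repeated inner group
    let item_def = def_level + 1;   // 3: level of a defined optional item
    assert_eq!((list_null_def, list_empty_def, item_def), (0, 1, 3));
    // A null item sits at level 2, matching the table used by ListArrayReader.
}
```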
@@ -1350,13 +1343,17 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
item_reader_type,
new_context.def_level,
new_context.rep_level,
list_null_def,
list_empty_def,
)),
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
item_reader,
arrow_type,
item_reader_type,
new_context.def_level,
new_context.rep_level,
list_null_def,
list_empty_def,
)),

_ => {
Expand Down Expand Up @@ -2468,6 +2465,8 @@ mod tests {
ArrowType::Int32,
1,
1,
0,
1,
);

let next_batch = list_array_reader.next_batch(1024).unwrap();
@@ -2522,6 +2521,8 @@ mod tests {
ArrowType::Int32,
1,
1,
0,
1,
);

let next_batch = list_array_reader.next_batch(1024).unwrap();
54 changes: 29 additions & 25 deletions parquet/src/arrow/arrow_writer.rs
@@ -91,7 +91,7 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
let batch_level = LevelInfo::new_from_batch(batch);
let mut row_group_writer = self.writer.next_row_group()?;
for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
let mut levels = batch_level.calculate_array_levels(array, field, false);
let mut levels = batch_level.calculate_array_levels(array, field);
// Reverse levels as we pop() them when writing arrays
levels.reverse();
write_leaves(&mut row_group_writer, array, &mut levels)?;
@@ -793,25 +793,29 @@ mod tests {
let struct_field_g = Field::new(
"g",
DataType::List(Box::new(Field::new("item", DataType::Int16, true))),
false,
);
let struct_field_h = Field::new(
"h",
DataType::List(Box::new(Field::new("item", DataType::Int16, false))),
true,
);
let struct_field_e = Field::new(
"e",
DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()]),
true,
DataType::Struct(vec![
struct_field_f.clone(),
struct_field_g.clone(),
struct_field_h.clone(),
]),
false,
);
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, true),
// Note: when the below struct is set to non-nullable, this test fails,
// but the output data written is correct.
// Interestingly, pyarrow will read it correctly, but pyspark fails to.
// This might be a compatibility quirk between arrow and parquet.
// We have opened https://github.com/apache/arrow-rs/issues/245 to investigate
Field::new(
"c",
DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]),
true,
false,
),
]);

@@ -831,15 +835,23 @@
// Construct a list array from the above two
let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
.len(5)
.add_buffer(g_value_offsets)
.add_buffer(g_value_offsets.clone())
.add_child_data(g_value.data().clone())
// .null_bit_buffer(Buffer::from(vec![0b00011011])) // TODO: add to test after resolving other issues
.build();
let g = ListArray::from(g_list_data);
// The difference between g and h is that h has a null bitmap
let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
.len(5)
.add_buffer(g_value_offsets)
.add_child_data(g_value.data().clone())
.null_bit_buffer(Buffer::from(vec![0b00011011]))
.build();
let h = ListArray::from(h_list_data);

let e = StructArray::from(vec![
(struct_field_f, Arc::new(f) as ArrayRef),
(struct_field_g, Arc::new(g) as ArrayRef),
(struct_field_h, Arc::new(h) as ArrayRef),
]);

let c = StructArray::from(vec![
@@ -860,14 +872,10 @@
#[test]
fn arrow_writer_complex_mixed() {
// This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
// Only writing the "offset_field" column works when "some_nested_object" is non-null.
// This indicates that a non-null struct should not have a null child (with null values).
// One observation is that spark doesn't consider the parent struct's nullness,
// and so, we should investigate the impact of always treating structs as null.
// See https://github.com/apache/arrow-rs/issues/245.
// It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.

// define schema
let offset_field = Field::new("offset", DataType::Int32, true);
let offset_field = Field::new("offset", DataType::Int32, false);
let partition_field = Field::new("partition", DataType::Int64, true);
let topic_field = Field::new("topic", DataType::Utf8, true);
let schema = Schema::new(vec![Field::new(
Expand All @@ -877,7 +885,7 @@ mod tests {
partition_field.clone(),
topic_field.clone(),
]),
true,
false,
)]);

// create some data
@@ -970,14 +978,10 @@ mod tests {
let schema = Schema::new(vec![field_a.clone()]);

// create data
// When the null buffer of the struct is created, this test fails.
// It appears that the nullness of the struct is ignored when the
// struct is read back.
// See https://github.com/apache/arrow-rs/issues/245
let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
.len(6)
// .null_bit_buffer(Buffer::from(vec![0b00100111]))
.null_bit_buffer(Buffer::from(vec![0b00100111]))
.add_child_data(c.data().clone())
.build();
let b = StructArray::from(b_data);
Expand All @@ -989,7 +993,7 @@ mod tests {
let a = StructArray::from(a_data);

assert_eq!(a.null_count(), 0);
assert_eq!(a.column(0).null_count(), 0);
assert_eq!(a.column(0).null_count(), 2);

// build a record batch
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
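The expected counts follow from the validity buffer: Arrow bitmaps are LSB-first, so `0b00100111` marks slots 0, 1, 2, and 5 of the six-slot child as valid, leaving slots 3 and 4 null. A quick sketch of that arithmetic:

```rust
fn main() {
    // Validity byte from the test above; bit i covers child slot i (LSB first).
    let bitmap: u8 = 0b0010_0111;
    let len = 6usize;
    let valid = (0..len).filter(|&i| bitmap & (1u8 << i) != 0).count();
    assert_eq!(len - valid, 2); // slots 3 and 4 are null
}
```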
@@ -1362,7 +1366,7 @@ mod tests {
let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
"item",
DataType::Int32,
true, // TODO: why does this fail when false? Is it related to logical nulls?
false,
))))
.len(5)
.add_buffer(a_value_offsets)
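Taken together, the tests in this file all reduce to a write-then-read roundtrip. For reference, a minimal harness in the spirit of those tests might look like this (a sketch only: it assumes the parquet 4.x Arrow reader/writer APIs shown in this diff plus the external `tempfile` crate, and checks just row and null counts):

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
use parquet::file::reader::SerializedFileReader;

fn main() {
    // One nullable Int32 column containing a single null.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let a = Int32Array::from(vec![Some(1), None, Some(3)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(a)]).unwrap();

    // Write the batch out...
    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // ...and read it back through the Arrow reader.
    let reader = SerializedFileReader::new(file).unwrap();
    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
    let read = arrow_reader
        .get_record_reader(1024)
        .unwrap()
        .next()
        .unwrap()
        .unwrap();

    assert_eq!(read.num_rows(), 3);
    assert_eq!(read.column(0).null_count(), 1);
}
```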