Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parquet definition levels #511

Merged
merged 2 commits into from
Jul 3, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion parquet/src/arrow/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,6 @@ mod tests {
let batch =
RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap();

// I think this setup is incorrect because this should pass
assert_eq!(batch.column(0).data().null_count(), 1);

let file = get_temp_file("test_arrow_writer_list.parquet", &[]);
Expand All @@ -712,6 +711,47 @@ mod tests {
writer.close().unwrap();
}

#[test]
fn arrow_writer_list_non_null() {
// define schema
let schema = Schema::new(vec![Field::new(
"a",
DataType::List(Box::new(Field::new("item", DataType::Int32, false))),
false,
)]);

// create some data
let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// Construct a buffer for value offsets, for the nested array:
// [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
let a_value_offsets =
arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());

// Construct a list array from the above two
let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
"item",
DataType::Int32,
false,
))))
.len(5)
.add_buffer(a_value_offsets)
.add_child_data(a_values.data().clone())
.build();
let a = ListArray::from(a_list_data);

// build a record batch
let batch =
RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap();

assert_eq!(batch.column(0).data().null_count(), 0);

let file = get_temp_file("test_arrow_writer_list_non_null.parquet", &[]);
let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed other tests like arrow_writer_binary also read data back from parquet and validate the results (and thus confirm data survives the roundtrip).

Would it make sense to have the same test here as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb it's because fn roundtrip was added after these tests existed (and I think the reader was lagging behind the writer), so we never expanded it to them. I've now done so, removing quite a bit of duplication.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks much nicer 👍

}

#[test]
fn arrow_writer_binary() {
let string_field = Field::new("a", DataType::Utf8, false);
Expand Down
94 changes: 48 additions & 46 deletions parquet/src/arrow/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ impl LevelType {
const fn level_increment(&self) -> i16 {
match self {
LevelType::Root => 0,
LevelType::List(is_nullable)
| LevelType::Struct(is_nullable)
| LevelType::Primitive(is_nullable) => *is_nullable as i16,
// List repetition adds a constant 1
LevelType::List(is_nullable) => 1 + *is_nullable as i16,
LevelType::Struct(is_nullable) | LevelType::Primitive(is_nullable) => {
*is_nullable as i16
}
}
}
}
Expand Down Expand Up @@ -334,37 +336,27 @@ impl LevelInfo {
let mut merged_array_mask = Vec::with_capacity(min_len);

let max_definition = match (self.level_type, level_type) {
(LevelType::Root, LevelType::Struct(is_nullable)) => {
// If the struct is non-nullable, its def level doesn't increment
is_nullable as i16
}
(LevelType::Root, _) => 1,
// Handle the illegal cases
(_, LevelType::Root) => {
unreachable!("Cannot have a root as a child")
}
(LevelType::List(_), _) => {
self.max_definition + 1 + level_type.level_increment()
}
(LevelType::Struct(_), _) => {
self.max_definition + level_type.level_increment()
}
(_, LevelType::List(is_nullable)) => {
// if the child is a list, even if its parent is a root
self.max_definition + 1 + is_nullable as i16
}
(LevelType::Primitive(_), _) => {
unreachable!("Cannot have a primitive parent for any type")
}
// The general case
(_, _) => self.max_definition + level_type.level_increment(),
};

match (self.level_type, level_type) {
(LevelType::List(_), LevelType::List(is_nullable)) => {
// parent is a list or descendant of a list, and child is a list
// Parent is a list or descendant of a list, and child is a list
let reps = self.repetition.clone().unwrap();
// Calculate the 3 list hierarchy definition levels (l1, l2, l3) in advance
// List is not empty, but null
let l2 = max_definition - is_nullable as i16;
// List is not empty, and not null

// List is null, and not empty
let l1 = max_definition - is_nullable as i16;
// List is not null, but is empty
let l2 = max_definition - 1;
// List is not null, and not empty
let l3 = max_definition;

let mut nulls_seen = 0;
Expand Down Expand Up @@ -399,7 +391,9 @@ impl LevelInfo {
let merged_mask = parent_mask && child_mask;

if child_len == 0 {
definition.push(parent_def);
// Empty slot, i.e. {"parent": {"child": [] } }
// Nullness takes priority over emptiness
definition.push(if child_mask { l2 } else { l1 });
repetition.push(parent_rep);
merged_array_mask.push(merged_mask);
} else {
Expand All @@ -419,7 +413,7 @@ impl LevelInfo {
} else if child_mask {
l3
} else {
l2
l1
});
repetition.push(rep);
merged_array_mask.push(merged_mask);
Expand Down Expand Up @@ -506,9 +500,11 @@ impl LevelInfo {
// Encountering a list for the first time.
// Calculate the 3 list hierarchy definition levels (l1, l2, l3) in advance

// List is not empty, but null (if nullable)
let l2 = max_definition - is_nullable as i16;
// List is not empty, and not null
// List is null, and not empty
let l1 = max_definition - 1 - is_nullable as i16;
// List is not null, but is empty
let l2 = max_definition - 1;
// List is not null, and not empty
let l3 = max_definition;

self.definition
Expand All @@ -523,20 +519,24 @@ impl LevelInfo {

match (parent_mask, child_len) {
(true, 0) => {
// empty slot that is valid, i.e. {"parent": {"child": [] } }
definition.push(if child_mask { l3 } else { l2 });
// Empty slot, i.e. {"parent": {"child": [] } }
// Nullness takes priority over emptiness
definition.push(if child_mask { l2 } else { l1 });
repetition.push(0);
merged_array_mask.push(child_mask);
}
(false, 0) => {
// Inherit the parent definition as parent was null
definition.push(*def);
repetition.push(0);
merged_array_mask.push(child_mask);
}
(true, _) => {
(child_from..child_to).for_each(|child_index| {
definition.push(if child_mask { l3 } else { l2 });
// mark the first child slot as 0, and the next as 1
// l1 and l3 make sense as list is not empty,
// but we reflect that it's either null or not
definition.push(if child_mask { l3 } else { l1 });
// Mark the first child slot as 0, and the next as 1
repetition.push(if child_index == child_from {
0
} else {
Expand All @@ -547,6 +547,7 @@ impl LevelInfo {
}
(false, _) => {
(child_from..child_to).for_each(|child_index| {
// Inherit the parent definition as parent was null
definition.push(*def);
// mark the first child slot as 0, and the next as 1
repetition.push(if child_index == child_from {
Expand Down Expand Up @@ -867,11 +868,12 @@ mod tests {
LevelType::Primitive(false),
);
let expected_levels = LevelInfo {
definition: vec![1; 10],
// As it is non-null, definitions can be omitted
definition: vec![0; 10],
repetition: None,
array_offsets,
array_mask,
max_definition: 1,
max_definition: 0,
level_type: LevelType::Primitive(false),
offset: 0,
length: 10,
Expand Down Expand Up @@ -948,13 +950,13 @@ mod tests {
// - Calculate the level at the list
// - Calculate the level at the list's child
// We do not do this in these tests, thus the levels are 1 less.
definition: vec![1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1],
definition: vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2],
repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
array_offsets,
array_mask: vec![
true, true, false, true, true, true, true, true, true, true, true, true,
],
max_definition: 1,
max_definition: 2,
level_type: LevelType::List(true),
offset: 0,
length: 11, // the child has 11 slots
Expand Down Expand Up @@ -1006,14 +1008,14 @@ mod tests {
// 2 3 [4] are 0
// 4 5 6 7 [8] are 1 (defined at level 1 only)
// 8 9 10 [11] are 2 (defined at both levels)
definition: vec![0, 0, 1, 0, 0, 2, 2, 2, 2, 2, 2, 2],
definition: vec![0, 0, 1, 0, 0, 3, 3, 3, 3, 3, 3, 3],
repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
array_offsets,
array_mask: vec![
false, false, false, false, false, true, true, true, true, true, true,
true,
],
max_definition: 2,
max_definition: 3,
level_type: LevelType::List(true),
offset: 0,
length: 11,
Expand Down Expand Up @@ -1061,7 +1063,7 @@ mod tests {
// 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
// 4: [[116, 117], [118, 119], [120, 121]]
definition: vec![
0, 0, 0, 0, 1, 0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
],
repetition: Some(vec![
0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
Expand All @@ -1072,7 +1074,7 @@ mod tests {
true, true, true, true, true, true, true, true, true, true, true, true,
true,
],
max_definition: 4,
max_definition: 5,
level_type: LevelType::List(true),
offset: 0,
length: 22,
Expand Down Expand Up @@ -1121,11 +1123,11 @@ mod tests {
// 2: [4, 5]
// 3: [6, 7]
let expected_levels = LevelInfo {
definition: vec![1, 2, 2, 2, 2, 2, 2, 2],
definition: vec![1, 3, 3, 3, 3, 3, 3, 3],
repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
array_offsets,
array_mask: vec![false, true, true, true, true, true, true, true],
max_definition: 2,
max_definition: 3,
level_type: LevelType::List(true),
offset: 0,
length: 8,
Expand Down Expand Up @@ -1167,14 +1169,14 @@ mod tests {
// 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
// 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
let expected_levels = LevelInfo {
definition: vec![1, 4, 4, 4, 2, 4, 4, 4, 4, 4, 4, 4, 2, 4, 4, 4, 4, 4],
definition: vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5],
repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
array_mask: vec![
false, true, true, true, false, true, true, true, true, true, true, true,
true, true, true, true, true, true,
],
array_offsets,
max_definition: 4,
max_definition: 5,
level_type: LevelType::List(true),
offset: 0,
length: 16,
Expand Down Expand Up @@ -1416,11 +1418,11 @@ mod tests {
let list_level = levels.get(0).unwrap();

let expected_level = LevelInfo {
definition: vec![1, 1, 1, 1, 1],
definition: vec![0, 0, 0, 0, 0],
repetition: None,
array_offsets: vec![0, 1, 2, 3, 4, 5],
array_mask: vec![true, true, true, true, true],
max_definition: 1,
max_definition: 0,
level_type: LevelType::Primitive(false),
offset: 0,
length: 5,
Expand Down