Skip to content

Commit

Permalink
Fix parquet definition levels
Browse files Browse the repository at this point in the history
- non-null primitive should have def = 0, was misinterpreting the spec
- list increments 1 if not null, or 2 if null

This fixes these issues, and updates the tests
  • Loading branch information
nevi-me committed Jun 30, 2021
1 parent f1a831f commit 4fecc32
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 45 deletions.
42 changes: 41 additions & 1 deletion parquet/src/arrow/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,6 @@ mod tests {
let batch =
RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap();

// I think this setup is incorrect because this should pass
assert_eq!(batch.column(0).data().null_count(), 1);

let file = get_temp_file("test_arrow_writer_list.parquet", &[]);
Expand All @@ -712,6 +711,47 @@ mod tests {
writer.close().unwrap();
}

#[test]
fn arrow_writer_list_non_null() {
// define schema
let schema = Schema::new(vec![Field::new(
"a",
DataType::List(Box::new(Field::new("item", DataType::Int32, false))),
false,
)]);

// create some data
let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// Construct a buffer for value offsets, for the nested array:
// [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
let a_value_offsets =
arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());

// Construct a list array from the above two
let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
"item",
DataType::Int32,
false,
))))
.len(5)
.add_buffer(a_value_offsets)
.add_child_data(a_values.data().clone())
.build();
let a = ListArray::from(a_list_data);

// build a record batch
let batch =
RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap();

assert_eq!(batch.column(0).data().null_count(), 0);

let file = get_temp_file("test_arrow_writer_list_non_null.parquet", &[]);
let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
}

#[test]
fn arrow_writer_binary() {
let string_field = Field::new("a", DataType::Utf8, false);
Expand Down
93 changes: 49 additions & 44 deletions parquet/src/arrow/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ impl LevelType {
const fn level_increment(&self) -> i16 {
match self {
LevelType::Root => 0,
LevelType::List(is_nullable)
| LevelType::Struct(is_nullable)
| LevelType::Primitive(is_nullable) => *is_nullable as i16,
// List repetition adds a constant 1
LevelType::List(is_nullable) => 1 + *is_nullable as i16,
LevelType::Struct(is_nullable) | LevelType::Primitive(is_nullable) => {
*is_nullable as i16
}
}
}
}
Expand Down Expand Up @@ -334,37 +336,30 @@ impl LevelInfo {
let mut merged_array_mask = Vec::with_capacity(min_len);

let max_definition = match (self.level_type, level_type) {
(LevelType::Root, LevelType::Struct(is_nullable)) => {
// If the struct is non-nullable, its def level doesn't increment
is_nullable as i16
}
(LevelType::Root, _) => 1,
// Handle the illegal cases
(_, LevelType::Root) => {
unreachable!("Cannot have a root as a child")
}
(LevelType::List(_), _) => {
self.max_definition + 1 + level_type.level_increment()
}
(LevelType::Struct(_), _) => {
self.max_definition + level_type.level_increment()
}
(_, LevelType::List(is_nullable)) => {
// if the child is a list, even if its parent is a root
self.max_definition + 1 + is_nullable as i16
}
(LevelType::Primitive(_), _) => {
unreachable!("Cannot have a primitive parent for any type")
}
// The general case
(_, _) => self.max_definition + level_type.level_increment(),
};

match (self.level_type, level_type) {
(LevelType::List(_), LevelType::List(is_nullable)) => {
// parent is a list or descendant of a list, and child is a list
let reps = self.repetition.clone().unwrap();
// Calculate the 2 list hierarchy definitions in advance
// List is not empty, but null
let l2 = max_definition - is_nullable as i16;
// List is not empty, and not null

// TODO: Update this to be similar to the 1-level list

// List is null, and not empty
let l1 = max_definition - is_nullable as i16;
// List is not null, but is empty
let l2 = max_definition - 1;
// List is not null, and not empty
let l3 = max_definition;

let mut nulls_seen = 0;
Expand Down Expand Up @@ -399,7 +394,9 @@ impl LevelInfo {
let merged_mask = parent_mask && child_mask;

if child_len == 0 {
definition.push(parent_def);
// Empty slot, i.e. {"parent": {"child": [] } }
// Nullness takes priority over emptiness
definition.push(if child_mask { l2 } else { l1 });
repetition.push(parent_rep);
merged_array_mask.push(merged_mask);
} else {
Expand All @@ -419,7 +416,7 @@ impl LevelInfo {
} else if child_mask {
l3
} else {
l2
l1
});
repetition.push(rep);
merged_array_mask.push(merged_mask);
Expand Down Expand Up @@ -506,9 +503,11 @@ impl LevelInfo {
// Encountering a list for the first time.
// Calculate the 2 list hierarchy definitions in advance

// List is not empty, but null (if nullable)
let l2 = max_definition - is_nullable as i16;
// List is not empty, and not null
// List is null, and not empty
let l1 = max_definition - 1 - is_nullable as i16;
// List is not null, but is empty
let l2 = max_definition - 1;
// List is not null, and not empty
let l3 = max_definition;

self.definition
Expand All @@ -523,20 +522,24 @@ impl LevelInfo {

match (parent_mask, child_len) {
(true, 0) => {
// empty slot that is valid, i.e. {"parent": {"child": [] } }
definition.push(if child_mask { l3 } else { l2 });
// Empty slot, i.e. {"parent": {"child": [] } }
// Nullness takes priority over emptiness
definition.push(if child_mask { l2 } else { l1 });
repetition.push(0);
merged_array_mask.push(child_mask);
}
(false, 0) => {
// Inherit the parent definition as parent was null
definition.push(*def);
repetition.push(0);
merged_array_mask.push(child_mask);
}
(true, _) => {
(child_from..child_to).for_each(|child_index| {
definition.push(if child_mask { l3 } else { l2 });
// mark the first child slot as 0, and the next as 1
// l1 and l3 make sense as list is not empty,
// but we reflect that it's either null or not
definition.push(if child_mask { l3 } else { l1 });
// Mark the first child slot as 0, and the next as 1
repetition.push(if child_index == child_from {
0
} else {
Expand All @@ -547,6 +550,7 @@ impl LevelInfo {
}
(false, _) => {
(child_from..child_to).for_each(|child_index| {
// Inherit the parent definition as parent was null
definition.push(*def);
// mark the first child slot as 0, and the next as 1
repetition.push(if child_index == child_from {
Expand Down Expand Up @@ -867,11 +871,12 @@ mod tests {
LevelType::Primitive(false),
);
let expected_levels = LevelInfo {
definition: vec![1; 10],
// As it is non-null, definitions can be omitted
definition: vec![0; 10],
repetition: None,
array_offsets,
array_mask,
max_definition: 1,
max_definition: 0,
level_type: LevelType::Primitive(false),
offset: 0,
length: 10,
Expand Down Expand Up @@ -948,13 +953,13 @@ mod tests {
// - Calculate the level at the list
// - Calculate the level at the list's child
// We do not do this in these tests, thus the levels are 1 less.
definition: vec![1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1],
definition: vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2],
repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
array_offsets,
array_mask: vec![
true, true, false, true, true, true, true, true, true, true, true, true,
],
max_definition: 1,
max_definition: 2,
level_type: LevelType::List(true),
offset: 0,
length: 11, // the child has 11 slots
Expand Down Expand Up @@ -1006,14 +1011,14 @@ mod tests {
// 2 3 [4] are 0
// 4 5 6 7 [8] are 1 (defined at level 1 only)
// 8 9 10 [11] are 2 (defined at both levels)
definition: vec![0, 0, 1, 0, 0, 2, 2, 2, 2, 2, 2, 2],
definition: vec![0, 0, 1, 0, 0, 3, 3, 3, 3, 3, 3, 3],
repetition: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
array_offsets,
array_mask: vec![
false, false, false, false, false, true, true, true, true, true, true,
true,
],
max_definition: 2,
max_definition: 3,
level_type: LevelType::List(true),
offset: 0,
length: 11,
Expand Down Expand Up @@ -1061,7 +1066,7 @@ mod tests {
// 3: [[108, 109], [110, 111], [112, 113], [114, 115]]
// 4: [[116, 117], [118, 119], [120, 121]]
definition: vec![
0, 0, 0, 0, 1, 0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
0, 0, 0, 0, 1, 0, 0, 0, 0, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
],
repetition: Some(vec![
0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2,
Expand All @@ -1072,7 +1077,7 @@ mod tests {
true, true, true, true, true, true, true, true, true, true, true, true,
true,
],
max_definition: 4,
max_definition: 5,
level_type: LevelType::List(true),
offset: 0,
length: 22,
Expand Down Expand Up @@ -1121,11 +1126,11 @@ mod tests {
// 2: [4, 5]
// 3: [6, 7]
let expected_levels = LevelInfo {
definition: vec![1, 2, 2, 2, 2, 2, 2, 2],
definition: vec![1, 3, 3, 3, 3, 3, 3, 3],
repetition: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
array_offsets,
array_mask: vec![false, true, true, true, true, true, true, true],
max_definition: 2,
max_definition: 3,
level_type: LevelType::List(true),
offset: 0,
length: 8,
Expand Down Expand Up @@ -1167,14 +1172,14 @@ mod tests {
// 2: {"struct": [ [204, 205, 206], [207, 208, 209, 210] ]}
// 3: {"struct": [ [], [211, 212, 213, 214, 215] ]}
let expected_levels = LevelInfo {
definition: vec![1, 4, 4, 4, 2, 4, 4, 4, 4, 4, 4, 4, 2, 4, 4, 4, 4, 4],
definition: vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5],
repetition: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]),
array_mask: vec![
false, true, true, true, false, true, true, true, true, true, true, true,
true, true, true, true, true, true,
],
array_offsets,
max_definition: 4,
max_definition: 5,
level_type: LevelType::List(true),
offset: 0,
length: 16,
Expand Down Expand Up @@ -1416,11 +1421,11 @@ mod tests {
let list_level = levels.get(0).unwrap();

let expected_level = LevelInfo {
definition: vec![1, 1, 1, 1, 1],
definition: vec![0, 0, 0, 0, 0],
repetition: None,
array_offsets: vec![0, 1, 2, 3, 4, 5],
array_mask: vec![true, true, true, true, true],
max_definition: 1,
max_definition: 0,
level_type: LevelType::Primitive(false),
offset: 0,
length: 5,
Expand Down

0 comments on commit 4fecc32

Please sign in to comment.