-
Notifications
You must be signed in to change notification settings - Fork 750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix StructArrayReader handling nested lists (#1651) #1700
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,19 +16,17 @@ | |
// under the License. | ||
|
||
use std::any::Any; | ||
use std::cmp::{max, min}; | ||
use std::cmp::max; | ||
use std::marker::PhantomData; | ||
use std::mem::size_of; | ||
use std::result::Result::Ok; | ||
use std::sync::Arc; | ||
use std::vec::Vec; | ||
|
||
use arrow::array::{ | ||
Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder, | ||
DecimalArray, Int16BufferBuilder, Int32Array, Int64Array, PrimitiveArray, | ||
StructArray, | ||
DecimalArray, Int32Array, Int64Array, PrimitiveArray, StructArray, | ||
}; | ||
use arrow::buffer::{Buffer, MutableBuffer}; | ||
use arrow::buffer::Buffer; | ||
use arrow::datatypes::{ | ||
ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType, | ||
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type, | ||
|
@@ -655,8 +653,7 @@ pub struct StructArrayReader { | |
data_type: ArrowType, | ||
struct_def_level: i16, | ||
struct_rep_level: i16, | ||
def_level_buffer: Option<Buffer>, | ||
rep_level_buffer: Option<Buffer>, | ||
nullable: bool, | ||
} | ||
|
||
impl StructArrayReader { | ||
|
@@ -666,14 +663,14 @@ impl StructArrayReader { | |
children: Vec<Box<dyn ArrayReader>>, | ||
def_level: i16, | ||
rep_level: i16, | ||
nullable: bool, | ||
) -> Self { | ||
Self { | ||
data_type, | ||
children, | ||
struct_def_level: def_level, | ||
struct_rep_level: rep_level, | ||
def_level_buffer: None, | ||
rep_level_buffer: None, | ||
nullable, | ||
} | ||
} | ||
} | ||
|
@@ -708,8 +705,6 @@ impl ArrayReader for StructArrayReader { | |
/// ``` | ||
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> { | ||
if self.children.is_empty() { | ||
self.def_level_buffer = None; | ||
self.rep_level_buffer = None; | ||
return Ok(Arc::new(StructArray::from(Vec::new()))); | ||
} | ||
|
||
|
@@ -742,80 +737,59 @@ impl ArrayReader for StructArrayReader { | |
.collect::<Vec<ArrayData>>(), | ||
); | ||
|
||
if self.struct_def_level != 0 { | ||
if self.nullable { | ||
// calculate struct def level data | ||
let buffer_size = children_array_len * size_of::<i16>(); | ||
let mut def_level_data_buffer = MutableBuffer::new(buffer_size); | ||
def_level_data_buffer.resize(buffer_size, 0); | ||
|
||
// Safety: the buffer is always treated as `u16` in the code below | ||
let def_level_data = unsafe { def_level_data_buffer.typed_data_mut() }; | ||
// children should have consistent view of parent, only need to inspect first child | ||
let def_levels = self.children[0] | ||
.get_def_levels() | ||
.expect("child with nullable parents must have definition level"); | ||
|
||
def_level_data | ||
.iter_mut() | ||
.for_each(|v| *v = self.struct_def_level); | ||
// calculate bitmap for current array | ||
let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len); | ||
|
||
for child in &self.children { | ||
if let Some(current_child_def_levels) = child.get_def_levels() { | ||
if current_child_def_levels.len() != children_array_len { | ||
return Err(general_err!("Child array length are not equal!")); | ||
} else { | ||
for i in 0..children_array_len { | ||
def_level_data[i] = | ||
min(def_level_data[i], current_child_def_levels[i]); | ||
match self.children[0].get_rep_levels() { | ||
Some(rep_levels) => { | ||
// Sanity check | ||
assert_eq!(rep_levels.len(), def_levels.len()); | ||
|
||
for (rep_level, def_level) in rep_levels.iter().zip(def_levels) { | ||
if rep_level > &self.struct_rep_level { | ||
// Already handled by inner list - SKIP | ||
continue; | ||
} | ||
bitmap_builder.append(*def_level >= self.struct_def_level) | ||
} | ||
} | ||
None => { | ||
for def_level in def_levels { | ||
bitmap_builder.append(*def_level >= self.struct_def_level) | ||
} | ||
} | ||
} | ||
|
||
// calculate bitmap for current array | ||
let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len); | ||
for def_level in def_level_data { | ||
let not_null = *def_level >= self.struct_def_level; | ||
bitmap_builder.append(not_null); | ||
if bitmap_builder.len() != children_array_len { | ||
return Err(general_err!("Failed to decode level data for struct array")); | ||
} | ||
|
||
array_data_builder = | ||
array_data_builder.null_bit_buffer(bitmap_builder.finish()); | ||
|
||
self.def_level_buffer = Some(def_level_data_buffer.into()); | ||
} | ||
|
||
let array_data = unsafe { array_data_builder.build_unchecked() }; | ||
|
||
if self.struct_rep_level != 0 { | ||
// calculate struct rep level data, since struct doesn't add to repetition | ||
// levels, here we just need to keep repetition levels of first array | ||
// TODO: Verify that all children array reader has same repetition levels | ||
let rep_level_data = self | ||
.children | ||
.first() | ||
.ok_or_else(|| { | ||
general_err!("Struct array reader should have at least one child!") | ||
})? | ||
.get_rep_levels() | ||
.map(|data| -> Result<Buffer> { | ||
let mut buffer = Int16BufferBuilder::new(children_array_len); | ||
buffer.append_slice(data); | ||
Ok(buffer.finish()) | ||
}) | ||
.transpose()?; | ||
|
||
self.rep_level_buffer = rep_level_data; | ||
} | ||
Ok(Arc::new(StructArray::from(array_data))) | ||
} | ||
|
||
fn get_def_levels(&self) -> Option<&[i16]> { | ||
self.def_level_buffer | ||
.as_ref() | ||
.map(|buf| unsafe { buf.typed_data() }) | ||
// Children definition levels should describe the same | ||
// parent structure, so return first child's | ||
self.children.first().and_then(|l| l.get_def_levels()) | ||
} | ||
|
||
fn get_rep_levels(&self) -> Option<&[i16]> { | ||
self.rep_level_buffer | ||
.as_ref() | ||
.map(|buf| unsafe { buf.typed_data() }) | ||
// Children definition levels should describe the same | ||
// parent structure, so return first child's | ||
self.children.first().and_then(|l| l.get_rep_levels()) | ||
} | ||
} | ||
|
||
|
@@ -828,7 +802,9 @@ mod tests { | |
use rand::{thread_rng, Rng}; | ||
|
||
use crate::arrow::array_reader::test_util::InMemoryArrayReader; | ||
use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray, StructArray}; | ||
use arrow::array::{ | ||
Array, ArrayRef, ListArray, PrimitiveArray, StringArray, StructArray, | ||
}; | ||
use arrow::datatypes::{ | ||
ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field, | ||
Int32Type as ArrowInt32, Int64Type as ArrowInt64, | ||
|
@@ -1551,6 +1527,7 @@ mod tests { | |
vec![Box::new(array_reader_1), Box::new(array_reader_2)], | ||
1, | ||
1, | ||
true, | ||
); | ||
|
||
let struct_array = struct_array_reader.next_batch(5).unwrap(); | ||
|
@@ -1564,12 +1541,74 @@ mod tests { | |
.collect::<Vec<bool>>() | ||
); | ||
assert_eq!( | ||
Some(vec![0, 1, 1, 1, 1].as_slice()), | ||
Some(vec![0, 1, 2, 3, 1].as_slice()), | ||
struct_array_reader.get_def_levels() | ||
); | ||
assert_eq!( | ||
Some(vec![0, 1, 1, 1, 1].as_slice()), | ||
struct_array_reader.get_rep_levels() | ||
); | ||
} | ||
|
||
#[test] | ||
fn test_struct_array_reader_list() { | ||
use arrow::datatypes::Int32Type; | ||
// [ | ||
// {foo: [1, 2, null], | ||
// {foo: []}, | ||
// {foo: null}, | ||
// null, | ||
// ] | ||
|
||
let expected_l = | ||
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![ | ||
Some(vec![Some(1), Some(2), None]), | ||
Some(vec![]), | ||
None, | ||
None, | ||
])); | ||
|
||
let nulls = Buffer::from([0b00000111]); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doesn't this set the first three elements in the struct array to That doesn't seem consistent with the structure created in the comments above There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will rename to validity, it is an arrow null mask... I agree the naming is perpetually confusing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. aaah! |
||
let struct_fields = vec![( | ||
Field::new("foo", expected_l.data_type().clone(), true), | ||
expected_l.clone() as ArrayRef, | ||
)]; | ||
let expected = StructArray::from((struct_fields, nulls)); | ||
|
||
let array = Arc::new(Int32Array::from_iter(vec![ | ||
Some(1), | ||
Some(2), | ||
None, | ||
None, | ||
None, | ||
None, | ||
])); | ||
let reader = InMemoryArrayReader::new( | ||
ArrowType::Int32, | ||
array, | ||
Some(vec![4, 4, 3, 2, 1, 0]), | ||
Some(vec![0, 1, 1, 0, 0, 0]), | ||
); | ||
|
||
let list_reader = ListArrayReader::<i32>::new( | ||
Box::new(reader), | ||
expected_l.data_type().clone(), | ||
ArrowType::Int32, | ||
3, | ||
1, | ||
true, | ||
); | ||
|
||
let mut struct_reader = StructArrayReader::new( | ||
expected.data_type().clone(), | ||
vec![Box::new(list_reader)], | ||
1, | ||
0, | ||
true, | ||
); | ||
|
||
let actual = struct_reader.next_batch(1024).unwrap(); | ||
let actual = actual.as_any().downcast_ref::<StructArray>().unwrap(); | ||
assert_eq!(actual, &expected) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,7 @@ | |
|
||
use crate::arrow::array_reader::ArrayReader; | ||
use crate::errors::ParquetError::ArrowError; | ||
use crate::errors::{Result, ParquetError}; | ||
use crate::errors::{ParquetError, Result}; | ||
use arrow::array::{ArrayDataBuilder, ArrayRef, MapArray}; | ||
use arrow::buffer::{Buffer, MutableBuffer}; | ||
use arrow::datatypes::DataType as ArrowType; | ||
|
@@ -33,8 +33,6 @@ pub struct MapArrayReader { | |
data_type: ArrowType, | ||
map_def_level: i16, | ||
map_rep_level: i16, | ||
def_level_buffer: Option<Buffer>, | ||
rep_level_buffer: Option<Buffer>, | ||
} | ||
|
||
impl MapArrayReader { | ||
|
@@ -51,8 +49,6 @@ impl MapArrayReader { | |
data_type, | ||
map_def_level: rep_level, | ||
map_rep_level: def_level, | ||
def_level_buffer: None, | ||
rep_level_buffer: None, | ||
} | ||
} | ||
} | ||
|
@@ -154,15 +150,15 @@ impl ArrayReader for MapArrayReader { | |
} | ||
|
||
fn get_def_levels(&self) -> Option<&[i16]> { | ||
self.def_level_buffer | ||
.as_ref() | ||
.map(|buf| unsafe { buf.typed_data() }) | ||
// Children definition levels should describe the same parent structure, | ||
// so return key_reader only | ||
self.key_reader.get_def_levels() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Drive by fix, part of #1699 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am a big fan of less |
||
} | ||
|
||
fn get_rep_levels(&self) -> Option<&[i16]> { | ||
self.rep_level_buffer | ||
.as_ref() | ||
.map(|buf| unsafe { buf.typed_data() }) | ||
// Children repetition levels should describe the same parent structure, | ||
// so return key_reader only | ||
self.key_reader.get_rep_levels() | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for removing some
unsafe