Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for nested list arrays from parquet to arrow arrays (#993) #1588

Merged
merged 11 commits into from
May 9, 2022
Merged
14 changes: 4 additions & 10 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,13 +717,7 @@ impl ArrayReader for StructArrayReader {
.children
.iter_mut()
.map(|reader| reader.next_batch(batch_size))
.try_fold(
Vec::new(),
|mut result, child_array| -> Result<Vec<ArrayRef>> {
result.push(child_array?);
Ok(result)
},
)?;
.collect::<Result<Vec<_>>>()?;

// check that array child data has same size
let children_array_len =
Expand Down Expand Up @@ -1538,15 +1532,15 @@ mod tests {
ArrowType::Int32,
array_1.clone(),
Some(vec![0, 1, 2, 3, 1]),
Some(vec![1, 1, 1, 1, 1]),
Some(vec![0, 1, 1, 1, 1]),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive by fix, this set of levels is impossible, as the first rep level must be 0. Nothing cares as this test doesn't actually decode the implied list, but 🤷

);

let array_2 = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![5, 4, 3, 2, 1]));
let array_reader_2 = InMemoryArrayReader::new(
ArrowType::Int32,
array_2.clone(),
Some(vec![0, 1, 3, 1, 2]),
Some(vec![1, 1, 1, 1, 1]),
Some(vec![0, 1, 1, 1, 1]),
);

let struct_type = ArrowType::Struct(vec![
Expand Down Expand Up @@ -1576,7 +1570,7 @@ mod tests {
struct_array_reader.get_def_levels()
);
assert_eq!(
Some(vec![1, 1, 1, 1, 1].as_slice()),
Some(vec![0, 1, 1, 1, 1].as_slice()),
struct_array_reader.get_rep_levels()
);
}
Expand Down
188 changes: 89 additions & 99 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::data_type::{
Int96Type,
};
use crate::errors::ParquetError::ArrowError;
use crate::errors::{Result};
use crate::errors::{ParquetError, Result};
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr};
use crate::schema::visitor::TypeVisitor;

Expand Down Expand Up @@ -129,9 +129,10 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext

let null_mask_only = match cur_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
false
return Err(ArrowError(format!(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just moves the error to earlier, there is no point continuing if this is not supported anyway 😁

"Reading repeated primitive ({:?}) is not supported yet!",
cur_type.name()
)));
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
Expand All @@ -143,19 +144,12 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
};

let reader = self.build_for_primitive_type_inner(
cur_type.clone(),
cur_type,
&new_context,
null_mask_only,
)?;

if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
Err(ArrowError(format!(
"Reading repeated field ({:?}) is not supported yet!",
cur_type.name()
)))
} else {
Ok(Some(reader))
}
} else {
Ok(None)
}
Expand All @@ -173,30 +167,19 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
if cur_type.get_basic_info().has_repetition() {
match cur_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
return Err(ArrowError(format!(
"Reading repeated struct ({:?}) is not supported yet!",
cur_type.name(),
)))
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
}
_ => (),
Repetition::REQUIRED => {}
}
}

if let Some(reader) = self.build_for_struct_type_inner(&cur_type, &new_context)? {
if cur_type.get_basic_info().has_repetition()
&& cur_type.get_basic_info().repetition() == Repetition::REPEATED
{
Err(ArrowError(format!(
"Reading repeated field ({:?}) is not supported yet!",
cur_type.name(),
)))
} else {
Ok(Some(reader))
}
} else {
Ok(None)
}
self.build_for_struct_type_inner(&cur_type, &new_context)
}

/// Build array reader for map type.
Expand All @@ -208,42 +191,61 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
// Add map type to context
let mut new_context = context.clone();
new_context.path.append(vec![map_type.name().to_string()]);
if let Repetition::OPTIONAL = map_type.get_basic_info().repetition() {

match map_type.get_basic_info().repetition() {
Repetition::REQUIRED => {}
Repetition::OPTIONAL => {
new_context.def_level += 1;
}
Repetition::REPEATED => {
return Err(ArrowError("Map cannot be repeated".to_string()))
}
}

if map_type.get_fields().len() != 1 {
return Err(ArrowError(format!(
"Map field must have exactly one key_value child, found {}",
map_type.get_fields().len()
)));
}

// Add map entry (key_value) to context
let map_key_value = map_type.get_fields().first().ok_or_else(|| {
ArrowError("Map field must have a key_value entry".to_string())
})?;
let map_key_value = &map_type.get_fields()[0];
if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
return Err(ArrowError(
"Child of map field must be repeated".to_string(),
));
}

new_context
.path
.append(vec![map_key_value.name().to_string()]);

new_context.rep_level += 1;
new_context.def_level += 1;

if map_key_value.get_fields().len() != 2 {
// According to the specification the values are optional (#1642)
return Err(ArrowError(format!(
"Child of map field must have two children, found {}",
map_key_value.get_fields().len()
)));
}

// Get key and value, and create context for each
let map_key = map_key_value
.get_fields()
.first()
.ok_or_else(|| ArrowError("Map entry must have a key".to_string()))?;
let map_value = map_key_value
.get_fields()
.get(1)
.ok_or_else(|| ArrowError("Map entry must have a value".to_string()))?;

let key_reader = {
let mut key_context = new_context.clone();
key_context.def_level += 1;
key_context.path.append(vec![map_key.name().to_string()]);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a drive-by fix, the context is the context of the parent, not the value being dispatched. This would result in map_key appearing twice

self.dispatch(map_key.clone(), &key_context)?.unwrap()
};
let value_reader = {
let mut value_context = new_context.clone();
if let Repetition::OPTIONAL = map_value.get_basic_info().repetition() {
value_context.def_level += 1;
let map_key = &map_key_value.get_fields()[0];
let map_value = &map_key_value.get_fields()[1];

if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
return Err(ArrowError("Map keys must be required".to_string()));
}
self.dispatch(map_value.clone(), &value_context)?.unwrap()
};

if map_value.get_basic_info().repetition() == Repetition::REPEATED {
return Err(ArrowError("Map values cannot be repeated".to_string()));
}

let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();

let arrow_type = self
.arrow_schema
Expand Down Expand Up @@ -290,55 +292,48 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
item_type: Arc<Type>,
context: &'a ArrayReaderBuilderContext,
) -> Result<Option<Box<dyn ArrayReader>>> {
let mut list_child = &list_type
.get_fields()
.first()
.ok_or_else(|| ArrowError("List field must have a child.".to_string()))?
.clone();
let mut new_context = context.clone();

new_context.path.append(vec![list_type.name().to_string()]);
// We need to know at what definition a list or its child is null
let list_null_def = new_context.def_level;
let mut list_empty_def = new_context.def_level;

// If the list's root is nullable
if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
// If the list is nullable
let nullable = match list_type.get_basic_info().repetition() {
Repetition::REQUIRED => false,
Repetition::OPTIONAL => {
new_context.def_level += 1;
// current level is nullable, increment to get level for empty list slot
list_empty_def += 1;
true
}

match list_child.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
return Err(general_err!("List type cannot be repeated"))
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
};

if list_type.get_fields().len() != 1 {
return Err(ArrowError(format!(
"List field must have exactly one child, found {}",
list_type.get_fields().len()
)));
}
_ => (),
let mut list_child = &list_type.get_fields()[0];

if list_child.get_basic_info().repetition() != Repetition::REPEATED {
return Err(ArrowError("List child must be repeated".to_string()));
}

let reader = self.dispatch(item_type.clone(), &new_context);
if let Ok(Some(item_reader)) = reader {
let item_reader_type = item_reader.get_data_type().clone();
// The repeated field
new_context.rep_level += 1;
new_context.def_level += 1;

match self.dispatch(item_type, &new_context) {
Ok(Some(item_reader)) => {
let item_type = item_reader.get_data_type().clone();

match item_reader_type {
ArrowType::List(_)
| ArrowType::FixedSizeList(_, _)
| ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
"reading List({:?}) into arrow not supported yet",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the error that would previously be returned on a nested list

item_type
))),
_ => {
// a list is a group type with a single child. The list child's
// name comes from the child's field name.
// if the child's name is "list" and it has a child, then use this child
if list_child.name() == "list" && !list_child.get_fields().is_empty()
{
if list_child.name() == "list" && !list_child.get_fields().is_empty() {
list_child = list_child.get_fields().first().unwrap();
}

let arrow_type = self
.arrow_schema
.field_with_name(list_type.name())
Expand All @@ -347,7 +342,7 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
.unwrap_or_else(|| {
ArrowType::List(Box::new(Field::new(
list_child.name(),
item_reader_type.clone(),
item_type.clone(),
list_child.is_optional(),
)))
});
Expand All @@ -356,22 +351,19 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
item_reader,
arrow_type,
item_reader_type,
item_type,
new_context.def_level,
new_context.rep_level,
list_null_def,
list_empty_def,
nullable,
)),
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
item_reader,
arrow_type,
item_reader_type,
item_type,
new_context.def_level,
new_context.rep_level,
list_null_def,
list_empty_def,
nullable,
)),

_ => {
return Err(ArrowError(format!(
"creating ListArrayReader with type {:?} should be unreachable",
Expand All @@ -382,9 +374,7 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext

Ok(Some(list_array_reader))
}
}
} else {
reader
result => result,
}
}
}
Expand Down Expand Up @@ -637,10 +627,10 @@ impl<'a> ArrayReaderBuilder {
let mut children_reader = Vec::with_capacity(cur_type.get_fields().len());

for child in cur_type.get_fields() {
let mut struct_context = context.clone();
if let Some(child_reader) = self.dispatch(child.clone(), context)? {
// TODO: this results in calling get_arrow_field twice, it could be reused
// from child_reader above, by making child_reader carry its `Field`
let mut struct_context = context.clone();
struct_context.path.append(vec![child.name().to_string()]);
let field = match self.get_arrow_field(child, &struct_context) {
Some(f) => f.clone(),
Expand Down
Loading