Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for nested list arrays from parquet to arrow arrays (#993) #1588

Merged
merged 11 commits into from
May 9, 2022
Merged
8 changes: 1 addition & 7 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,13 +717,7 @@ impl ArrayReader for StructArrayReader {
.children
.iter_mut()
.map(|reader| reader.next_batch(batch_size))
.try_fold(
Vec::new(),
|mut result, child_array| -> Result<Vec<ArrayRef>> {
result.push(child_array?);
Ok(result)
},
)?;
.collect::<Result<Vec<_>>>()?;

// check that array child data has same size
let children_array_len =
Expand Down
260 changes: 123 additions & 137 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::data_type::{
Int96Type,
};
use crate::errors::ParquetError::ArrowError;
use crate::errors::{Result};
use crate::errors::{ParquetError, Result};
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr};
use crate::schema::visitor::TypeVisitor;

Expand Down Expand Up @@ -129,9 +129,10 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext

let null_mask_only = match cur_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
false
return Err(ArrowError(format!(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just moves the error to earlier, there is no point continuing if this is not supported anyway 😁

"Reading repeated primitive ({:?}) is not supported yet!",
cur_type.name()
)));
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
Expand All @@ -143,19 +144,12 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
};

let reader = self.build_for_primitive_type_inner(
cur_type.clone(),
cur_type,
&new_context,
null_mask_only,
)?;

if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
Err(ArrowError(format!(
"Reading repeated field ({:?}) is not supported yet!",
cur_type.name()
)))
} else {
Ok(Some(reader))
}
Ok(Some(reader))
} else {
Ok(None)
}
Expand All @@ -173,30 +167,19 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
if cur_type.get_basic_info().has_repetition() {
match cur_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
return Err(ArrowError(format!(
"Reading repeated struct ({:?}) is not supported yet!",
cur_type.name(),
)))
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
}
_ => (),
Repetition::REQUIRED => {}
}
}

if let Some(reader) = self.build_for_struct_type_inner(&cur_type, &new_context)? {
if cur_type.get_basic_info().has_repetition()
&& cur_type.get_basic_info().repetition() == Repetition::REPEATED
{
Err(ArrowError(format!(
"Reading repeated field ({:?}) is not supported yet!",
cur_type.name(),
)))
} else {
Ok(Some(reader))
}
} else {
Ok(None)
}
self.build_for_struct_type_inner(&cur_type, &new_context)
}

/// Build array reader for map type.
Expand All @@ -208,42 +191,61 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
// Add map type to context
let mut new_context = context.clone();
new_context.path.append(vec![map_type.name().to_string()]);
if let Repetition::OPTIONAL = map_type.get_basic_info().repetition() {
new_context.def_level += 1;

match map_type.get_basic_info().repetition() {
Repetition::REQUIRED => {}
Repetition::OPTIONAL => {
new_context.def_level += 1;
}
Repetition::REPEATED => {
return Err(ArrowError("Map cannot be repeated".to_string()))
}
}

if map_type.get_fields().len() != 1 {
return Err(ArrowError(format!(
"Map field must have exactly one key_value child, found {}",
map_type.get_fields().len()
)));
}

// Add map entry (key_value) to context
let map_key_value = map_type.get_fields().first().ok_or_else(|| {
ArrowError("Map field must have a key_value entry".to_string())
})?;
let map_key_value = &map_type.get_fields()[0];
if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
return Err(ArrowError(
"Child of map field must be repeated".to_string(),
));
}

new_context
.path
.append(vec![map_key_value.name().to_string()]);

new_context.rep_level += 1;
new_context.def_level += 1;

if map_key_value.get_fields().len() != 2 {
// According to the specification the values are optional (#1642)
return Err(ArrowError(format!(
"Child of map field must have two children, found {}",
map_key_value.get_fields().len()
)));
}

// Get key and value, and create context for each
let map_key = map_key_value
.get_fields()
.first()
.ok_or_else(|| ArrowError("Map entry must have a key".to_string()))?;
let map_value = map_key_value
.get_fields()
.get(1)
.ok_or_else(|| ArrowError("Map entry must have a value".to_string()))?;

let key_reader = {
let mut key_context = new_context.clone();
key_context.def_level += 1;
key_context.path.append(vec![map_key.name().to_string()]);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a drive-by fix, the context is the context of the parent, not the value being dispatched. This would result in map_key appearing twice

self.dispatch(map_key.clone(), &key_context)?.unwrap()
};
let value_reader = {
let mut value_context = new_context.clone();
if let Repetition::OPTIONAL = map_value.get_basic_info().repetition() {
value_context.def_level += 1;
}
self.dispatch(map_value.clone(), &value_context)?.unwrap()
};
let map_key = &map_key_value.get_fields()[0];
let map_value = &map_key_value.get_fields()[1];

if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
return Err(ArrowError("Map keys must be required".to_string()));
}

if map_value.get_basic_info().repetition() == Repetition::REPEATED {
return Err(ArrowError("Map values cannot be repeated".to_string()));
}

let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();

let arrow_type = self
.arrow_schema
Expand Down Expand Up @@ -295,96 +297,80 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
.first()
.ok_or_else(|| ArrowError("List field must have a child.".to_string()))?
.clone();
let mut new_context = context.clone();

// If the list can contain nulls
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend reading this with whitespace disabled https://github.com/apache/arrow-rs/pull/1588/files?w=1

let nullable = match list_type.get_basic_info().repetition() {
Repetition::REQUIRED => false,
Repetition::OPTIONAL => true,
Repetition::REPEATED => {
return Err(general_err!("List type cannot be repeated"))
}
};

let mut new_context = context.clone();
new_context.path.append(vec![list_type.name().to_string()]);
// We need to know at what definition a list or its child is null
let list_null_def = new_context.def_level;
let mut list_empty_def = new_context.def_level;

// The repeated field
new_context.rep_level += 1;
new_context.def_level += 1;

// If the list's root is nullable
if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
if nullable {
new_context.def_level += 1;
// current level is nullable, increment to get level for empty list slot
list_empty_def += 1;
}

match list_child.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
}
_ => (),
}
match self.dispatch(item_type, &new_context) {
Ok(Some(item_reader)) => {
let item_type = item_reader.get_data_type().clone();

// a list is a group type with a single child. The list child's
// name comes from the child's field name.
// if the child's name is "list" and it has a child, then use this child
if list_child.name() == "list" && !list_child.get_fields().is_empty() {
list_child = list_child.get_fields().first().unwrap();
}

let reader = self.dispatch(item_type.clone(), &new_context);
if let Ok(Some(item_reader)) = reader {
let item_reader_type = item_reader.get_data_type().clone();

match item_reader_type {
ArrowType::List(_)
| ArrowType::FixedSizeList(_, _)
| ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
"reading List({:?}) into arrow not supported yet",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the error that would previously be returned on a nested list

item_type
))),
_ => {
// a list is a group type with a single child. The list child's
// name comes from the child's field name.
// if the child's name is "list" and it has a child, then use this child
if list_child.name() == "list" && !list_child.get_fields().is_empty()
{
list_child = list_child.get_fields().first().unwrap();
let arrow_type = self
.arrow_schema
.field_with_name(list_type.name())
.ok()
.map(|f| f.data_type().to_owned())
.unwrap_or_else(|| {
ArrowType::List(Box::new(Field::new(
list_child.name(),
item_type.clone(),
list_child.is_optional(),
)))
});

let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
item_reader,
arrow_type,
item_type,
new_context.def_level,
new_context.rep_level,
nullable,
)),
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
item_reader,
arrow_type,
item_type,
new_context.def_level,
new_context.rep_level,
nullable,
)),
_ => {
return Err(ArrowError(format!(
"creating ListArrayReader with type {:?} should be unreachable",
arrow_type
)))
}
let arrow_type = self
.arrow_schema
.field_with_name(list_type.name())
.ok()
.map(|f| f.data_type().to_owned())
.unwrap_or_else(|| {
ArrowType::List(Box::new(Field::new(
list_child.name(),
item_reader_type.clone(),
list_child.is_optional(),
)))
});

let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
item_reader,
arrow_type,
item_reader_type,
new_context.def_level,
new_context.rep_level,
list_null_def,
list_empty_def,
)),
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
item_reader,
arrow_type,
item_reader_type,
new_context.def_level,
new_context.rep_level,
list_null_def,
list_empty_def,
)),

_ => {
return Err(ArrowError(format!(
"creating ListArrayReader with type {:?} should be unreachable",
arrow_type
)))
}
};
};

Ok(Some(list_array_reader))
}
Ok(Some(list_array_reader))
}
} else {
reader
result => result,
}
}
}
Expand Down Expand Up @@ -637,10 +623,10 @@ impl<'a> ArrayReaderBuilder {
let mut children_reader = Vec::with_capacity(cur_type.get_fields().len());

for child in cur_type.get_fields() {
let mut struct_context = context.clone();
if let Some(child_reader) = self.dispatch(child.clone(), context)? {
// TODO: this results in calling get_arrow_field twice, it could be reused
// from child_reader above, by making child_reader carry its `Field`
let mut struct_context = context.clone();
struct_context.path.append(vec![child.name().to_string()]);
let field = match self.get_arrow_field(child, &struct_context) {
Some(f) => f.clone(),
Expand Down
Loading