-
Notifications
You must be signed in to change notification settings - Fork 750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for nested list arrays from parquet to arrow arrays (#993) #1588
Changes from 2 commits
d3537c8
bf32cb6
70c16c3
e9acb22
b3ca78e
1d00e7c
fdae1d5
1b99c49
05c2311
f250ba7
15b8144
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,7 @@ use crate::data_type::{ | |
Int96Type, | ||
}; | ||
use crate::errors::ParquetError::ArrowError; | ||
use crate::errors::{Result}; | ||
use crate::errors::{ParquetError, Result}; | ||
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr}; | ||
use crate::schema::visitor::TypeVisitor; | ||
|
||
|
@@ -129,9 +129,10 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext | |
|
||
let null_mask_only = match cur_type.get_basic_info().repetition() { | ||
Repetition::REPEATED => { | ||
new_context.def_level += 1; | ||
new_context.rep_level += 1; | ||
false | ||
return Err(ArrowError(format!( | ||
"Reading repeated primitive ({:?}) is not supported yet!", | ||
cur_type.name() | ||
))); | ||
} | ||
Repetition::OPTIONAL => { | ||
new_context.def_level += 1; | ||
|
@@ -143,19 +144,12 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext | |
}; | ||
|
||
let reader = self.build_for_primitive_type_inner( | ||
cur_type.clone(), | ||
cur_type, | ||
&new_context, | ||
null_mask_only, | ||
)?; | ||
|
||
if cur_type.get_basic_info().repetition() == Repetition::REPEATED { | ||
Err(ArrowError(format!( | ||
"Reading repeated field ({:?}) is not supported yet!", | ||
cur_type.name() | ||
))) | ||
} else { | ||
Ok(Some(reader)) | ||
} | ||
Ok(Some(reader)) | ||
} else { | ||
Ok(None) | ||
} | ||
|
@@ -173,30 +167,19 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext | |
if cur_type.get_basic_info().has_repetition() { | ||
match cur_type.get_basic_info().repetition() { | ||
Repetition::REPEATED => { | ||
new_context.def_level += 1; | ||
new_context.rep_level += 1; | ||
return Err(ArrowError(format!( | ||
"Reading repeated struct ({:?}) is not supported yet!", | ||
cur_type.name(), | ||
))) | ||
} | ||
Repetition::OPTIONAL => { | ||
new_context.def_level += 1; | ||
} | ||
_ => (), | ||
Repetition::REQUIRED => {} | ||
} | ||
} | ||
|
||
if let Some(reader) = self.build_for_struct_type_inner(&cur_type, &new_context)? { | ||
if cur_type.get_basic_info().has_repetition() | ||
&& cur_type.get_basic_info().repetition() == Repetition::REPEATED | ||
{ | ||
Err(ArrowError(format!( | ||
"Reading repeated field ({:?}) is not supported yet!", | ||
cur_type.name(), | ||
))) | ||
} else { | ||
Ok(Some(reader)) | ||
} | ||
} else { | ||
Ok(None) | ||
} | ||
self.build_for_struct_type_inner(&cur_type, &new_context) | ||
} | ||
|
||
/// Build array reader for map type. | ||
|
@@ -208,42 +191,61 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext | |
// Add map type to context | ||
let mut new_context = context.clone(); | ||
new_context.path.append(vec![map_type.name().to_string()]); | ||
if let Repetition::OPTIONAL = map_type.get_basic_info().repetition() { | ||
new_context.def_level += 1; | ||
|
||
match map_type.get_basic_info().repetition() { | ||
Repetition::REQUIRED => {} | ||
Repetition::OPTIONAL => { | ||
new_context.def_level += 1; | ||
} | ||
Repetition::REPEATED => { | ||
return Err(ArrowError("Map cannot be repeated".to_string())) | ||
} | ||
} | ||
|
||
if map_type.get_fields().len() != 1 { | ||
return Err(ArrowError(format!( | ||
"Map field must have exactly one key_value child, found {}", | ||
map_type.get_fields().len() | ||
))); | ||
} | ||
|
||
// Add map entry (key_value) to context | ||
let map_key_value = map_type.get_fields().first().ok_or_else(|| { | ||
ArrowError("Map field must have a key_value entry".to_string()) | ||
})?; | ||
let map_key_value = &map_type.get_fields()[0]; | ||
if map_key_value.get_basic_info().repetition() != Repetition::REPEATED { | ||
return Err(ArrowError( | ||
"Child of map field must be repeated".to_string(), | ||
)); | ||
} | ||
|
||
new_context | ||
.path | ||
.append(vec![map_key_value.name().to_string()]); | ||
|
||
new_context.rep_level += 1; | ||
new_context.def_level += 1; | ||
|
||
if map_key_value.get_fields().len() != 2 { | ||
// According to the specification the values are optional (#1642) | ||
return Err(ArrowError(format!( | ||
"Child of map field must have two children, found {}", | ||
map_key_value.get_fields().len() | ||
))); | ||
} | ||
|
||
// Get key and value, and create context for each | ||
let map_key = map_key_value | ||
.get_fields() | ||
.first() | ||
.ok_or_else(|| ArrowError("Map entry must have a key".to_string()))?; | ||
let map_value = map_key_value | ||
.get_fields() | ||
.get(1) | ||
.ok_or_else(|| ArrowError("Map entry must have a value".to_string()))?; | ||
|
||
let key_reader = { | ||
let mut key_context = new_context.clone(); | ||
key_context.def_level += 1; | ||
key_context.path.append(vec![map_key.name().to_string()]); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a drive-by fix: the context passed to dispatch is the context of the parent, not of the value being dispatched. Appending the key's name here would result in map_key appearing twice. |
||
self.dispatch(map_key.clone(), &key_context)?.unwrap() | ||
}; | ||
let value_reader = { | ||
let mut value_context = new_context.clone(); | ||
if let Repetition::OPTIONAL = map_value.get_basic_info().repetition() { | ||
value_context.def_level += 1; | ||
} | ||
self.dispatch(map_value.clone(), &value_context)?.unwrap() | ||
}; | ||
let map_key = &map_key_value.get_fields()[0]; | ||
let map_value = &map_key_value.get_fields()[1]; | ||
|
||
if map_key.get_basic_info().repetition() != Repetition::REQUIRED { | ||
return Err(ArrowError("Map keys must be required".to_string())); | ||
} | ||
|
||
if map_value.get_basic_info().repetition() == Repetition::REPEATED { | ||
return Err(ArrowError("Map values cannot be repeated".to_string())); | ||
} | ||
|
||
let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap(); | ||
let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap(); | ||
|
||
let arrow_type = self | ||
.arrow_schema | ||
|
@@ -295,96 +297,80 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext | |
.first() | ||
.ok_or_else(|| ArrowError("List field must have a child.".to_string()))? | ||
.clone(); | ||
let mut new_context = context.clone(); | ||
|
||
// If the list can contain nulls | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I recommend reading this diff with whitespace changes hidden: https://github.com/apache/arrow-rs/pull/1588/files?w=1 |
||
let nullable = match list_type.get_basic_info().repetition() { | ||
Repetition::REQUIRED => false, | ||
Repetition::OPTIONAL => true, | ||
Repetition::REPEATED => { | ||
return Err(general_err!("List type cannot be repeated")) | ||
} | ||
}; | ||
|
||
let mut new_context = context.clone(); | ||
new_context.path.append(vec![list_type.name().to_string()]); | ||
// We need to know at what definition a list or its child is null | ||
let list_null_def = new_context.def_level; | ||
let mut list_empty_def = new_context.def_level; | ||
|
||
// The repeated field | ||
new_context.rep_level += 1; | ||
new_context.def_level += 1; | ||
|
||
// If the list's root is nullable | ||
if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() { | ||
if nullable { | ||
new_context.def_level += 1; | ||
// current level is nullable, increment to get level for empty list slot | ||
list_empty_def += 1; | ||
} | ||
|
||
match list_child.get_basic_info().repetition() { | ||
Repetition::REPEATED => { | ||
new_context.def_level += 1; | ||
new_context.rep_level += 1; | ||
} | ||
Repetition::OPTIONAL => { | ||
new_context.def_level += 1; | ||
} | ||
_ => (), | ||
} | ||
match self.dispatch(item_type, &new_context) { | ||
Ok(Some(item_reader)) => { | ||
let item_type = item_reader.get_data_type().clone(); | ||
|
||
// a list is a group type with a single child. The list child's | ||
// name comes from the child's field name. | ||
// if the child's name is "list" and it has a child, then use this child | ||
if list_child.name() == "list" && !list_child.get_fields().is_empty() { | ||
list_child = list_child.get_fields().first().unwrap(); | ||
} | ||
|
||
let reader = self.dispatch(item_type.clone(), &new_context); | ||
if let Ok(Some(item_reader)) = reader { | ||
let item_reader_type = item_reader.get_data_type().clone(); | ||
|
||
match item_reader_type { | ||
ArrowType::List(_) | ||
| ArrowType::FixedSizeList(_, _) | ||
| ArrowType::Dictionary(_, _) => Err(ArrowError(format!( | ||
"reading List({:?}) into arrow not supported yet", | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is the error that was previously returned for a nested list. |
||
item_type | ||
))), | ||
_ => { | ||
// a list is a group type with a single child. The list child's | ||
// name comes from the child's field name. | ||
// if the child's name is "list" and it has a child, then use this child | ||
if list_child.name() == "list" && !list_child.get_fields().is_empty() | ||
{ | ||
list_child = list_child.get_fields().first().unwrap(); | ||
let arrow_type = self | ||
.arrow_schema | ||
.field_with_name(list_type.name()) | ||
.ok() | ||
.map(|f| f.data_type().to_owned()) | ||
.unwrap_or_else(|| { | ||
ArrowType::List(Box::new(Field::new( | ||
list_child.name(), | ||
item_type.clone(), | ||
list_child.is_optional(), | ||
))) | ||
}); | ||
|
||
let list_array_reader: Box<dyn ArrayReader> = match arrow_type { | ||
ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new( | ||
item_reader, | ||
arrow_type, | ||
item_type, | ||
new_context.def_level, | ||
new_context.rep_level, | ||
nullable, | ||
)), | ||
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new( | ||
item_reader, | ||
arrow_type, | ||
item_type, | ||
new_context.def_level, | ||
new_context.rep_level, | ||
nullable, | ||
)), | ||
_ => { | ||
return Err(ArrowError(format!( | ||
"creating ListArrayReader with type {:?} should be unreachable", | ||
arrow_type | ||
))) | ||
} | ||
let arrow_type = self | ||
.arrow_schema | ||
.field_with_name(list_type.name()) | ||
.ok() | ||
.map(|f| f.data_type().to_owned()) | ||
.unwrap_or_else(|| { | ||
ArrowType::List(Box::new(Field::new( | ||
list_child.name(), | ||
item_reader_type.clone(), | ||
list_child.is_optional(), | ||
))) | ||
}); | ||
|
||
let list_array_reader: Box<dyn ArrayReader> = match arrow_type { | ||
ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new( | ||
item_reader, | ||
arrow_type, | ||
item_reader_type, | ||
new_context.def_level, | ||
new_context.rep_level, | ||
list_null_def, | ||
list_empty_def, | ||
)), | ||
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new( | ||
item_reader, | ||
arrow_type, | ||
item_reader_type, | ||
new_context.def_level, | ||
new_context.rep_level, | ||
list_null_def, | ||
list_empty_def, | ||
)), | ||
|
||
_ => { | ||
return Err(ArrowError(format!( | ||
"creating ListArrayReader with type {:?} should be unreachable", | ||
arrow_type | ||
))) | ||
} | ||
}; | ||
}; | ||
|
||
Ok(Some(list_array_reader)) | ||
} | ||
Ok(Some(list_array_reader)) | ||
} | ||
} else { | ||
reader | ||
result => result, | ||
} | ||
} | ||
} | ||
|
@@ -637,10 +623,10 @@ impl<'a> ArrayReaderBuilder { | |
let mut children_reader = Vec::with_capacity(cur_type.get_fields().len()); | ||
|
||
for child in cur_type.get_fields() { | ||
let mut struct_context = context.clone(); | ||
if let Some(child_reader) = self.dispatch(child.clone(), context)? { | ||
// TODO: this results in calling get_arrow_field twice, it could be reused | ||
// from child_reader above, by making child_reader carry its `Field` | ||
let mut struct_context = context.clone(); | ||
struct_context.path.append(vec![child.name().to_string()]); | ||
let field = match self.get_arrow_field(child, &struct_context) { | ||
Some(f) => f.clone(), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This just moves the error earlier; there is no point in continuing if this is not supported anyway 😁