Skip to content

Commit

Permalink
Add support for nested list arrays (#993)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 4, 2022
1 parent 8f24c45 commit d3537c8
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 322 deletions.
8 changes: 1 addition & 7 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,13 +717,7 @@ impl ArrayReader for StructArrayReader {
.children
.iter_mut()
.map(|reader| reader.next_batch(batch_size))
.try_fold(
Vec::new(),
|mut result, child_array| -> Result<Vec<ArrayRef>> {
result.push(child_array?);
Ok(result)
},
)?;
.collect::<Result<Vec<_>>>()?;

// check that array child data has same size
let children_array_len =
Expand Down
260 changes: 123 additions & 137 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::data_type::{
Int96Type,
};
use crate::errors::ParquetError::ArrowError;
use crate::errors::{Result};
use crate::errors::{ParquetError, Result};
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr};
use crate::schema::visitor::TypeVisitor;

Expand Down Expand Up @@ -129,9 +129,10 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext

let null_mask_only = match cur_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
false
return Err(ArrowError(format!(
"Reading repeated primitive ({:?}) is not supported yet!",
cur_type.name()
)));
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
Expand All @@ -143,19 +144,12 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
};

let reader = self.build_for_primitive_type_inner(
cur_type.clone(),
cur_type,
&new_context,
null_mask_only,
)?;

if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
Err(ArrowError(format!(
"Reading repeated field ({:?}) is not supported yet!",
cur_type.name()
)))
} else {
Ok(Some(reader))
}
Ok(Some(reader))
} else {
Ok(None)
}
Expand All @@ -173,30 +167,19 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
if cur_type.get_basic_info().has_repetition() {
match cur_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
return Err(ArrowError(format!(
"Reading repeated struct ({:?}) is not supported yet!",
cur_type.name(),
)))
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
}
_ => (),
Repetition::REQUIRED => {}
}
}

if let Some(reader) = self.build_for_struct_type_inner(&cur_type, &new_context)? {
if cur_type.get_basic_info().has_repetition()
&& cur_type.get_basic_info().repetition() == Repetition::REPEATED
{
Err(ArrowError(format!(
"Reading repeated field ({:?}) is not supported yet!",
cur_type.name(),
)))
} else {
Ok(Some(reader))
}
} else {
Ok(None)
}
self.build_for_struct_type_inner(&cur_type, &new_context)
}

/// Build array reader for map type.
Expand All @@ -208,42 +191,61 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
// Add map type to context
let mut new_context = context.clone();
new_context.path.append(vec![map_type.name().to_string()]);
if let Repetition::OPTIONAL = map_type.get_basic_info().repetition() {
new_context.def_level += 1;

match map_type.get_basic_info().repetition() {
Repetition::REQUIRED => {}
Repetition::OPTIONAL => {
new_context.def_level += 1;
}
Repetition::REPEATED => {
return Err(ArrowError("Map cannot be repeated".to_string()))
}
}

if map_type.get_fields().len() != 1 {
return Err(ArrowError(format!(
"Map field must have exactly one key_value child, found {}",
map_type.get_fields().len()
)));
}

// Add map entry (key_value) to context
let map_key_value = map_type.get_fields().first().ok_or_else(|| {
ArrowError("Map field must have a key_value entry".to_string())
})?;
let map_key_value = &map_type.get_fields()[0];
if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
return Err(ArrowError(
"Child of map field must be repeated".to_string(),
));
}

new_context
.path
.append(vec![map_key_value.name().to_string()]);

new_context.rep_level += 1;
new_context.def_level += 1;

if map_key_value.get_fields().len() != 2 {
// According to the specification the values are optional (#1642)
return Err(ArrowError(format!(
"Child of map field must have two children, found {}",
map_key_value.get_fields().len()
)));
}

// Get key and value, and create context for each
let map_key = map_key_value
.get_fields()
.first()
.ok_or_else(|| ArrowError("Map entry must have a key".to_string()))?;
let map_value = map_key_value
.get_fields()
.get(1)
.ok_or_else(|| ArrowError("Map entry must have a value".to_string()))?;

let key_reader = {
let mut key_context = new_context.clone();
key_context.def_level += 1;
key_context.path.append(vec![map_key.name().to_string()]);
self.dispatch(map_key.clone(), &key_context)?.unwrap()
};
let value_reader = {
let mut value_context = new_context.clone();
if let Repetition::OPTIONAL = map_value.get_basic_info().repetition() {
value_context.def_level += 1;
}
self.dispatch(map_value.clone(), &value_context)?.unwrap()
};
let map_key = &map_key_value.get_fields()[0];
let map_value = &map_key_value.get_fields()[1];

if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
return Err(ArrowError("Map keys must be required".to_string()));
}

if map_value.get_basic_info().repetition() == Repetition::REPEATED {
return Err(ArrowError("Map values cannot be repeated".to_string()));
}

let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();

let arrow_type = self
.arrow_schema
Expand Down Expand Up @@ -295,96 +297,80 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
.first()
.ok_or_else(|| ArrowError("List field must have a child.".to_string()))?
.clone();
let mut new_context = context.clone();

// If the list can contain nulls
let nullable = match list_type.get_basic_info().repetition() {
Repetition::REQUIRED => false,
Repetition::OPTIONAL => true,
Repetition::REPEATED => {
return Err(general_err!("List type cannot be repeated"))
}
};

let mut new_context = context.clone();
new_context.path.append(vec![list_type.name().to_string()]);
// We need to know at what definition a list or its child is null
let list_null_def = new_context.def_level;
let mut list_empty_def = new_context.def_level;

// The repeated field
new_context.rep_level += 1;
new_context.def_level += 1;

// If the list's root is nullable
if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
if nullable {
new_context.def_level += 1;
// current level is nullable, increment to get level for empty list slot
list_empty_def += 1;
}

match list_child.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
}
_ => (),
}
match self.dispatch(item_type, &new_context) {
Ok(Some(item_reader)) => {
let item_type = item_reader.get_data_type().clone();

// a list is a group type with a single child. The list child's
// name comes from the child's field name.
// if the child's name is "list" and it has a child, then use this child
if list_child.name() == "list" && !list_child.get_fields().is_empty() {
list_child = list_child.get_fields().first().unwrap();
}

let reader = self.dispatch(item_type.clone(), &new_context);
if let Ok(Some(item_reader)) = reader {
let item_reader_type = item_reader.get_data_type().clone();

match item_reader_type {
ArrowType::List(_)
| ArrowType::FixedSizeList(_, _)
| ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
"reading List({:?}) into arrow not supported yet",
item_type
))),
_ => {
// a list is a group type with a single child. The list child's
// name comes from the child's field name.
// if the child's name is "list" and it has a child, then use this child
if list_child.name() == "list" && !list_child.get_fields().is_empty()
{
list_child = list_child.get_fields().first().unwrap();
let arrow_type = self
.arrow_schema
.field_with_name(list_type.name())
.ok()
.map(|f| f.data_type().to_owned())
.unwrap_or_else(|| {
ArrowType::List(Box::new(Field::new(
list_child.name(),
item_type.clone(),
list_child.is_optional(),
)))
});

let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
item_reader,
arrow_type,
item_type,
new_context.def_level,
new_context.rep_level,
nullable,
)),
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
item_reader,
arrow_type,
item_type,
new_context.def_level,
new_context.rep_level,
nullable,
)),
_ => {
return Err(ArrowError(format!(
"creating ListArrayReader with type {:?} should be unreachable",
arrow_type
)))
}
let arrow_type = self
.arrow_schema
.field_with_name(list_type.name())
.ok()
.map(|f| f.data_type().to_owned())
.unwrap_or_else(|| {
ArrowType::List(Box::new(Field::new(
list_child.name(),
item_reader_type.clone(),
list_child.is_optional(),
)))
});

let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
item_reader,
arrow_type,
item_reader_type,
new_context.def_level,
new_context.rep_level,
list_null_def,
list_empty_def,
)),
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
item_reader,
arrow_type,
item_reader_type,
new_context.def_level,
new_context.rep_level,
list_null_def,
list_empty_def,
)),

_ => {
return Err(ArrowError(format!(
"creating ListArrayReader with type {:?} should be unreachable",
arrow_type
)))
}
};
};

Ok(Some(list_array_reader))
}
Ok(Some(list_array_reader))
}
} else {
reader
result => result,
}
}
}
Expand Down Expand Up @@ -637,10 +623,10 @@ impl<'a> ArrayReaderBuilder {
let mut children_reader = Vec::with_capacity(cur_type.get_fields().len());

for child in cur_type.get_fields() {
let mut struct_context = context.clone();
if let Some(child_reader) = self.dispatch(child.clone(), context)? {
// TODO: this results in calling get_arrow_field twice, it could be reused
// from child_reader above, by making child_reader carry its `Field`
let mut struct_context = context.clone();
struct_context.path.append(vec![child.name().to_string()]);
let field = match self.get_arrow_field(child, &struct_context) {
Some(f) => f.clone(),
Expand Down
Loading

0 comments on commit d3537c8

Please sign in to comment.