Skip to content

Commit

Permalink
Add support for nested list arrays (#993)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Apr 18, 2022
1 parent 786792c commit 6ba6299
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 184 deletions.
160 changes: 70 additions & 90 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::data_type::{
Int96Type,
};
use crate::errors::ParquetError::ArrowError;
use crate::errors::{Result};
use crate::errors::{ParquetError, Result};
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr};
use crate::schema::visitor::TypeVisitor;

Expand Down Expand Up @@ -129,9 +129,10 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext

let null_mask_only = match cur_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
false
return Err(ArrowError(format!(
"Reading repeated field ({:?}) is not supported yet!",
cur_type.name()
)));
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
Expand All @@ -148,14 +149,7 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
null_mask_only,
)?;

if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
Err(ArrowError(format!(
"Reading repeated field ({:?}) is not supported yet!",
cur_type.name()
)))
} else {
Ok(Some(reader))
}
Ok(Some(reader))
} else {
Ok(None)
}
Expand Down Expand Up @@ -295,96 +289,82 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
.first()
.ok_or_else(|| ArrowError("List field must have a child.".to_string()))?
.clone();
let mut new_context = context.clone();

// If the list can contain nulls
let nullable = match list_type.get_basic_info().repetition() {
Repetition::REQUIRED => false,
Repetition::OPTIONAL => true,
Repetition::REPEATED => {
return Err(general_err!("List type cannot be repeated"))
}
};

let mut new_context = context.clone();
new_context.path.append(vec![list_type.name().to_string()]);
// We need to know at what definition a list or its child is null
let list_null_def = new_context.def_level;
let mut list_empty_def = new_context.def_level;
new_context.rep_level += 1;

// If the list's root is nullable
if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
if nullable {
new_context.def_level += 1;
// current level is nullable, increment to get level for empty list slot
list_empty_def += 1;
}

match list_child.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
}
_ => (),
if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
new_context.def_level += 1;
}

let reader = self.dispatch(item_type.clone(), &new_context);
if let Ok(Some(item_reader)) = reader {
let item_reader_type = item_reader.get_data_type().clone();

match item_reader_type {
ArrowType::List(_)
| ArrowType::FixedSizeList(_, _)
| ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
"reading List({:?}) into arrow not supported yet",
item_type
))),
_ => {
// a list is a group type with a single child. The list child's
// name comes from the child's field name.
// if the child's name is "list" and it has a child, then use this child
if list_child.name() == "list" && !list_child.get_fields().is_empty()
{
list_child = list_child.get_fields().first().unwrap();
}
let arrow_type = self
.arrow_schema
.field_with_name(list_type.name())
.ok()
.map(|f| f.data_type().to_owned())
.unwrap_or_else(|| {
ArrowType::List(Box::new(Field::new(
list_child.name(),
item_reader_type.clone(),
list_child.is_optional(),
)))
});

let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
item_reader,
arrow_type,
item_reader_type,
new_context.def_level,
new_context.rep_level,
list_null_def,
list_empty_def,
)),
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
item_reader,
arrow_type,
item_reader_type,
new_context.def_level,
new_context.rep_level,
list_null_def,
list_empty_def,
)),

_ => {
return Err(ArrowError(format!(
"creating ListArrayReader with type {:?} should be unreachable",
arrow_type
)))
}
};
match self.dispatch(item_type, &new_context) {
Ok(Some(item_reader)) => {
let item_type = item_reader.get_data_type().clone();

Ok(Some(list_array_reader))
// a list is a group type with a single child. The list child's
// name comes from the child's field name.
// if the child's name is "list" and it has a child, then use this child
if list_child.name() == "list" && !list_child.get_fields().is_empty() {
list_child = list_child.get_fields().first().unwrap();
}

let arrow_type = self
.arrow_schema
.field_with_name(list_type.name())
.ok()
.map(|f| f.data_type().to_owned())
.unwrap_or_else(|| {
// TODO: Is this fallback correct/necessary?
ArrowType::List(Box::new(Field::new(
list_child.name(),
item_type.clone(),
list_child.is_optional(),
)))
});

let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
item_reader,
arrow_type,
item_type.clone(),
new_context.def_level,
new_context.rep_level,
nullable,
)),
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
item_reader,
arrow_type,
item_type.clone(),
new_context.def_level,
new_context.rep_level,
nullable,
)),
_ => {
return Err(ArrowError(format!(
"creating ListArrayReader with type {:?} should be unreachable",
arrow_type
)))
}
};

Ok(Some(list_array_reader))
}
} else {
reader
result => result,
}
}
}
Expand Down
Loading

0 comments on commit 6ba6299

Please sign in to comment.