Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async arrow parquet reader #1154

Merged
merged 7 commits into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,31 @@ jobs:
run: |
export CARGO_HOME="/github/home/.cargo"
export CARGO_TARGET_DIR="/github/home/target"

# run tests on all workspace members with default feature list
cargo test

# Switch to arrow crate
cd arrow
# re-run tests on arrow workspace with additional features
# re-run tests on arrow crate with additional features
cargo test --features=prettyprint
# run test on arrow with minimal set of features
# run test on arrow crate with minimal set of features
cargo test --no-default-features
cargo run --example builders
cargo run --example dynamic_types
cargo run --example read_csv
cargo run --example read_csv_infer_schema
# Exit arrow directory
cd ..
(cd parquet && cargo check --no-default-features)
(cd arrow && cargo check --no-default-features)
(cd arrow-flight && cargo check --no-default-features)
cargo check --no-default-features

# Switch to parquet crate
cd ../parquet
# re-run tests on parquet crate with async feature enabled
cargo test --features=async
cargo check --no-default-features

# Switch to arrow-flight
cd ../arrow-flight
cargo check --no-default-features

# test the --features "simd" of the arrow crate. This requires nightly.
linux-test-simd:
Expand Down Expand Up @@ -238,7 +247,7 @@ jobs:
run: |
export CARGO_HOME="/github/home/.cargo"
export CARGO_TARGET_DIR="/github/home/target"
cargo clippy --features test_common --all-targets --workspace -- -D warnings -A clippy::redundant_field_names
cargo clippy --features test_common --features prettyprint --features=async --all-targets --workspace -- -D warnings -A clippy::redundant_field_names

check_benches:
name: Check Benchmarks (but don't run them)
Expand Down
10 changes: 5 additions & 5 deletions arrow/src/util/pretty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn create_table(results: &[RecordBatch]) -> Result<Table> {
let mut cells = Vec::new();
for col in 0..batch.num_columns() {
let column = batch.column(col);
cells.push(Cell::new(&array_value_to_string(&column, row)?));
cells.push(Cell::new(&array_value_to_string(column, row)?));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Activating pretty_print in parquet appears to have made clippy find a load of new stuff in arrow 😅

}
table.add_row(cells);
}
Expand All @@ -96,7 +96,7 @@ fn create_column(field: &str, columns: &[ArrayRef]) -> Result<Table> {

for col in columns {
for row in 0..col.len() {
let cells = vec![Cell::new(&array_value_to_string(&col, row)?)];
let cells = vec![Cell::new(&array_value_to_string(col, row)?)];
table.add_row(cells);
}
}
Expand Down Expand Up @@ -320,7 +320,7 @@ mod tests {
let mut builder = FixedSizeBinaryBuilder::new(3, 3);

builder.append_value(&[1, 2, 3]).unwrap();
builder.append_null();
builder.append_null().unwrap();
builder.append_value(&[7, 8, 9]).unwrap();

let array = Arc::new(builder.finish());
Expand Down Expand Up @@ -677,7 +677,7 @@ mod tests {
)?;

let mut buf = String::new();
write!(&mut buf, "{}", pretty_format_batches(&[batch])?.to_string()).unwrap();
write!(&mut buf, "{}", pretty_format_batches(&[batch])?).unwrap();

let s = vec![
"+---+-----+",
Expand All @@ -689,7 +689,7 @@ mod tests {
"| d | 100 |",
"+---+-----+",
];
let expected = String::from(s.join("\n"));
let expected = s.join("\n");
assert_eq!(expected, buf);

Ok(())
Expand Down
14 changes: 9 additions & 5 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ description = "Apache Parquet implementation in Rust"
homepage = "https://github.com/apache/arrow-rs"
repository = "https://github.com/apache/arrow-rs"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
keywords = [ "arrow", "parquet", "hadoop" ]
keywords = ["arrow", "parquet", "hadoop"]
readme = "README.md"
build = "build.rs"
edition = "2021"
Expand All @@ -45,6 +45,8 @@ base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
rand = "0.8"
futures = { version = "0.3", optional = true }
tustvold marked this conversation as resolved.
Show resolved Hide resolved
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] }

[dev-dependencies]
criterion = "0.3"
Expand All @@ -55,24 +57,26 @@ brotli = "3.3"
flate2 = "1.0"
lz4 = "1.23"
serde_json = { version = "1.0", features = ["preserve_order"] }
arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils"] }
arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils", "prettyprint"] }

[features]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
cli = ["serde_json", "base64", "clap"]
test_common = []
# Experimental, unstable functionality primarily used for testing
experimental = []
# Enable async API
async = ["futures", "tokio"]

[[ bin ]]
[[bin]]
name = "parquet-read"
required-features = ["cli"]

[[ bin ]]
[[bin]]
name = "parquet-schema"
required-features = ["cli"]

[[ bin ]]
[[bin]]
name = "parquet-rowcount"
required-features = ["cli"]

Expand Down
60 changes: 38 additions & 22 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use arrow::datatypes::{
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema,
Time32MillisecondType as ArrowTime32MillisecondType,
SchemaRef, Time32MillisecondType as ArrowTime32MillisecondType,
Time32SecondType as ArrowTime32SecondType,
Time64MicrosecondType as ArrowTime64MicrosecondType,
Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit,
Expand Down Expand Up @@ -91,7 +91,7 @@ pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
pub trait ArrayReader: Send {
fn as_any(&self) -> &dyn Any;

/// Returns the arrow type of this array reader.
Expand All @@ -117,6 +117,26 @@ pub trait ArrayReader {
fn get_rep_levels(&self) -> Option<&[i16]>;
}

/// A collection of row groups
pub trait RowGroupCollection {
/// Get schema of parquet file.
fn schema(&self) -> Result<SchemaDescPtr>;

/// Returns an iterator over the column chunks for particular column
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}

impl RowGroupCollection for Arc<dyn FileReader> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does mean we have double dynamic dispatch, given these methods are called a couple of times per-file I'm inclined to consider this largely irrelevant

fn schema(&self) -> Result<SchemaDescPtr> {
Ok(self.metadata().file_metadata().schema_descr_ptr())
}

fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
Ok(Box::new(iterator))
}
}

/// Uses `record_reader` to read up to `batch_size` records from `pages`
///
/// Returns the number of records read, which can be less than batch_size if
Expand Down Expand Up @@ -478,7 +498,7 @@ where
impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
where
T: DataType,
C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
C: Converter<Vec<Option<T::T>>, ArrayRef> + Send + 'static,
{
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -1311,9 +1331,9 @@ impl ArrayReader for StructArrayReader {
/// Create array reader from parquet schema, column indices, and parquet file reader.
pub fn build_array_reader<T>(
parquet_schema: SchemaDescPtr,
arrow_schema: Schema,
arrow_schema: SchemaRef,
column_indices: T,
file_reader: Arc<dyn FileReader>,
row_groups: Box<dyn RowGroupCollection>,
) -> Result<Box<dyn ArrayReader>>
where
T: IntoIterator<Item = usize>,
Expand Down Expand Up @@ -1351,13 +1371,8 @@ where
fields: filtered_root_fields,
};

ArrayReaderBuilder::new(
Arc::new(proj),
Arc::new(arrow_schema),
Arc::new(leaves),
file_reader,
)
.build_array_reader()
ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
.build_array_reader()
}

/// Used to build array reader.
Expand All @@ -1367,7 +1382,7 @@ struct ArrayReaderBuilder {
// Key: columns that need to be included in final array builder
// Value: column index in schema
columns_included: Arc<HashMap<*const Type, usize>>,
file_reader: Arc<dyn FileReader>,
row_groups: Box<dyn RowGroupCollection>,
}

/// Used in type visitor.
Expand Down Expand Up @@ -1667,13 +1682,13 @@ impl<'a> ArrayReaderBuilder {
root_schema: TypePtr,
arrow_schema: Arc<Schema>,
columns_included: Arc<HashMap<*const Type, usize>>,
file_reader: Arc<dyn FileReader>,
file_reader: Box<dyn RowGroupCollection>,
) -> Self {
Self {
root_schema,
arrow_schema,
columns_included,
file_reader,
row_groups: file_reader,
}
}

Expand Down Expand Up @@ -1707,10 +1722,10 @@ impl<'a> ArrayReaderBuilder {
context.rep_level,
context.path.clone(),
));
let page_iterator = Box::new(FilePageIterator::new(
self.columns_included[&(cur_type.as_ref() as *const Type)],
self.file_reader.clone(),
)?);

let page_iterator = self
.row_groups
.column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;

let arrow_type: Option<ArrowType> = self
.get_arrow_field(&cur_type, context)
Expand Down Expand Up @@ -2823,7 +2838,8 @@ mod tests {
#[test]
fn test_create_array_reader() {
let file = get_test_file("nulls.snappy.parquet");
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let file_reader: Arc<dyn FileReader> =
Arc::new(SerializedFileReader::new(file).unwrap());

let file_metadata = file_reader.metadata().file_metadata();
let arrow_schema = parquet_to_arrow_schema(
Expand All @@ -2834,9 +2850,9 @@ mod tests {

let array_reader = build_array_reader(
file_reader.metadata().file_metadata().schema_descr_ptr(),
arrow_schema,
Arc::new(arrow_schema),
vec![0usize].into_iter(),
file_reader,
Box::new(file_reader),
)
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ impl ArrowReader for ParquetFileArrowReader {
.metadata()
.file_metadata()
.schema_descr_ptr(),
self.get_schema()?,
Arc::new(self.get_schema()?),
column_indices,
self.file_reader.clone(),
Box::new(self.file_reader.clone()),
)?;

ParquetRecordBatchReader::try_new(batch_size, array_reader)
Expand Down
Loading