Merge branch 'main' into feature/impl-iterator-structarray
illumination-k committed Nov 17, 2021
2 parents 4245942 + 9d4107c commit 8329703
Showing 24 changed files with 893 additions and 596 deletions.
16 changes: 13 additions & 3 deletions Cargo.toml
@@ -66,11 +66,14 @@ futures = { version = "0.3", optional = true }
# for faster hashing
ahash = { version = "0.7", optional = true }

parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] }
# parquet support
parquet2 = { version = "0.7", optional = true, default_features = false, features = ["stream"] }

# avro
avro-rs = { version = "0.13", optional = true, default_features = false }

# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }

# for division/remainder optimization at runtime
strength_reduce = { version = "0.2", optional = true }
@@ -88,6 +91,7 @@ tokio = { version = "1", features = ["macros", "rt", "fs"] }
tokio-util = { version = "0.6", features = ["compat"] }
# used to run formal property testing
proptest = { version = "1", default_features = false, features = ["std"] }
avro-rs = { version = "0.13", features = ["snappy"] }

[package.metadata.docs.rs]
features = ["full"]
@@ -108,6 +112,7 @@ full = [
"io_parquet",
"io_parquet_compression",
"io_avro",
"io_avro_compression",
"regex",
"merge_sort",
"compute",
@@ -132,7 +137,11 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]
io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json", "libflate"]
io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json"]
io_avro_compression = [
"libflate",
"snap",
]
# io_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["io_json", "serde_derive", "hex"]
@@ -162,6 +171,7 @@ skip_feature_sets = [
["io_csv_async"],
["io_csv_read_async"],
["io_avro"],
["io_avro_compression"],
["io_json"],
["io_flight"],
["io_ipc"],
36 changes: 15 additions & 21 deletions examples/parquet_read.rs
@@ -4,49 +4,43 @@ use std::io::BufReader;
use arrow2::io::parquet::read;
use arrow2::{array::Array, error::Result};

fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<dyn Array>> {
fn read_field(path: &str, row_group: usize, field: usize) -> Result<Box<dyn Array>> {
// Open a file, a common operation in Rust
let mut file = BufReader::new(File::open(path)?);

// Read the files' metadata. This has a small IO cost because it requires seeking to the end
// of the file to read its footer.
let file_metadata = read::read_metadata(&mut file)?;
let metadata = read::read_metadata(&mut file)?;

// Convert the files' metadata into an arrow schema. This is CPU-only and amounts to
// parsing thrift if the arrow schema is available under a metadata key, or inferring the
// arrow schema from the parquet's physical, converted and logical types.
let arrow_schema = read::get_schema(&file_metadata)?;
let arrow_schema = read::get_schema(&metadata)?;

// get the columns' metadata
let metadata = file_metadata.row_groups[row_group].column(column);
// Create an iterator of column chunks. Each iteration
// yields an iterator of compressed pages. There is almost no CPU work in iterating.
let columns = read::get_column_iterator(&mut file, &metadata, row_group, field, None, vec![]);

// Construct an iterator over pages. This binds `file` to this iterator, and each iteration
// is IO intensive as it will read a compressed page into memory. There is almost no CPU work
// on this operation
let pages = read::get_page_iterator(metadata, &mut file, None, vec![])?;
// get the corresponding arrow field
let field = &arrow_schema.fields()[field];

// get the columns' logical type
let data_type = arrow_schema.fields()[column].data_type().clone();
// This is the actual work. In this case, pages are read and
// decompressed, decoded and deserialized to arrow.
// Because `columns` is an iterator, it uses a combination of IO and CPU.
let (array, _, _) = read::column_iter_to_array(columns, field, vec![])?;

// This is the actual work. In this case, pages are read (by calling `iter.next()`) and are
// immediately decompressed, decoded, deserialized to arrow and deallocated.
// This uses a combination of IO and CPU. At this point, `array` is the arrow-corresponding
// array of the parquets' physical type.
// `Decompressor` re-uses an internal buffer for de-compression, thereby maximizing memory re-use.
let mut pages = read::Decompressor::new(pages, vec![]);

read::page_iter_to_array(&mut pages, metadata, data_type)
Ok(array)
}

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();

let file_path = &args[1];
let column = args[2].parse::<usize>().unwrap();
let field = args[2].parse::<usize>().unwrap();
let row_group = args[3].parse::<usize>().unwrap();

let array = read_column_chunk(file_path, row_group, column)?;
let array = read_field(file_path, row_group, field)?;
println!("{}", array);
Ok(())
}
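
For reference, the same per-field calls compose into a whole-row-group read. The sketch below is not part of this commit; it assumes exactly the `read_metadata`, `get_schema`, `get_column_iterator` and `column_iter_to_array` signatures used in the example above, and `read_row_group` is a hypothetical helper name:

use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

use arrow2::{array::Array, error::Result, io::parquet::read, record_batch::RecordBatch};

// Hypothetical helper: read every field of one row group with the same calls as
// `read_field` above and assemble the arrays into a `RecordBatch`.
fn read_row_group(path: &str, row_group: usize) -> Result<RecordBatch> {
    let mut file = BufReader::new(File::open(path)?);
    let metadata = read::read_metadata(&mut file)?;
    let arrow_schema = Arc::new(read::get_schema(&metadata)?);

    let mut columns: Vec<Arc<dyn Array>> = vec![];
    for field_i in 0..arrow_schema.fields().len() {
        // iterator of this field's compressed column chunks (cheap; IO happens on consumption)
        let column_iter =
            read::get_column_iterator(&mut file, &metadata, row_group, field_i, None, vec![]);
        let field = &arrow_schema.fields()[field_i];
        // decompress, decode and deserialize the chunks into a single arrow array
        let (array, _, _) = read::column_iter_to_array(column_iter, field, vec![])?;
        columns.push(array.into());
    }

    RecordBatch::try_new(arrow_schema, columns)
}

Each field is still read sequentially here; the parallel example below spreads the same per-field work across threads.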
127 changes: 76 additions & 51 deletions examples/parquet_read_parallel.rs
@@ -1,90 +1,115 @@
use crossbeam_channel::unbounded;
use parquet2::metadata::ColumnChunkMetaData;

use std::fs::File;
use std::sync::Arc;
use std::thread;
use std::time::SystemTime;

use arrow2::{array::Array, error::Result, io::parquet::read};
use arrow2::{
array::Array, error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator,
record_batch::RecordBatch,
};

fn parallel_read(path: &str) -> Result<Vec<Box<dyn Array>>> {
// prepare a channel to send serialized records from threads
fn parallel_read(path: &str, row_group: usize) -> Result<RecordBatch> {
// prepare a channel to send compressed pages across threads.
let (tx, rx) = unbounded();

let mut file = File::open(path)?;
let file_metadata = read::read_metadata(&mut file)?;
let arrow_schema = Arc::new(read::get_schema(&file_metadata)?);

let file_metadata = Arc::new(file_metadata);

let start = SystemTime::now();
// spawn a thread to produce `Vec<CompressedPage>` (IO-bound)
let producer_metadata = file_metadata.clone();
let child = thread::spawn(move || {
for column in 0..producer_metadata.schema().num_columns() {
for row_group in 0..producer_metadata.row_groups.len() {
let start = SystemTime::now();
let column_metadata = producer_metadata.row_groups[row_group].column(column);
println!("produce start: {} {}", column, row_group);
let pages = read::get_page_iterator(column_metadata, &mut file, None, vec![])
.unwrap()
.collect::<Vec<_>>();
println!(
"produce end - {:?}: {} {}",
start.elapsed().unwrap(),
column,
row_group
);
tx.send((column, row_group, pages)).unwrap();
}
}
});

let mut children = Vec::new();
// use 3 consumers to decompress, decode and deserialize.
for _ in 0..3 {
let rx_consumer = rx.clone();
let metadata_consumer = file_metadata.clone();
let arrow_schema_consumer = arrow_schema.clone();
let child = thread::spawn(move || {
let (column, row_group, pages) = rx_consumer.recv().unwrap();
let producer = thread::spawn(move || {
for (field_i, field) in file_metadata.schema().fields().iter().enumerate() {
let start = SystemTime::now();
println!("consumer start - {} {}", column, row_group);
let metadata = metadata_consumer.row_groups[row_group].column(column);
let data_type = arrow_schema_consumer.fields()[column].data_type().clone();

let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);
let mut columns = read::get_column_iterator(
&mut file,
&file_metadata,
row_group,
field_i,
None,
vec![],
);

println!("produce start - field: {}", field_i);

let mut column_chunks = vec![];
while let read::State::Some(mut new_iter) = columns.advance().unwrap() {
if let Some((pages, metadata)) = new_iter.get() {
let pages = pages.collect::<Vec<_>>();

let array = read::page_iter_to_array(&mut pages, metadata, data_type);
column_chunks.push((pages, metadata.clone()));
}
columns = new_iter;
}
// todo: create API to allow sending each column (and not column chunks) to be processed in parallel
tx.send((field_i, field.clone(), column_chunks)).unwrap();
println!(
"consumer end - {:?}: {} {}",
"produce end - {:?}: {} {}",
start.elapsed().unwrap(),
column,
field_i,
row_group
);
array
});
children.push(child);
}
}
});

// use 2 consumers for the CPU-intensive work: decompress, decode and deserialize.
#[allow(clippy::needless_collect)] // we need to collect to parallelize
let consumers = (0..2)
.map(|i| {
let rx_consumer = rx.clone();
let arrow_schema_consumer = arrow_schema.clone();
thread::spawn(move || {
let mut arrays = vec![];
while let Ok((field_i, parquet_field, column_chunks)) = rx_consumer.recv() {
let start = SystemTime::now();
let field = &arrow_schema_consumer.fields()[field_i];
println!("consumer {} start - {}", i, field_i);

let columns = read::ReadColumnIterator::new(parquet_field, column_chunks);

let array = read::column_iter_to_array(columns, field, vec![]).map(|x| x.0);
println!(
"consumer {} end - {:?}: {}",
i,
start.elapsed().unwrap(),
field_i
);

arrays.push((field_i, array))
}
arrays
})
})
.collect::<Vec<_>>();

child.join().expect("child thread panicked");
producer.join().expect("producer thread panicked");

let arrays = children
// collect all columns (join threads)
let mut columns = consumers
.into_iter()
.map(|x| x.join().unwrap())
.collect::<Result<Vec<_>>>()?;
.flatten()
.map(|x| Ok((x.0, x.1?)))
.collect::<Result<Vec<(usize, Box<dyn Array>)>>>()?;
// order may not be the same
columns.sort_unstable_by_key(|x| x.0);
let columns = columns.into_iter().map(|x| x.1.into()).collect();
println!("Finished - {:?}", start.elapsed().unwrap());

Ok(arrays)
RecordBatch::try_new(arrow_schema, columns)
}

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();
let file_path = &args[1];

let arrays = parallel_read(file_path)?;
for array in arrays {
let batch = parallel_read(file_path, 0)?;
for array in batch.columns() {
println!("{}", array)
}
Ok(())
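
The rewritten example splits the work over a channel: the producer thread does the IO (collecting each field's compressed column chunks) and two consumer threads do the CPU-heavy decompression, decoding and deserialization via `ReadColumnIterator`. As a point of comparison, a simpler layout is one thread per field, where each thread opens its own file handle and does both the IO and the CPU work for that field. The sketch below is not part of this commit, only reuses the APIs already shown above, and `parallel_read_per_field` is a hypothetical name:

use std::fs::File;
use std::sync::Arc;
use std::thread;

use arrow2::{array::Array, error::Result, io::parquet::read, record_batch::RecordBatch};

// Hypothetical alternative: one thread per field, each doing its own IO and decoding.
fn parallel_read_per_field(path: &str, row_group: usize) -> Result<RecordBatch> {
    let mut file = File::open(path)?;
    let metadata = Arc::new(read::read_metadata(&mut file)?);
    let arrow_schema = Arc::new(read::get_schema(&metadata)?);

    #[allow(clippy::needless_collect)] // collect so all threads are spawned before any join
    let handles = (0..arrow_schema.fields().len())
        .map(|field_i| {
            let path = path.to_string();
            let metadata = metadata.clone();
            let arrow_schema = arrow_schema.clone();
            thread::spawn(move || -> Result<Box<dyn Array>> {
                // each thread opens its own handle and reads only its field's column chunks
                let mut file = File::open(&path)?;
                let columns = read::get_column_iterator(
                    &mut file,
                    &metadata,
                    row_group,
                    field_i,
                    None,
                    vec![],
                );
                let field = &arrow_schema.fields()[field_i];
                // decompress, decode and deserialize this field into a single arrow array
                let (array, _, _) = read::column_iter_to_array(columns, field, vec![])?;
                Ok(array)
            })
        })
        .collect::<Vec<_>>();

    // join in schema order, so no re-sorting is needed
    let mut columns: Vec<Arc<dyn Array>> = vec![];
    for handle in handles {
        let array = handle.join().expect("thread panicked")?;
        columns.push(array.into());
    }

    RecordBatch::try_new(arrow_schema, columns)
}

The channel-based version above keeps a single file handle and lets IO overlap with decoding across fields, at the cost of the extra plumbing; the per-field sketch trades one file handle per field for much less code.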
44 changes: 43 additions & 1 deletion parquet_integration/write_parquet.py
@@ -184,6 +184,48 @@ def case_nested(size):
)


def case_struct(size):
    string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
    boolean = [True, None, False, False, None, True, None, None, True, True]
    struct_fields = [
        ("f1", pa.utf8()),
        ("f2", pa.bool_()),
    ]
    schema = pa.schema(
        [
            pa.field(
                "struct",
                pa.struct(struct_fields),
            ),
            pa.field(
                "struct_struct",
                pa.struct(
                    [
                        ("f1", pa.struct(struct_fields)),
                        ("f2", pa.bool_()),
                    ]
                ),
            ),
        ]
    )

    struct = pa.StructArray.from_arrays(
        [pa.array(string * size), pa.array(boolean * size)],
        fields=struct_fields,
    )
    return (
        {
            "struct": struct,
            "struct_struct": pa.StructArray.from_arrays(
                [struct, pa.array(boolean * size)],
                names=["f1", "f2"],
            ),
        },
        schema,
        f"struct_nullable_{size*10}.parquet",
    )


def write_pyarrow(
    case,
    size: int,
@@ -228,7 +270,7 @@ def write_pyarrow(
)


for case in [case_basic_nullable, case_basic_required, case_nested]:
for case in [case_basic_nullable, case_basic_required, case_nested, case_struct]:
    for version in [1, 2]:
        for use_dict in [True, False]:
            write_pyarrow(case, 1, version, use_dict, False, False)
4 changes: 2 additions & 2 deletions src/io/avro/mod.rs
@@ -5,8 +5,8 @@ pub mod read;

use crate::error::ArrowError;

impl From<avro_rs::SerError> for ArrowError {
fn from(error: avro_rs::SerError) -> Self {
impl From<avro_rs::Error> for ArrowError {
fn from(error: avro_rs::Error) -> Self {
ArrowError::External("".to_string(), Box::new(error))
}
}
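
The conversion now covers the crate-wide `avro_rs::Error` instead of only `avro_rs::SerError`, so any fallible avro-rs call can be propagated with `?` inside functions returning arrow2's `Result`. A minimal sketch of what the impl enables (not from the source; the function name is hypothetical and the error value is passed in rather than produced by a real avro-rs call):

use arrow2::error::ArrowError;

// `?` converts `avro_rs::Error` into `ArrowError` through the `From` impl above.
fn propagate(result: Result<i64, avro_rs::Error>) -> Result<i64, ArrowError> {
    let value = result?;
    Ok(value)
}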