
Added support to read parquet asynchronously.
jorgecarleitao committed Aug 9, 2021
1 parent 988612a commit 1ada073
Showing 28 changed files with 125 additions and 105 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -59,7 +59,7 @@ futures = { version = "0.3", optional = true }
ahash = { version = "0.7", optional = true }

#parquet2 = { version = "0.2", optional = true, default_features = false, features = ["stream"] }
-parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", ref = "65990906a90edf93530a2c7a54b31b023974e5f3", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "41ca6b3fdbe62f9fa5f1758748dce9a8db4f72d6", optional = true, default_features = false, features = ["stream"] }

[dev-dependencies]
rand = "0.8"
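Note the corrected key in the git dependency above: Cargo pins a git dependency to a commit with rev (or branch / tag); ref is not a key Cargo recognizes, so the old line was not actually pinning the revision it named.
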
4 changes: 2 additions & 2 deletions arrow-parquet-integration-testing/src/main.rs
@@ -7,7 +7,7 @@ use arrow2::error::Result;
use arrow2::io::parquet::write::{Encoding, RowGroupIterator};
use arrow2::io::{
json_integration::ArrowJson,
-parquet::write::{write_file, CompressionCodec, Version, WriteOptions},
+parquet::write::{write_file, Compression, Version, WriteOptions},
};
use arrow2::{datatypes::Schema, io::json_integration::to_record_batch, record_batch::RecordBatch};

@@ -154,7 +154,7 @@ fn main() -> Result<()> {

let options = WriteOptions {
write_statistics: true,
-compression: CompressionCodec::Uncompressed,
+compression: Compression::Uncompressed,
version,
};

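The same rename runs through the rest of this commit: parquet2's CompressionCodec is now Compression, re-exported via arrow2::io::parquet::write. A minimal sketch of the updated options construction (a free-standing helper for illustration, not code from this commit):

    use arrow2::io::parquet::write::{Compression, Version, WriteOptions};

    // Only the enum was renamed; the fields of WriteOptions are unchanged.
    fn default_options() -> WriteOptions {
        WriteOptions {
            write_statistics: true,
            compression: Compression::Uncompressed,
            version: Version::V2,
        }
    }
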
4 changes: 2 additions & 2 deletions examples/parquet_write.rs
@@ -7,7 +7,7 @@ use arrow2::{
datatypes::{Field, Schema},
error::Result,
io::parquet::write::{
-array_to_page, write_file, CompressionCodec, DynIter, Encoding, Version, WriteOptions,
+array_to_page, write_file, Compression, DynIter, Encoding, Version, WriteOptions,
},
};

@@ -16,7 +16,7 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()>

let options = WriteOptions {
write_statistics: true,
-compression: CompressionCodec::Uncompressed,
+compression: Compression::Uncompressed,
version: Version::V2,
};
let encoding = Encoding::Plain;
7 changes: 4 additions & 3 deletions examples/parquet_write_record.rs
@@ -5,17 +5,18 @@ use arrow2::{
array::{Array, Int32Array},
datatypes::{Field, Schema},
error::Result,
-io::parquet::write::{write_file, CompressionCodec, RowGroupIterator, Version, WriteOptions},
+io::parquet::write::{
+write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
+},
record_batch::RecordBatch,
};
-use parquet2::schema::Encoding;

fn write_batch(path: &str, batch: RecordBatch) -> Result<()> {
let schema = batch.schema().clone();

let options = WriteOptions {
write_statistics: true,
-compression: CompressionCodec::Uncompressed,
+compression: Compression::Uncompressed,
version: Version::V2,
};

4 changes: 2 additions & 2 deletions src/io/parquet/mod.rs
@@ -405,7 +405,7 @@ mod tests {
mod tests_integration {
use std::sync::Arc;

-use super::write::CompressionCodec;
+use super::write::Compression;
use crate::array::{Array, PrimitiveArray, Utf8Array};
use crate::datatypes::DataType;
use crate::datatypes::TimeUnit;
@@ -421,7 +421,7 @@ mod tests_integration {
fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result<Vec<u8>> {
let options = WriteOptions {
write_statistics: true,
-compression: CompressionCodec::Uncompressed,
+compression: Compression::Uncompressed,
version: Version::V1,
};

4 changes: 2 additions & 2 deletions src/io/parquet/read/binary/basic.rs
@@ -2,7 +2,7 @@ use futures::{pin_mut, Stream, StreamExt};
use parquet2::{
encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
-page::{BinaryPageDict, DataPage, DataPageHeader},
+page::{BinaryPageDict, DataPage, DataPageHeader, DataPageHeaderExt},
read::{levels, StreamingIterator},
};

@@ -215,7 +215,7 @@ fn extend_from_page<O: Offset>(
let is_optional = descriptor.max_def_level() == 1;
match page.header() {
DataPageHeader::V1(header) => {
-assert_eq!(header.definition_level_encoding, Encoding::Rle);
+assert_eq!(header.definition_level_encoding(), Encoding::Rle);

let (_, validity_buffer, values_buffer) =
levels::split_buffer_v1(page.buffer(), false, is_optional);
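The recurring reader-side change starts here: with the pinned parquet2 revision, the V1 page-header fields are exposed through the DataPageHeaderExt trait, so the trait must be imported for the accessor methods to resolve. A minimal sketch of the pattern (the helper function is illustrative, not from this commit):

    use parquet2::encoding::Encoding;
    use parquet2::page::{DataPage, DataPageHeader, DataPageHeaderExt};

    // Method calls replace the former direct field accesses,
    // e.g. header.definition_level_encoding -> header.definition_level_encoding().
    fn assert_rle_levels(page: &DataPage) {
        if let DataPageHeader::V1(header) = page.header() {
            assert_eq!(header.definition_level_encoding(), Encoding::Rle);
            assert_eq!(header.repetition_level_encoding(), Encoding::Rle);
        }
    }
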
10 changes: 5 additions & 5 deletions src/io/parquet/read/binary/nested.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use parquet2::{
encoding::Encoding,
metadata::{ColumnChunkMetaData, ColumnDescriptor},
-page::{DataPage, DataPageHeader},
+page::{DataPage, DataPageHeader, DataPageHeaderExt},
read::{
levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
StreamingIterator,
@@ -121,8 +121,8 @@ fn extend_from_page<O: Offset>(

match page.header() {
DataPageHeader::V1(header) => {
-assert_eq!(header.definition_level_encoding, Encoding::Rle);
-assert_eq!(header.repetition_level_encoding, Encoding::Rle);
+assert_eq!(header.definition_level_encoding(), Encoding::Rle);
+assert_eq!(header.repetition_level_encoding(), Encoding::Rle);

match (&page.encoding(), page.dictionary_page()) {
(Encoding::Plain, None) => {
@@ -137,11 +137,11 @@
values_buffer,
additional,
(
-&header.repetition_level_encoding,
+&header.repetition_level_encoding(),
descriptor.max_rep_level(),
),
(
-&header.definition_level_encoding,
+&header.definition_level_encoding(),
descriptor.max_def_level(),
),
is_nullable,
4 changes: 2 additions & 2 deletions src/io/parquet/read/boolean/basic.rs
@@ -10,7 +10,7 @@ use futures::{pin_mut, Stream, StreamExt};
use parquet2::{
encoding::{hybrid_rle, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
-page::{DataPage, DataPageHeader},
+page::{DataPage, DataPageHeader, DataPageHeaderExt},
read::{levels, StreamingIterator},
};

@@ -123,7 +123,7 @@ fn extend_from_page(
let is_optional = descriptor.max_def_level() == 1;
match page.header() {
DataPageHeader::V1(header) => {
-assert_eq!(header.definition_level_encoding, Encoding::Rle);
+assert_eq!(header.definition_level_encoding(), Encoding::Rle);

match (&page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::Plain, None, true) => {
10 changes: 5 additions & 5 deletions src/io/parquet/read/boolean/nested.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use parquet2::{
encoding::Encoding,
metadata::{ColumnChunkMetaData, ColumnDescriptor},
-page::{DataPage, DataPageHeader},
+page::{DataPage, DataPageHeader, DataPageHeaderExt},
read::{
levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
StreamingIterator,
@@ -107,8 +107,8 @@ fn extend_from_page(

match page.header() {
DataPageHeader::V1(header) => {
-assert_eq!(header.definition_level_encoding, Encoding::Rle);
-assert_eq!(header.repetition_level_encoding, Encoding::Rle);
+assert_eq!(header.definition_level_encoding(), Encoding::Rle);
+assert_eq!(header.repetition_level_encoding(), Encoding::Rle);

match (&page.encoding(), page.dictionary_page()) {
(Encoding::Plain, None) => {
@@ -123,11 +123,11 @@
values_buffer,
additional,
(
-&header.repetition_level_encoding,
+&header.repetition_level_encoding(),
descriptor.max_rep_level(),
),
(
-&header.definition_level_encoding,
+&header.definition_level_encoding(),
descriptor.max_def_level(),
),
is_nullable,
4 changes: 2 additions & 2 deletions src/io/parquet/read/fixed_size_binary.rs
@@ -1,6 +1,6 @@
use parquet2::{
encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
-page::{DataPage, DataPageHeader, FixedLenByteArrayPageDict},
+page::{DataPage, DataPageHeader, DataPageHeaderExt, FixedLenByteArrayPageDict},
read::{levels, StreamingIterator},
};

@@ -171,7 +171,7 @@ pub(crate) fn extend_from_page(
let is_optional = descriptor.max_def_level() == 1;
match page.header() {
DataPageHeader::V1(header) => {
-assert_eq!(header.definition_level_encoding, Encoding::Rle);
+assert_eq!(header.definition_level_encoding(), Encoding::Rle);

let (_, validity_buffer, values_buffer) =
levels::split_buffer_v1(page.buffer(), false, is_optional);
29 changes: 24 additions & 5 deletions src/io/parquet/read/mod.rs
@@ -1,16 +1,17 @@
use std::io::{Read, Seek};

-use futures::Stream;
+use futures::{AsyncRead, AsyncSeek, Stream};
pub use parquet2::{
error::ParquetError,
metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData},
page::{CompressedDataPage, DataPage, DataPageHeader},
read::{
-decompress, get_page_iterator as _get_page_iterator, read_metadata as _read_metadata,
+decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream,
+read_metadata as _read_metadata, read_metadata_async as _read_metadata_async,
streaming_iterator, Decompressor, PageIterator, StreamingIterator,
},
-schema::{
-types::{LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType},
+schema::types::{
+LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
TimeUnit as ParquetTimeUnit, TimestampType,
},
types::int96_to_i64_ns,
@@ -48,11 +49,29 @@ pub fn get_page_iterator<'b, RR: Read + Seek>(
)?)
}

-/// Reads parquets' metadata.
+/// Creates a new iterator of compressed pages.
+pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>(
+metadata: &'a FileMetaData,
+row_group: usize,
+column: usize,
+reader: &'a mut RR,
+buffer: Vec<u8>,
+) -> Result<impl Stream<Item = std::result::Result<CompressedDataPage, ParquetError>> + 'a> {
+Ok(_get_page_stream(metadata, row_group, column, reader, buffer).await?)
+}
+
+/// Reads parquets' metadata synchronously.
pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
Ok(_read_metadata(reader)?)
}
+
+/// Reads parquets' metadata asynchronously.
+pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
+reader: &mut R,
+) -> Result<FileMetaData> {
+Ok(_read_metadata_async(reader).await?)
+}

pub fn page_iter_to_array<
I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
>(
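read_metadata_async and get_page_stream are the commit's new public surface. A sketch of how they might be combined, assuming an async-std file handle (any reader implementing AsyncRead + AsyncSeek + Send + Unpin works; the file name, runtime, and loop body are assumptions, not part of the commit):

    use arrow2::error::Result;
    use arrow2::io::parquet::read::{get_page_stream, read_metadata_async};
    use futures::{pin_mut, StreamExt};

    async fn read_pages(path: &str) -> Result<()> {
        // async-std's File implements the futures-io traits required above.
        let mut reader = async_std::fs::File::open(path).await?;
        let metadata = read_metadata_async(&mut reader).await?;
        // Stream the compressed pages of row group 0, column 0.
        let stream = get_page_stream(&metadata, 0, 0, &mut reader, vec![]).await?;
        pin_mut!(stream);
        while let Some(page) = stream.next().await {
            let _compressed_page = page?;
            // Decompress and decode from here (e.g. with decompress).
        }
        Ok(())
    }
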
4 changes: 2 additions & 2 deletions src/io/parquet/read/primitive/basic.rs
@@ -1,6 +1,6 @@
use parquet2::{
encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
-page::{DataPage, DataPageHeader, PrimitivePageDict},
+page::{DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict},
read::levels,
types::NativeType,
};
@@ -160,7 +160,7 @@ where
let is_optional = descriptor.max_def_level() == 1;
match page.header() {
DataPageHeader::V1(header) => {
-assert_eq!(header.definition_level_encoding, Encoding::Rle);
+assert_eq!(header.definition_level_encoding(), Encoding::Rle);

let (_, validity_buffer, values_buffer) =
levels::split_buffer_v1(page.buffer(), false, is_optional);
7 changes: 3 additions & 4 deletions src/io/parquet/read/primitive/dictionary.rs
@@ -1,10 +1,9 @@
use std::sync::Arc;

use parquet2::{
-encoding::{bitpacking, hybrid_rle, uleb128},
-page::{DataPage, DataPageHeader, PrimitivePageDict},
+encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
+page::{DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict},
read::{levels, StreamingIterator},
-schema::Encoding,
types::NativeType,
};

@@ -102,7 +101,7 @@ where
let is_optional = descriptor.max_def_level() == 1;
match page.header() {
DataPageHeader::V1(header) => {
-assert_eq!(header.definition_level_encoding, Encoding::Rle);
+assert_eq!(header.definition_level_encoding(), Encoding::Rle);

let (_, validity_buffer, values_buffer) =
levels::split_buffer_v1(page.buffer(), false, is_optional);
10 changes: 5 additions & 5 deletions src/io/parquet/read/primitive/nested.rs
@@ -1,6 +1,6 @@
use parquet2::{
encoding::Encoding,
-page::{DataPage, DataPageHeader},
+page::{DataPage, DataPageHeader, DataPageHeaderExt},
read::levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
types::NativeType,
};
@@ -127,8 +127,8 @@ where

match page.header() {
DataPageHeader::V1(header) => {
-assert_eq!(header.definition_level_encoding, Encoding::Rle);
-assert_eq!(header.repetition_level_encoding, Encoding::Rle);
+assert_eq!(header.definition_level_encoding(), Encoding::Rle);
+assert_eq!(header.repetition_level_encoding(), Encoding::Rle);

match (&page.encoding(), page.dictionary_page()) {
(Encoding::Plain, None) => {
@@ -143,11 +143,11 @@
values_buffer,
additional,
(
-&header.repetition_level_encoding,
+&header.repetition_level_encoding(),
descriptor.max_rep_level(),
),
(
-&header.definition_level_encoding,
+&header.definition_level_encoding(),
descriptor.max_def_level(),
),
is_nullable,
4 changes: 2 additions & 2 deletions src/io/parquet/read/schema/convert.rs
@@ -6,9 +6,9 @@ use parquet2::{
schema::{
types::{
BasicTypeInfo, GroupConvertedType, LogicalType, ParquetType, PhysicalType,
-PrimitiveConvertedType, TimeUnit as ParquetTimeUnit,
+PrimitiveConvertedType, TimeUnit as ParquetTimeUnit, TimestampType,
},
-Repetition, TimestampType,
+Repetition,
},
};

2 changes: 1 addition & 1 deletion src/io/parquet/read/utils.rs
@@ -1,4 +1,4 @@
-use parquet2::{encoding::get_length, schema::Encoding};
+use parquet2::encoding::{get_length, Encoding};

use crate::error::ArrowError;

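This relocation recurs through the remaining files: Encoding moved from parquet2::schema to parquet2::encoding in the pinned revision, so import sites change while call sites do not. The before/after, as a sketch:

    // Before (no longer compiles against the pinned parquet2 revision):
    // use parquet2::schema::Encoding;

    // After:
    use parquet2::encoding::Encoding;

    fn plain() -> Encoding {
        Encoding::Plain
    }
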
2 changes: 1 addition & 1 deletion src/io/parquet/write/binary/nested.rs
@@ -1,4 +1,4 @@
-use parquet2::schema::Encoding;
+use parquet2::encoding::Encoding;
use parquet2::{metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions};

use super::super::{levels, utils};
3 changes: 1 addition & 2 deletions src/io/parquet/write/boolean/basic.rs
@@ -1,8 +1,7 @@
use parquet2::{
-encoding::hybrid_rle::bitpacked_encode,
+encoding::{hybrid_rle::bitpacked_encode, Encoding},
metadata::ColumnDescriptor,
page::CompressedDataPage,
-schema::Encoding,
statistics::{serialize_statistics, BooleanStatistics, ParquetStatistics, Statistics},
write::WriteOptions,
};
2 changes: 1 addition & 1 deletion src/io/parquet/write/boolean/nested.rs
@@ -1,4 +1,4 @@
-use parquet2::schema::Encoding;
+use parquet2::encoding::Encoding;
use parquet2::{metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions};

use super::super::{levels, utils};
3 changes: 1 addition & 2 deletions src/io/parquet/write/dictionary.rs
@@ -1,6 +1,5 @@
-use parquet2::encoding::hybrid_rle::encode_u32;
+use parquet2::encoding::{hybrid_rle::encode_u32, Encoding};
use parquet2::page::{CompressedDictPage, CompressedPage};
-use parquet2::schema::Encoding;
use parquet2::write::DynIter;
use parquet2::{metadata::ColumnDescriptor, write::WriteOptions};

4 changes: 2 additions & 2 deletions src/io/parquet/write/fixed_len_bytes.rs
@@ -1,6 +1,6 @@
use parquet2::{
-compression::create_codec, metadata::ColumnDescriptor, page::CompressedDataPage,
-schema::Encoding, write::WriteOptions,
+compression::create_codec, encoding::Encoding, metadata::ColumnDescriptor,
+page::CompressedDataPage, write::WriteOptions,
};

use super::utils;
(The remaining changed files in this commit were not rendered on this page.)