Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added read Decimal from parquet #489

Merged
merged 3 commits into from
Oct 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pyarrow as pa
import pyarrow.parquet
import os
from decimal import Decimal

PYARROW_PATH = "fixtures/pyarrow3"

Expand All @@ -11,6 +12,7 @@ def case_basic_nullable(size=1):
string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
boolean = [True, None, False, False, None, True, None, None, True, True]
string_large = ["ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCD😃🌚🕳👊"] * 10
decimal = [Decimal(e) if e is not None else None for e in int64]

fields = [
pa.field("int64", pa.int64()),
Expand All @@ -20,6 +22,10 @@ def case_basic_nullable(size=1):
pa.field("date", pa.timestamp("ms")),
pa.field("uint32", pa.uint32()),
pa.field("string_large", pa.utf8()),
# decimal testing
pa.field("decimal_9", pa.decimal128(9,0)),
pa.field("decimal_18", pa.decimal128(18,0)),
pa.field("decimal_26", pa.decimal128(26,0)),
]
schema = pa.schema(fields)

Expand All @@ -32,6 +38,9 @@ def case_basic_nullable(size=1):
"date": int64 * size,
"uint32": int64 * size,
"string_large": string_large * size,
"decimal_9": decimal * size,
"decimal_18": decimal * size,
"decimal_26": decimal * size,
},
schema,
f"basic_nullable_{size*10}.parquet",
Expand All @@ -43,6 +52,7 @@ def case_basic_required(size=1):
float64 = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
string = ["Hello", "bbb", "aa", "", "bbb", "abc", "bbb", "bbb", "def", "aaa"]
boolean = [True, True, False, False, False, True, True, True, True, True]
decimal = [Decimal(e) for e in int64]

fields = [
pa.field("int64", pa.int64(), nullable=False),
Expand All @@ -57,6 +67,9 @@ def case_basic_required(size=1):
nullable=False,
),
pa.field("uint32", pa.uint32(), nullable=False),
pa.field("decimal_9", pa.decimal128(9,0), nullable=False),
pa.field("decimal_18", pa.decimal128(18,0), nullable=False),
pa.field("decimal_26", pa.decimal128(26,0), nullable=False),
]
schema = pa.schema(fields)

Expand All @@ -68,6 +81,9 @@ def case_basic_required(size=1):
"bool": boolean * size,
"date": int64 * size,
"uint32": int64 * size,
"decimal_9": decimal * size,
"decimal_18": decimal * size,
"decimal_26": decimal * size,
},
schema,
f"basic_required_{size*10}.parquet",
Expand Down
53 changes: 43 additions & 10 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! APIs to read from Parquet format.
use std::{
io::{Read, Seek},
sync::Arc,
};
use std::{convert::TryInto, io::{Read, Seek}, sync::Arc};

use futures::{AsyncRead, AsyncSeek, Stream};
pub use parquet2::{
Expand All @@ -21,11 +18,7 @@ pub use parquet2::{
types::int96_to_i64_ns,
};

use crate::{
array::{Array, DictionaryKey},
datatypes::{DataType, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
};
use crate::{array::{Array, DictionaryKey, PrimitiveArray}, datatypes::{DataType, IntervalUnit, TimeUnit}, error::{ArrowError, Result}};

mod binary;
mod boolean;
Expand Down Expand Up @@ -211,7 +204,47 @@ pub fn page_iter_to_array<
FixedSizeBinary(_) => Ok(Box::new(fixed_size_binary::iter_to_array(
iter, data_type, metadata,
)?)),

Decimal(_, _) => match metadata.descriptor().type_() {
ParquetType::PrimitiveType { physical_type, ..} => match physical_type{
PhysicalType::Int32 => primitive::iter_to_array(
iter,
metadata,
data_type,
|x: i32| x as i128,
),
PhysicalType::Int64 => primitive::iter_to_array(
iter,
metadata,
data_type,
|x: i64| x as i128,
),
PhysicalType::FixedLenByteArray(n) => {
if *n > 16 {
Err(ArrowError::NotYetImplemented(format!(
"Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
n
)))
} else {
let paddings = (0..(16-*n)).map(|_| 0u8).collect::<Vec<_>>();
fixed_size_binary::iter_to_array(iter, DataType::FixedSizeBinary(*n), metadata)
.map(|e|{
let a = e.into_iter().map(|v|
v.and_then(|v1| {
[&paddings, v1].concat().try_into().map(
|pad16| i128::from_be_bytes(pad16)
).ok()
}
)
).collect::<Vec<_>>();
Box::new(PrimitiveArray::<i128>::from(a).to(data_type)) as Box<dyn Array>
}
)
}
},
_ => unreachable!()
},
_ => unreachable!()
},
List(ref inner) => match inner.data_type() {
UInt8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u8),
UInt16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u16),
Expand Down
3 changes: 3 additions & 0 deletions src/io/parquet/read/schema/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ pub fn from_int64(
ParquetTimeUnit::MICROS(_) => DataType::Time64(TimeUnit::Microsecond),
ParquetTimeUnit::NANOS(_) => DataType::Time64(TimeUnit::Nanosecond),
},
(Some(PrimitiveConvertedType::Decimal(precision,scale)), _) => {
DataType::Decimal(*precision as usize, *scale as usize)
}
(c, l) => {
return Err(ArrowError::NotYetImplemented(format!(
"The conversion of (Int64, {:?}, {:?}) to arrow still not implemented",
Expand Down
5 changes: 4 additions & 1 deletion src/io/parquet/read/schema/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ mod tests {
"bool",
"date",
"uint32",
"string_large"
"string_large",
"decimal_9",
"decimal_18",
"decimal_26"
]
);
Ok(())
Expand Down
105 changes: 105 additions & 0 deletions src/io/parquet/read/statistics/fixlen.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use std::convert::{TryFrom, TryInto};

use super::super::schema;
use super::primitive::PrimitiveStatistics;
use crate::datatypes::DataType;
use crate::error::{ArrowError, Result};
use parquet2::schema::types::ParquetType;
use parquet2::{
schema::types::PhysicalType,
statistics::{
FixedLenStatistics as ParquetFixedLenStatistics, Statistics as ParquetStatistics,
},
};

use super::Statistics;

/// Arrow representation of parquet statistics for a fixed-length byte array column.
///
/// Values are kept as raw big-endian byte vectors; the arrow logical type is carried
/// in `data_type` (set to `FixedSizeBinary` by the `From` impl below).
#[derive(Debug, Clone, PartialEq)]
pub struct FixedLenStatistics {
    /// Number of null values in the column chunk, if recorded.
    pub null_count: Option<i64>,
    /// Number of distinct values in the column chunk, if recorded.
    pub distinct_count: Option<i64>,
    /// Minimum value as raw bytes, if recorded.
    pub min_value: Option<Vec<u8>>,
    /// Maximum value as raw bytes, if recorded.
    pub max_value: Option<Vec<u8>>,
    /// Arrow data type these statistics describe.
    pub data_type: DataType,
}

impl Statistics for FixedLenStatistics {
    /// Returns the arrow data type these statistics describe.
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

impl From<&ParquetFixedLenStatistics> for FixedLenStatistics {
fn from(stats: &ParquetFixedLenStatistics) -> Self {
let byte_lens = match stats.physical_type() {
PhysicalType::FixedLenByteArray(size) => *size,
_ => unreachable!(),
};
Self {
null_count: stats.null_count,
distinct_count: stats.distinct_count,
min_value: stats.min_value.clone(),
max_value: stats.max_value.clone(),
data_type: DataType::FixedSizeBinary(byte_lens),
}
}
}

impl TryFrom<(&ParquetFixedLenStatistics, DataType)> for PrimitiveStatistics<i128> {
    type Error = ArrowError;

    /// Converts parquet fixed-len-byte-array statistics into `i128` (decimal) statistics.
    ///
    /// Values are stored big-endian; each is left-padded with zeros to 16 bytes and
    /// reinterpreted as an `i128`. A value that fails to convert is dropped (`None`).
    ///
    /// # Errors
    /// Errors when the column's byte width exceeds 16, as such values cannot fit in an
    /// `i128`.
    fn try_from((stats, data_type): (&ParquetFixedLenStatistics, DataType)) -> Result<Self> {
        let byte_lens = match stats.physical_type() {
            PhysicalType::FixedLenByteArray(size) => *size,
            _ => unreachable!(),
        };
        if byte_lens > 16 {
            // fixed: error message typo ("lengtg" -> "length")
            return Err(ArrowError::Other(format!(
                "Can't deserialize i128 from Fixed Len Byte array with length {:?}",
                byte_lens
            )));
        }
        // Big-endian zero padding up to 16 bytes so the value can be read as an i128.
        let paddings = vec![0u8; (16 - byte_lens) as usize];
        // Shared decoder for min and max: pad, then reinterpret as big-endian i128.
        let decode = |value: &Vec<u8>| {
            [paddings.as_slice(), value.as_slice()]
                .concat()
                .try_into()
                .map(i128::from_be_bytes)
                .ok()
        };
        Ok(Self {
            data_type,
            null_count: stats.null_count,
            distinct_count: stats.distinct_count,
            max_value: stats.max_value.as_ref().and_then(decode),
            min_value: stats.min_value.as_ref().and_then(decode),
        })
    }
}

/// Deserializes parquet fixed-len-byte-array statistics into arrow [`Statistics`],
/// dispatching on the arrow type derived from the parquet schema: decimals become
/// `i128` primitive statistics, fixed-size binaries keep their raw bytes.
pub(super) fn statistics_from_fix_len(
    stats: &ParquetFixedLenStatistics,
    type_: &ParquetType,
) -> Result<Box<dyn Statistics>> {
    // NOTE(review): `unwrap` assumes `to_data_type` never returns `Ok(None)` for a
    // fixed-len-byte-array column — confirm; otherwise this panics.
    let data_type = schema::to_data_type(type_)?.unwrap();

    use DataType::*;
    Ok(match data_type {
        // Decimals backed by fixed-len bytes are decoded into i128 statistics.
        Decimal(_, _) => Box::new(PrimitiveStatistics::<i128>::try_from((stats, data_type))?),
        FixedSizeBinary(_) => Box::new(FixedLenStatistics::from(stats)),
        other => {
            return Err(ArrowError::NotYetImplemented(format!(
                "Can't read {:?} from parquet",
                other
            )))
        }
    })
}
6 changes: 6 additions & 0 deletions src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ mod binary;
pub use binary::*;
mod boolean;
pub use boolean::*;
mod fixlen;
pub use fixlen::*;

/// Trait representing a deserialized parquet statistics into arrow.
pub trait Statistics: std::fmt::Debug {
Expand Down Expand Up @@ -70,6 +72,10 @@ pub fn deserialize_statistics(stats: &dyn ParquetStatistics) -> Result<Box<dyn S
DataType::Float64,
))))
}
PhysicalType::FixedLenByteArray(_) =>{
let stats = stats.as_any().downcast_ref().unwrap();
fixlen::statistics_from_fix_len(stats, stats.descriptor.type_())
}
_ => Err(ArrowError::NotYetImplemented(
"Reading Fixed-len array statistics is not yet supported".to_string(),
)),
Expand Down
2 changes: 2 additions & 0 deletions src/io/parquet/read/statistics/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub(super) fn statistics_from_i32(
UInt32 => Box::new(PrimitiveStatistics::<u32>::from((stats, data_type))),
Int8 => Box::new(PrimitiveStatistics::<i8>::from((stats, data_type))),
Int16 => Box::new(PrimitiveStatistics::<i16>::from((stats, data_type))),
Decimal(_, _) => Box::new(PrimitiveStatistics::<i128>::from((stats, data_type))),
_ => Box::new(PrimitiveStatistics::<i32>::from((stats, data_type))),
})
}
Expand All @@ -69,6 +70,7 @@ pub(super) fn statistics_from_i64(
UInt64 => {
Box::new(PrimitiveStatistics::<u64>::from((stats, data_type))) as Box<dyn Statistics>
}
Decimal(_, _) => Box::new(PrimitiveStatistics::<i128>::from((stats, data_type))),
_ => Box::new(PrimitiveStatistics::<i64>::from((stats, data_type))),
})
}
43 changes: 39 additions & 4 deletions src/io/parquet/write/fixed_len_bytes.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use parquet2::{
compression::create_codec, encoding::Encoding, metadata::ColumnDescriptor,
page::CompressedDataPage, write::WriteOptions,
compression::create_codec,
encoding::Encoding,
metadata::ColumnDescriptor,
page::CompressedDataPage,
statistics::{serialize_statistics, deserialize_statistics, ParquetStatistics},
write::WriteOptions,
};

use super::utils;
use super::{binary::ord_binary, utils};
use crate::{
array::{Array, FixedSizeBinaryArray},
error::Result,
Expand Down Expand Up @@ -54,16 +58,47 @@ pub fn array_to_page(
buffer
};

let statistics = if options.write_statistics {
build_statistics(array, descriptor.clone())
} else {
None
};

utils::build_plain_page(
buffer,
array.len(),
array.null_count(),
uncompressed_page_size,
0,
definition_levels_byte_length,
None,
statistics,
descriptor,
options,
Encoding::Plain,
)
}

/// Builds parquet statistics (null count plus byte-wise min/max) for a
/// fixed-size binary array, round-tripping through the parquet deserializer
/// to obtain the serialized form. Returns `None` when the round trip fails.
pub(super) fn build_statistics(
    array: &FixedSizeBinaryArray,
    descriptor: ColumnDescriptor,
) -> Option<ParquetStatistics> {
    // Iterator over the non-null values only.
    let non_null = || array.iter().flatten();
    let max_value = non_null().max_by(|x, y| ord_binary(x, y)).map(|x| x.to_vec());
    let min_value = non_null().min_by(|x, y| ord_binary(x, y)).map(|x| x.to_vec());
    let statistics = ParquetStatistics {
        max: None,
        min: None,
        null_count: Some(array.null_count() as i64),
        distinct_count: None,
        max_value,
        min_value,
    };
    deserialize_statistics(&statistics, descriptor)
        .map(|stats| serialize_statistics(stats.as_ref()))
        .ok()
}
Loading