Skip to content

Commit

Permalink
support read decimal from parquet binary type (#2160)
Browse files Browse the repository at this point in the history
  • Loading branch information
liukun4515 authored Jul 26, 2022
1 parent ec3530d commit 37dd037
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 27 deletions.
28 changes: 16 additions & 12 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,11 @@ use crate::arrow::array_reader::{
ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
};
use crate::arrow::buffer::converter::{
DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
};
use crate::arrow::buffer::converter::{DecimalArrayConverter, DecimalByteArrayConvert, DecimalFixedLengthByteArrayConverter, FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter, IntervalYearMonthArrayConverter, IntervalYearMonthConverter};
use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
use crate::basic::Type as PhysicalType;
use crate::data_type::{
BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type,
Int96Type,
};
use crate::data_type::{BoolType, ByteArrayType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96Type};
use crate::errors::Result;
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};

Expand Down Expand Up @@ -233,6 +225,18 @@ fn build_primitive_reader(
column_desc,
arrow_type,
),
Some(DataType::Decimal(precision, scale)) => {
// read decimal data from parquet binary physical type
let convert = DecimalByteArrayConvert::new(DecimalArrayConverter::new(precision as i32, scale as i32));
Ok(Box::new(
ComplexObjectArrayReader::<ByteArrayType,DecimalByteArrayConvert>::new(
page_iterator,
column_desc,
convert,
arrow_type
)?
))
},
_ => make_byte_array_reader(
page_iterator,
column_desc,
Expand All @@ -241,13 +245,13 @@ fn build_primitive_reader(
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
DataType::Decimal(precision, scale) => {
let converter = DecimalConverter::new(DecimalArrayConverter::new(
let converter = DecimalFixedLengthByteArrayConverter::new(DecimalArrayConverter::new(
precision as i32,
scale as i32,
));
Ok(Box::new(ComplexObjectArrayReader::<
FixedLenByteArrayType,
DecimalConverter,
DecimalFixedLengthByteArrayConverter,
>::new(
page_iterator,
column_desc,
Expand Down
7 changes: 6 additions & 1 deletion parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,12 @@ mod tests {
fn test_read_decimal_file() {
use arrow::array::Decimal128Array;
let testdata = arrow::util::test_util::parquet_test_data();
let file_variants = vec![("fixed_length", 25), ("int32", 4), ("int64", 10)];
let file_variants = vec![
("byte_array", 4),
("fixed_length", 25),
("int32", 4),
("int64", 10),
];
for (prefix, target_precision) in file_variants {
let path = format!("{}/{}_decimal.parquet", testdata, prefix);
let file = File::open(&path).unwrap();
Expand Down
45 changes: 33 additions & 12 deletions parquet/src/arrow/buffer/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,6 @@ impl DecimalArrayConverter {
pub fn new(precision: i32, scale: i32) -> Self {
Self { precision, scale }
}

fn from_bytes_to_i128(b: &[u8]) -> i128 {
assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
for (i, v) in b.iter().enumerate() {
result[i + (16 - b.len())] = *v;
}
i128::from_be_bytes(result)
}
}

impl Converter<Vec<Option<FixedLenByteArray>>, Decimal128Array>
Expand All @@ -94,13 +84,41 @@ impl Converter<Vec<Option<FixedLenByteArray>>, Decimal128Array>
fn convert(&self, source: Vec<Option<FixedLenByteArray>>) -> Result<Decimal128Array> {
let array = source
.into_iter()
.map(|array| array.map(|array| Self::from_bytes_to_i128(array.data())))
.map(|array| array.map(|array| from_bytes_to_i128(array.data())))
.collect::<Decimal128Array>()
.with_precision_and_scale(self.precision as usize, self.scale as usize)?;

Ok(array)
}
}

impl Converter<Vec<Option<ByteArray>>, Decimal128Array> for DecimalArrayConverter {
fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<Decimal128Array> {
let array = source
.into_iter()
.map(|array| array.map(|array| from_bytes_to_i128(array.data())))
.collect::<Decimal128Array>()
.with_precision_and_scale(self.precision as usize, self.scale as usize)?;

Ok(array)
}
}

// Convert the bytes array to i128.
// The endian of the input bytes array must be big-endian.
fn from_bytes_to_i128(b: &[u8]) -> i128 {
assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
for (i, v) in b.iter().enumerate() {
result[i + (16 - b.len())] = *v;
}
// The bytes array are from parquet file and must be the big-endian.
// The endian is defined by parquet format, and the reference document
// https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
i128::from_be_bytes(result)
}

/// An Arrow Interval converter, which reads the first 4 bytes of a Parquet interval,
/// and interprets it as an i32 value representing the Arrow YearMonth value
pub struct IntervalYearMonthArrayConverter {}
Expand Down Expand Up @@ -272,12 +290,15 @@ pub type IntervalDayTimeConverter = ArrayRefConverter<
IntervalDayTimeArrayConverter,
>;

pub type DecimalConverter = ArrayRefConverter<
pub type DecimalFixedLengthByteArrayConverter = ArrayRefConverter<
Vec<Option<FixedLenByteArray>>,
Decimal128Array,
DecimalArrayConverter,
>;

pub type DecimalByteArrayConvert =
ArrayRefConverter<Vec<Option<ByteArray>>, Decimal128Array, DecimalArrayConverter>;

pub struct FromConverter<S, T> {
_source: PhantomData<S>,
_dest: PhantomData<T>,
Expand Down
26 changes: 26 additions & 0 deletions parquet/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,32 @@ mod tests {
assert_eq!(&arrow_fields, converted_arrow_schema.fields());
}

#[test]
fn test_decimal_fields() {
let message_type = "
message test_schema {
REQUIRED INT32 decimal1 (DECIMAL(4,2));
REQUIRED INT64 decimal2 (DECIMAL(12,2));
REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
}
";

let parquet_group_type = parse_message_type(message_type).unwrap();

let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
parquet_to_arrow_schema(&parquet_schema, None).unwrap();

let arrow_fields = vec![
Field::new("decimal1", DataType::Decimal(4,2), false),
Field::new("decimal2", DataType::Decimal(12,2), false),
Field::new("decimal3", DataType::Decimal(30,2), false),
Field::new("decimal4", DataType::Decimal(33,2), false),
];
assert_eq!(&arrow_fields, converted_arrow_schema.fields());
}

#[test]
fn test_byte_array_fields() {
let message_type = "
Expand Down
6 changes: 4 additions & 2 deletions parquet/src/arrow/schema/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fn from_parquet(parquet_type: &Type) -> Result<DataType> {
PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
PhysicalType::FLOAT => Ok(DataType::Float32),
PhysicalType::DOUBLE => Ok(DataType::Float64),
PhysicalType::BYTE_ARRAY => from_byte_array(basic_info),
PhysicalType::BYTE_ARRAY => from_byte_array(basic_info, *precision, *scale),
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
}
Expand Down Expand Up @@ -224,7 +224,7 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
}
}

fn from_byte_array(info: &BasicTypeInfo) -> Result<DataType> {
fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32 ) -> Result<DataType> {
match (info.logical_type(), info.converted_type()) {
(Some(LogicalType::String), _) => Ok(DataType::Utf8),
(Some(LogicalType::Json), _) => Ok(DataType::Binary),
Expand All @@ -235,6 +235,8 @@ fn from_byte_array(info: &BasicTypeInfo) -> Result<DataType> {
(None, ConvertedType::BSON) => Ok(DataType::Binary),
(None, ConvertedType::ENUM) => Ok(DataType::Binary),
(None, ConvertedType::UTF8) => Ok(DataType::Utf8),
(Some(LogicalType::Decimal {precision, scale}), _) => Ok(DataType::Decimal(precision as usize, scale as usize)),
(None, ConvertedType::DECIMAL) => Ok(DataType::Decimal(precision as usize, scale as usize)),
(logical, converted) => Err(arrow_err!(
"Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
logical,
Expand Down
16 changes: 16 additions & 0 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,7 @@ mod tests {
assert_eq!(ConvertedType::JSON.to_string(), "JSON");
assert_eq!(ConvertedType::BSON.to_string(), "BSON");
assert_eq!(ConvertedType::INTERVAL.to_string(), "INTERVAL");
assert_eq!(ConvertedType::DECIMAL.to_string(), "DECIMAL")
}

#[test]
Expand Down Expand Up @@ -1153,6 +1154,10 @@ mod tests {
ConvertedType::from(Some(parquet::ConvertedType::Interval)),
ConvertedType::INTERVAL
);
assert_eq!(
ConvertedType::from(Some(parquet::ConvertedType::Decimal)),
ConvertedType::DECIMAL
)
}

#[test]
Expand Down Expand Up @@ -1244,6 +1249,10 @@ mod tests {
Some(parquet::ConvertedType::Interval),
ConvertedType::INTERVAL.into()
);
assert_eq!(
Some(parquet::ConvertedType::Decimal),
ConvertedType::DECIMAL.into()
)
}

#[test]
Expand Down Expand Up @@ -1409,6 +1418,13 @@ mod tests {
.unwrap(),
ConvertedType::INTERVAL
);
assert_eq!(
ConvertedType::DECIMAL
.to_string()
.parse::<ConvertedType>()
.unwrap(),
ConvertedType::DECIMAL
)
}

#[test]
Expand Down

0 comments on commit 37dd037

Please sign in to comment.