Skip to content

Commit

Permalink
Remove DecimalByteArrayConvert (#2480) (#2522)
Browse files Browse the repository at this point in the history
* Remove DecimalByteArrayConvert (#2480)

* Clippy
  • Loading branch information
tustvold authored Aug 21, 2022
1 parent de7ad62 commit 34216d5
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 40 deletions.
16 changes: 2 additions & 14 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::arrow::array_reader::{
PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
};
use crate::arrow::buffer::converter::{
DecimalArrayConverter, DecimalByteArrayConvert, DecimalFixedLengthByteArrayConverter,
DecimalArrayConverter, DecimalFixedLengthByteArrayConverter,
FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter,
Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
Expand All @@ -35,7 +35,7 @@ use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
use crate::basic::Type as PhysicalType;
use crate::data_type::{
BoolType, ByteArrayType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
Int64Type, Int96Type,
};
use crate::errors::Result;
Expand Down Expand Up @@ -217,18 +217,6 @@ fn build_primitive_reader(
Some(DataType::Dictionary(_, _)) => {
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)
}
Some(DataType::Decimal128(precision, scale)) => {
// read decimal data from parquet binary physical type
let convert = DecimalByteArrayConvert::new(DecimalArrayConverter::new(
precision, scale,
));
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
DecimalByteArrayConvert,
>::new(
page_iterator, column_desc, convert, arrow_type
)?))
}
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type),
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
Expand Down
23 changes: 20 additions & 3 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::bit_util::sign_extend_be;
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
use crate::arrow::record_reader::buffer::ScalarValue;
Expand All @@ -29,11 +30,12 @@ use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::memory::ByteBufferPtr;
use arrow::array::{ArrayRef, OffsetSizeTrait};
use arrow::array::{Array, ArrayRef, BinaryArray, Decimal128Array, OffsetSizeTrait};
use arrow::buffer::Buffer;
use arrow::datatypes::DataType as ArrowType;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

/// Returns an [`ArrayReader`] that decodes the provided byte array column
pub fn make_byte_array_reader(
Expand All @@ -50,7 +52,7 @@ pub fn make_byte_array_reader(
};

match data_type {
ArrowType::Binary | ArrowType::Utf8 => {
ArrowType::Binary | ArrowType::Utf8 | ArrowType::Decimal128(_, _) => {
let reader = GenericRecordReader::new(column_desc);
Ok(Box::new(ByteArrayReader::<i32>::new(
pages, data_type, reader,
Expand Down Expand Up @@ -117,7 +119,22 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();

Ok(buffer.into_array(null_buffer, self.data_type.clone()))
let array = match self.data_type {
ArrowType::Decimal128(p, s) => {
let array = buffer.into_array(null_buffer, ArrowType::Binary);
let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
let decimal = binary
.iter()
.map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?))))
.collect::<Decimal128Array>()
.with_precision_and_scale(p, s)?;

Arc::new(decimal)
}
_ => buffer.into_array(null_buffer, self.data_type.clone()),
};

Ok(array)
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
Expand Down
11 changes: 11 additions & 0 deletions parquet/src/arrow/buffer/bit_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ pub fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
})
}

/// Performs big endian sign extension
///
/// Widens the two's-complement, big-endian integer stored in `b` to exactly `N`
/// bytes. The bytes of `b` occupy the trailing `b.len()` positions of the result;
/// the leading positions are filled with `0xFF` when the most significant bit of
/// the first input byte is set, and with `0x00` otherwise.
///
/// An empty slice carries no sign bit and is treated as zero, yielding `[0; N]`.
///
/// # Panics
///
/// Panics if `b.len() > N`.
pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
    assert!(b.len() <= N, "Array too large, expected less than {}", N);
    // Inspect the sign bit through `first()` rather than `b[0]` so that an empty
    // input does not index out of bounds.
    let is_negative = matches!(b.first(), Some(msb) if msb & 128u8 == 128u8);
    let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
    // The assertion above guarantees `N - b.len()` cannot underflow, and the
    // destination tail is exactly `b.len()` bytes long.
    result[N - b.len()..].copy_from_slice(b);
    result
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
29 changes: 6 additions & 23 deletions parquet/src/arrow/buffer/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::data_type::{ByteArray, FixedLenByteArray, Int96};
use crate::data_type::{FixedLenByteArray, Int96};
use arrow::array::{
Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, FixedSizeBinaryBuilder,
IntervalDayTimeArray, IntervalDayTimeBuilder, IntervalYearMonthArray,
Expand All @@ -26,6 +26,10 @@ use std::sync::Arc;
use crate::errors::Result;
use std::marker::PhantomData;

use crate::arrow::buffer::bit_util::sign_extend_be;
#[cfg(test)]
use crate::data_type::ByteArray;

#[cfg(test)]
use arrow::array::{StringArray, StringBuilder};

Expand Down Expand Up @@ -93,31 +97,13 @@ impl Converter<Vec<Option<FixedLenByteArray>>, Decimal128Array>
}
}

impl Converter<Vec<Option<ByteArray>>, Decimal128Array> for DecimalArrayConverter {
    /// Builds a [`Decimal128Array`] from optional variable-length byte arrays.
    ///
    /// Every present entry is turned into an `i128` via `from_bytes_to_i128`;
    /// `None` entries are handed to the collector unchanged. The collected array
    /// is then tagged with this converter's precision and scale, and any error
    /// raised by that step is propagated to the caller.
    fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<Decimal128Array> {
        let values = source
            .into_iter()
            .map(|maybe_bytes| maybe_bytes.map(|bytes| from_bytes_to_i128(bytes.data())));
        let decimals = values
            .collect::<Decimal128Array>()
            .with_precision_and_scale(self.precision, self.scale)?;

        Ok(decimals)
    }
}

// Convert the byte array to an i128.
// The input byte array must be in big-endian order.
fn from_bytes_to_i128(b: &[u8]) -> i128 {
assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
for (i, v) in b.iter().enumerate() {
result[i + (16 - b.len())] = *v;
}
// The byte array comes from a parquet file and must be big-endian.
// The endianness is defined by the parquet format; see the reference document
// https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
i128::from_be_bytes(result)
i128::from_be_bytes(sign_extend_be(b))
}

/// An Arrow Interval converter, which reads the first 4 bytes of a Parquet interval,
Expand Down Expand Up @@ -238,9 +224,6 @@ pub type DecimalFixedLengthByteArrayConverter = ArrayRefConverter<
DecimalArrayConverter,
>;

pub type DecimalByteArrayConvert =
ArrayRefConverter<Vec<Option<ByteArray>>, Decimal128Array, DecimalArrayConverter>;

pub struct ArrayRefConverter<S, A, C> {
_source: PhantomData<S>,
_array: PhantomData<A>,
Expand Down

0 comments on commit 34216d5

Please sign in to comment.