Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove parquet dictionary converters (#1661) #1662

Merged
merged 1 commit into from
May 6, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 9 additions & 122 deletions parquet/src/arrow/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,20 @@
// specific language governing permissions and limitations
// under the License.

use crate::data_type::{ByteArray, DataType, FixedLenByteArray, Int96};
// TODO: clean up imports (best done when there are few moving parts)
use crate::data_type::{ByteArray, FixedLenByteArray, Int96};
use arrow::array::{
Array, ArrayRef, BinaryBuilder, FixedSizeBinaryBuilder,
IntervalDayTimeArray, IntervalDayTimeBuilder, IntervalYearMonthArray,
IntervalYearMonthBuilder, LargeBinaryBuilder, LargeStringBuilder, PrimitiveBuilder,
PrimitiveDictionaryBuilder, StringBuilder, StringDictionaryBuilder,
Array, ArrayRef, BinaryArray, BinaryBuilder, DecimalArray, FixedSizeBinaryArray,
FixedSizeBinaryBuilder, IntervalDayTimeArray, IntervalDayTimeBuilder,
IntervalYearMonthArray, IntervalYearMonthBuilder, LargeBinaryArray,
LargeBinaryBuilder, LargeStringArray, LargeStringBuilder, StringArray, StringBuilder,
TimestampNanosecondArray,
};
use arrow::compute::cast;
use std::convert::{From, TryInto};
use std::sync::Arc;

use crate::errors::Result;
use arrow::datatypes::{ArrowDictionaryKeyType, ArrowPrimitiveType};

use arrow::array::{
BinaryArray, DecimalArray, DictionaryArray, FixedSizeBinaryArray, LargeBinaryArray,
LargeStringArray, PrimitiveArray, StringArray, TimestampNanosecondArray,
};
use std::marker::PhantomData;

use crate::data_type::Int32Type as ParquetInt32Type;
use arrow::datatypes::Int32Type;

/// A converter is used to consume record reader's content and convert it to arrow
/// primitive array.
pub trait Converter<S, T> {
Expand Down Expand Up @@ -100,13 +90,11 @@ impl DecimalArrayConverter {

impl Converter<Vec<Option<FixedLenByteArray>>, DecimalArray> for DecimalArrayConverter {
fn convert(&self, source: Vec<Option<FixedLenByteArray>>) -> Result<DecimalArray> {
let array = source.into_iter()
let array = source
.into_iter()
.map(|array| array.map(|array| Self::from_bytes_to_i128(array.data())))
.collect::<DecimalArray>()
.with_precision_and_scale(
self.precision as usize,
self.scale as usize
)?;
.with_precision_and_scale(self.precision as usize, self.scale as usize)?;

Ok(array)
}
Expand Down Expand Up @@ -251,92 +239,6 @@ impl Converter<Vec<Option<ByteArray>>, LargeBinaryArray> for LargeBinaryArrayCon
}
}

pub struct StringDictionaryArrayConverter {}

impl<K: ArrowDictionaryKeyType> Converter<Vec<Option<ByteArray>>, DictionaryArray<K>>
for StringDictionaryArrayConverter
{
fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<DictionaryArray<K>> {
let data_size = source
.iter()
.map(|x| x.as_ref().map(|b| b.len()).unwrap_or(0))
.sum();

let keys_builder = PrimitiveBuilder::<K>::new(source.len());
let values_builder = StringBuilder::with_capacity(source.len(), data_size);

let mut builder = StringDictionaryBuilder::new(keys_builder, values_builder);
for v in source {
match v {
Some(array) => {
let _ = builder.append(array.as_utf8()?)?;
}
None => builder.append_null()?,
}
}

Ok(builder.finish())
}
}

pub struct DictionaryArrayConverter<DictValueSourceType, DictValueTargetType, ParquetType>
{
_dict_value_source_marker: PhantomData<DictValueSourceType>,
_dict_value_target_marker: PhantomData<DictValueTargetType>,
_parquet_marker: PhantomData<ParquetType>,
}

impl<DictValueSourceType, DictValueTargetType, ParquetType>
DictionaryArrayConverter<DictValueSourceType, DictValueTargetType, ParquetType>
{
pub fn new() -> Self {
Self {
_dict_value_source_marker: PhantomData,
_dict_value_target_marker: PhantomData,
_parquet_marker: PhantomData,
}
}
}

impl<K, DictValueSourceType, DictValueTargetType, ParquetType>
Converter<Vec<Option<<ParquetType as DataType>::T>>, DictionaryArray<K>>
for DictionaryArrayConverter<DictValueSourceType, DictValueTargetType, ParquetType>
where
K: ArrowPrimitiveType,
DictValueSourceType: ArrowPrimitiveType,
DictValueTargetType: ArrowPrimitiveType,
ParquetType: DataType,
PrimitiveArray<DictValueSourceType>: From<Vec<Option<<ParquetType as DataType>::T>>>,
{
fn convert(
&self,
source: Vec<Option<<ParquetType as DataType>::T>>,
) -> Result<DictionaryArray<K>> {
let keys_builder = PrimitiveBuilder::<K>::new(source.len());
let values_builder = PrimitiveBuilder::<DictValueTargetType>::new(source.len());

let mut builder = PrimitiveDictionaryBuilder::new(keys_builder, values_builder);

let source_array: Arc<dyn Array> =
Arc::new(PrimitiveArray::<DictValueSourceType>::from(source));
let target_array = cast(&source_array, &DictValueTargetType::DATA_TYPE)?;
let target = target_array
.as_any()
.downcast_ref::<PrimitiveArray<DictValueTargetType>>()
.unwrap();

for i in 0..target.len() {
if target.is_null(i) {
builder.append_null()?;
} else {
let _ = builder.append(target.value(i))?;
}
}

Ok(builder.finish())
}
}

pub type Utf8Converter =
ArrayRefConverter<Vec<Option<ByteArray>>, StringArray, Utf8ArrayConverter>;
pub type LargeUtf8Converter =
Expand All @@ -348,21 +250,6 @@ pub type LargeBinaryConverter = ArrayRefConverter<
LargeBinaryArray,
LargeBinaryArrayConverter,
>;
pub type StringDictionaryConverter<T> = ArrayRefConverter<
Vec<Option<ByteArray>>,
DictionaryArray<T>,
StringDictionaryArrayConverter,
>;
pub type DictionaryConverter<K, SV, TV, P> = ArrayRefConverter<
Vec<Option<<P as DataType>::T>>,
DictionaryArray<K>,
DictionaryArrayConverter<SV, TV, P>,
>;
pub type PrimitiveDictionaryConverter<K, V> = ArrayRefConverter<
Vec<Option<<ParquetInt32Type as DataType>::T>>,
DictionaryArray<K>,
DictionaryArrayConverter<Int32Type, V, ParquetInt32Type>,
>;

pub type Int96Converter =
ArrayRefConverter<Vec<Option<Int96>>, TimestampNanosecondArray, Int96ArrayConverter>;
Expand Down