Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix decimals min max statistics #1621

Merged
merged 13 commits into from
Apr 29, 2022
275 changes: 274 additions & 1 deletion parquet/src/column/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Contains column writer API.
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc};

use crate::basic::{Compression, Encoding, LogicalType, PageType, Type};
use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::compression::{create_codec, Codec};
use crate::data_type::private::ParquetValueType;
Expand Down Expand Up @@ -1006,6 +1006,41 @@ impl<T: DataType> ColumnWriterImpl<T> {
return a.as_u64().unwrap() > b.as_u64().unwrap();
}
}

match self.descr.converted_type() {
ConvertedType::UINT_8
| ConvertedType::UINT_16
| ConvertedType::UINT_32
| ConvertedType::UINT_64 => {
return a.as_u64().unwrap() > b.as_u64().unwrap();
}
_ => {}
};
atefsawaed marked this conversation as resolved.
Show resolved Hide resolved

if let Some(LogicalType::Decimal { .. }) = self.descr.logical_type() {
match self.descr.physical_type() {
atefsawaed marked this conversation as resolved.
Show resolved Hide resolved
Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
return compare_greater_byte_array_decimals(
a.as_bytes(),
b.as_bytes(),
);
}
_ => {}
};
}

if self.descr.converted_type() == ConvertedType::DECIMAL {
match self.descr.physical_type() {
Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
return compare_greater_byte_array_decimals(
a.as_bytes(),
b.as_bytes(),
);
}
_ => {}
};
};

a > b
}
}
Expand Down Expand Up @@ -1049,6 +1084,54 @@ fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
}
}

/// Signed "greater than" comparison of two big-endian two's-complement byte arrays.
///
/// Returns `true` when the value encoded by `a` is strictly greater than the value
/// encoded by `b`. The two arrays may have different lengths; the shorter one is
/// compared as if it had been sign-extended to the length of the longer one, so
/// e.g. `0xFF90` and `0x90` encode the same value (-112) and neither is greater.
///
/// An empty array is never greater than anything, and any non-empty array is
/// greater than an empty one.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    if a.is_empty() || b.is_empty() {
        return !a.is_empty();
    }

    let a_negative = (a[0] & 0x80) != 0;
    let b_negative = (b[0] & 0x80) != 0;

    // Different signs: `a` is greater exactly when `b` is the negative one.
    if a_negative != b_negative {
        return b_negative;
    }

    // Same sign from here on. Split each array into the bytes it has in excess of
    // the other one (the "lead") and the remaining tail; at most one lead is
    // non-empty and both tails have the same length.
    let extension: u8 = if a_negative { 0xFF } else { 0x00 };
    let (a_lead, a_tail) = a.split_at(a.len().saturating_sub(b.len()));
    let (b_lead, b_tail) = b.split_at(b.len().saturating_sub(a.len()));

    // If the longer array's lead is more than plain sign extension, it has the
    // larger magnitude: that makes it greater for non-negative values and
    // smaller for negative ones.
    if a_lead.iter().any(|&byte| byte != extension) {
        return !a_negative;
    }
    if b_lead.iter().any(|&byte| byte != extension) {
        return a_negative;
    }

    // Both values now share the same implicit sign-extended prefix, so an
    // unsigned lexicographic comparison of the equal-length tails gives the
    // signed ordering.
    a_tail > b_tail
}

#[cfg(test)]
mod tests {
use rand::distributions::uniform::SampleUniform;
Expand Down Expand Up @@ -1475,6 +1558,84 @@ mod tests {
}
}

#[test]
fn test_column_writer_check_byte_array_min_max() {
    // Four 16-byte values written to a Decimal BYTE_ARRAY column; the first two
    // have the sign bit set, the last two do not.
    let lowest: [u8; 16] = [
        255, 255, 255, 255, 255, 255, 255, 255, 179, 172, 19, 35, 231, 90, 0, 0,
    ];
    let second: [u8; 16] = [
        255, 255, 255, 255, 255, 255, 255, 255, 228, 62, 146, 152, 177, 56, 0, 0,
    ];
    let zero: [u8; 16] = [0; 16];
    let highest: [u8; 16] = [
        0, 0, 0, 0, 0, 0, 0, 0, 41, 162, 36, 26, 246, 44, 0, 0,
    ];

    let batch: Vec<ByteArray> = [lowest, second, zero, highest]
        .iter()
        .map(|bytes| ByteArray::from(bytes.to_vec()))
        .collect();

    let page_writer = get_test_page_writer();
    let props = Arc::new(WriterProperties::builder().build());
    let mut writer =
        get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
    writer.write_batch(&batch, None, None).unwrap();

    let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
    let stats = match metadata.statistics() {
        Some(stats) => stats,
        None => panic!("metadata missing statistics"),
    };
    assert!(stats.has_min_max_set());

    // The recorded min/max must follow signed ordering of the written values.
    match stats {
        Statistics::ByteArray(typed) => {
            assert_eq!(typed.min(), &ByteArray::from(lowest.to_vec()));
            assert_eq!(typed.max(), &ByteArray::from(highest.to_vec()));
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
}

#[test]
fn test_column_writer_uint32_converted_type_min_max() {
    // Column whose unsigned-ness is declared through ConvertedType::UINT_32 only.
    let props = Arc::new(WriterProperties::builder().build());
    let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
        get_test_page_writer(),
        0,
        0,
        props,
    );
    writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();

    let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
    let stats = match metadata.statistics() {
        Some(stats) => stats,
        None => panic!("metadata missing statistics"),
    };
    assert!(stats.has_min_max_set());

    match stats {
        Statistics::Int32(typed) => {
            assert_eq!(typed.min(), &0,);
            assert_eq!(typed.max(), &5,);
        }
        _ => panic!("expecting Statistics::Int32"),
    }
}

#[test]
fn test_column_writer_precalculated_statistics() {
let page_writer = get_test_page_writer();
Expand Down Expand Up @@ -1887,6 +2048,28 @@ mod tests {
assert!(matches!(stats, Statistics::Double(_)));
}

#[test]
fn test_compare_greater_byte_array_decimals() {
    // Each entry is (a, b, expected result of `a > b`).
    let cases: [(&[u8], &[u8], bool); 9] = [
        (&[], &[], false),
        (&[1u8], &[], true),
        (&[], &[1u8], false),
        (&[1u8], &[0u8], true),
        (&[1u8], &[1u8], false),
        (&[1u8, 0u8], &[0u8], true),
        (&[0u8, 1u8], &[1u8, 0u8], false),
        (&[255u8, 35u8, 0u8, 0u8], &[0u8], false),
        (&[0u8], &[255u8, 35u8, 0u8, 0u8], true),
    ];

    for &(a, b, expected) in cases.iter() {
        assert_eq!(
            compare_greater_byte_array_decimals(a, b),
            expected,
            "comparing {:?} > {:?}",
            a,
            b
        );
    }
}

/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
Expand Down Expand Up @@ -2153,4 +2336,94 @@ mod tests {
panic!("metadata missing statistics");
}
}

/// Builds a typed column writer for the test Decimal column described by
/// `get_test_decimals_column_descr`.
fn get_test_decimals_column_writer<T: DataType>(
    page_writer: Box<dyn PageWriter>,
    max_def_level: i16,
    max_rep_level: i16,
    props: WriterPropertiesPtr,
) -> ColumnWriterImpl<T> {
    let descriptor = get_test_decimals_column_descr::<T>(max_def_level, max_rep_level);
    let untyped = get_column_writer(Arc::new(descriptor), props, page_writer);
    get_typed_column_writer::<T>(untyped)
}

/// Builds a typed column reader for the test Decimal column described by
/// `get_test_decimals_column_descr`.
fn get_test_decimals_column_reader<T: DataType>(
    page_reader: Box<dyn PageReader>,
    max_def_level: i16,
    max_rep_level: i16,
) -> ColumnReaderImpl<T> {
    let descriptor = get_test_decimals_column_descr::<T>(max_def_level, max_rep_level);
    let untyped = get_column_reader(Arc::new(descriptor), page_reader);
    get_typed_column_reader::<T>(untyped)
}

/// Builds the descriptor of a primitive column named "col" annotated as
/// Decimal (precision 3, scale 2) with a type length of 16.
fn get_test_decimals_column_descr<T: DataType>(
    max_def_level: i16,
    max_rep_level: i16,
) -> ColumnDescriptor {
    let decimal = LogicalType::Decimal {
        scale: 2,
        precision: 3,
    };
    let builder = SchemaType::primitive_type_builder("col", T::get_physical_type())
        .with_length(16)
        .with_logical_type(Some(decimal))
        .with_scale(2)
        .with_precision(3);
    let schema_type = builder.build().unwrap();
    ColumnDescriptor::new(
        Arc::new(schema_type),
        max_def_level,
        max_rep_level,
        ColumnPath::from("col"),
    )
}

/// Returns column writer for UINT32 Column provided as ConvertedType only
fn get_test_unsigned_int_given_as_converted_column_writer<T: DataType>(
atefsawaed marked this conversation as resolved.
Show resolved Hide resolved
page_writer: Box<dyn PageWriter>,
max_def_level: i16,
max_rep_level: i16,
props: WriterPropertiesPtr,
) -> ColumnWriterImpl<T> {
let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
max_def_level,
max_rep_level,
));
let column_writer = get_column_writer(descr, props, page_writer);
get_typed_column_writer::<T>(column_writer)
}

/// Returns a typed column reader for the test column whose unsigned 32-bit
/// nature is declared through `ConvertedType::UINT_32` only.
fn get_test_unsigned_int_given_as_converted_column_reader<T: DataType>(
    page_reader: Box<dyn PageReader>,
    max_def_level: i16,
    max_rep_level: i16,
) -> ColumnReaderImpl<T> {
    let descriptor =
        get_test_converted_type_unsigned_integer_column_descr::<T>(max_def_level, max_rep_level);
    let untyped = get_column_reader(Arc::new(descriptor), page_reader);
    get_typed_column_reader::<T>(untyped)
}

/// Builds the descriptor of a primitive column named "col" that carries
/// `ConvertedType::UINT_32` and no logical type annotation.
fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
    max_def_level: i16,
    max_rep_level: i16,
) -> ColumnDescriptor {
    let builder = SchemaType::primitive_type_builder("col", T::get_physical_type())
        .with_converted_type(ConvertedType::UINT_32);
    let schema_type = builder.build().unwrap();
    ColumnDescriptor::new(
        Arc::new(schema_type),
        max_def_level,
        max_rep_level,
        ColumnPath::from("col"),
    )
}
}