Skip to content

Commit

Permalink
Fix decimals min max statistics (#1621)
Browse files Browse the repository at this point in the history
* Fix incorrect writing of min/max statistics

* Refactor

* Decimals Byte array comparison

* Add Decimals test

* Use slice instead of vector

* Fix build error

* Fix build error

* Coding Style

* More tests

* Refactor

* Improve code readability

Co-authored-by: Atef Sawaed <atefsawaed@microsoft.com>
  • Loading branch information
atefsawaed and atefsaw authored Apr 29, 2022
1 parent 849ee2a commit d8d6499
Showing 1 changed file with 274 additions and 1 deletion.
275 changes: 274 additions & 1 deletion parquet/src/column/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Contains column writer API.
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc};

use crate::basic::{Compression, Encoding, LogicalType, PageType, Type};
use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::compression::{create_codec, Codec};
use crate::data_type::private::ParquetValueType;
Expand Down Expand Up @@ -1006,6 +1006,41 @@ impl<T: DataType> ColumnWriterImpl<T> {
return a.as_u64().unwrap() > b.as_u64().unwrap();
}
}

match self.descr.converted_type() {
ConvertedType::UINT_8
| ConvertedType::UINT_16
| ConvertedType::UINT_32
| ConvertedType::UINT_64 => {
return a.as_u64().unwrap() > b.as_u64().unwrap();
}
_ => {}
};

if let Some(LogicalType::Decimal { .. }) = self.descr.logical_type() {
match self.descr.physical_type() {
Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
return compare_greater_byte_array_decimals(
a.as_bytes(),
b.as_bytes(),
);
}
_ => {}
};
}

if self.descr.converted_type() == ConvertedType::DECIMAL {
match self.descr.physical_type() {
Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
return compare_greater_byte_array_decimals(
a.as_bytes(),
b.as_bytes(),
);
}
_ => {}
};
};

a > b
}
}
Expand Down Expand Up @@ -1049,6 +1084,54 @@ fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
}
}

/// Signed comparison of bytes arrays
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
let a_length = a.len();
let b_length = b.len();

if a_length == 0 || b_length == 0 {
return a_length > 0;
}

let first_a: u8 = a[0];
let first_b: u8 = b[0];

// We can short circuit for different signed numbers or
// for equal length bytes arrays that have different first bytes.
// The equality requirement is necessary for sign extension cases.
// 0xFF10 should be equal to 0x10 (due to big endian sign extension).
if (0x80 & first_a) != (0x80 & first_b)
|| (a_length == b_length && first_a != first_b)
{
return (first_a as i8) > (first_b as i8);
}

// When the lengths are unequal and the numbers are of the same
// sign we need to do comparison by sign extending the shorter
// value first, and once we get to equal sized arrays, lexicographical
// unsigned comparison of everything but the first byte is sufficient.

let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };

if a_length != b_length {
let not_equal = if a_length > b_length {
let lead_length = a_length - b_length;
(&a[0..lead_length]).iter().any(|&x| x != extension)
} else {
let lead_length = b_length - a_length;
(&b[0..lead_length]).iter().any(|&x| x != extension)
};

if not_equal {
let negative_values: bool = (first_a as i8) < 0;
let a_longer: bool = a_length > b_length;
return if negative_values { !a_longer } else { a_longer };
}
}

(a[1..]) > (b[1..])
}

#[cfg(test)]
mod tests {
use rand::distributions::uniform::SampleUniform;
Expand Down Expand Up @@ -1475,6 +1558,84 @@ mod tests {
}
}

#[test]
fn test_column_writer_check_byte_array_min_max() {
let page_writer = get_test_page_writer();
let props = Arc::new(WriterProperties::builder().build());
let mut writer =
get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
writer
.write_batch(
&[
ByteArray::from(vec![
255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8,
172u8, 19u8, 35u8, 231u8, 90u8, 0u8, 0u8,
]),
ByteArray::from(vec![
255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8,
62u8, 146u8, 152u8, 177u8, 56u8, 0u8, 0u8,
]),
ByteArray::from(vec![
0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
0u8, 0u8, 0u8,
]),
ByteArray::from(vec![
0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8,
246u8, 44u8, 0u8, 0u8,
]),
],
None,
None,
)
.unwrap();
let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
if let Some(stats) = metadata.statistics() {
assert!(stats.has_min_max_set());
if let Statistics::ByteArray(stats) = stats {
assert_eq!(
stats.min(),
&ByteArray::from(vec![
255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8,
172u8, 19u8, 35u8, 231u8, 90u8, 0u8, 0u8,
])
);
assert_eq!(
stats.max(),
&ByteArray::from(vec![
0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8,
246u8, 44u8, 0u8, 0u8,
])
);
} else {
panic!("expecting Statistics::ByteArray");
}
} else {
panic!("metadata missing statistics");
}
}

#[test]
fn test_column_writer_uint32_converted_type_min_max() {
let page_writer = get_test_page_writer();
let props = Arc::new(WriterProperties::builder().build());
let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<
Int32Type,
>(page_writer, 0, 0, props);
writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
let (_bytes_written, _rows_written, metadata) = writer.close().unwrap();
if let Some(stats) = metadata.statistics() {
assert!(stats.has_min_max_set());
if let Statistics::Int32(stats) = stats {
assert_eq!(stats.min(), &0,);
assert_eq!(stats.max(), &5,);
} else {
panic!("expecting Statistics::Int32");
}
} else {
panic!("metadata missing statistics");
}
}

#[test]
fn test_column_writer_precalculated_statistics() {
let page_writer = get_test_page_writer();
Expand Down Expand Up @@ -1887,6 +2048,28 @@ mod tests {
assert!(matches!(stats, Statistics::Double(_)));
}

#[test]
fn test_compare_greater_byte_array_decimals() {
assert!(!compare_greater_byte_array_decimals(&[], &[],),);
assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
assert!(!compare_greater_byte_array_decimals(
&[0u8, 1u8,],
&[1u8, 0u8,],
),);
assert!(!compare_greater_byte_array_decimals(
&[255u8, 35u8, 0u8, 0u8,],
&[0u8,],
),);
assert!(compare_greater_byte_array_decimals(
&[0u8,],
&[255u8, 35u8, 0u8, 0u8,],
),);
}

/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
Expand Down Expand Up @@ -2153,4 +2336,94 @@ mod tests {
panic!("metadata missing statistics");
}
}

/// Returns Decimals column writer.
fn get_test_decimals_column_writer<T: DataType>(
page_writer: Box<dyn PageWriter>,
max_def_level: i16,
max_rep_level: i16,
props: WriterPropertiesPtr,
) -> ColumnWriterImpl<T> {
let descr = Arc::new(get_test_decimals_column_descr::<T>(
max_def_level,
max_rep_level,
));
let column_writer = get_column_writer(descr, props, page_writer);
get_typed_column_writer::<T>(column_writer)
}

/// Returns decimals column reader.
fn get_test_decimals_column_reader<T: DataType>(
page_reader: Box<dyn PageReader>,
max_def_level: i16,
max_rep_level: i16,
) -> ColumnReaderImpl<T> {
let descr = Arc::new(get_test_decimals_column_descr::<T>(
max_def_level,
max_rep_level,
));
let column_reader = get_column_reader(descr, page_reader);
get_typed_column_reader::<T>(column_reader)
}

/// Returns descriptor for Decimal type with primitive column.
fn get_test_decimals_column_descr<T: DataType>(
max_def_level: i16,
max_rep_level: i16,
) -> ColumnDescriptor {
let path = ColumnPath::from("col");
let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
.with_length(16)
.with_logical_type(Some(LogicalType::Decimal {
scale: 2,
precision: 3,
}))
.with_scale(2)
.with_precision(3)
.build()
.unwrap();
ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
}

/// Returns column writer for UINT32 Column provided as ConvertedType only
fn get_test_unsigned_int_given_as_converted_column_writer<T: DataType>(
page_writer: Box<dyn PageWriter>,
max_def_level: i16,
max_rep_level: i16,
props: WriterPropertiesPtr,
) -> ColumnWriterImpl<T> {
let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
max_def_level,
max_rep_level,
));
let column_writer = get_column_writer(descr, props, page_writer);
get_typed_column_writer::<T>(column_writer)
}

/// Returns column reader for UINT32 Column provided as ConvertedType only
fn get_test_unsigned_int_given_as_converted_column_reader<T: DataType>(
page_reader: Box<dyn PageReader>,
max_def_level: i16,
max_rep_level: i16,
) -> ColumnReaderImpl<T> {
let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
max_def_level,
max_rep_level,
));
let column_reader = get_column_reader(descr, page_reader);
get_typed_column_reader::<T>(column_reader)
}

/// Returns column descriptor for UINT32 Column provided as ConvertedType only
fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
max_def_level: i16,
max_rep_level: i16,
) -> ColumnDescriptor {
let path = ColumnPath::from("col");
let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
.with_converted_type(ConvertedType::UINT_32)
.build()
.unwrap();
ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
}
}

0 comments on commit d8d6499

Please sign in to comment.