Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it clear that StatisticsConverter can not panic #6187

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 45 additions & 57 deletions parquet/src/arrow/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
paste! {
match $data_type {
Some(DataType::Boolean) => {
DataType::Boolean => {
let iterator = [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator);
let mut builder = BooleanBuilder::new();
for x in iterator {
Expand All @@ -770,7 +770,7 @@ macro_rules! get_data_page_statistics {
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::UInt8) => Ok(Arc::new(
DataType::UInt8 => Ok(Arc::new(
UInt8Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
Expand All @@ -781,7 +781,7 @@ macro_rules! get_data_page_statistics {
.flatten()
)
)),
Some(DataType::UInt16) => Ok(Arc::new(
DataType::UInt16 => Ok(Arc::new(
UInt16Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
Expand All @@ -792,7 +792,7 @@ macro_rules! get_data_page_statistics {
.flatten()
)
)),
Some(DataType::UInt32) => Ok(Arc::new(
DataType::UInt32 => Ok(Arc::new(
UInt32Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
Expand All @@ -802,7 +802,7 @@ macro_rules! get_data_page_statistics {
})
.flatten()
))),
Some(DataType::UInt64) => Ok(Arc::new(
DataType::UInt64 => Ok(Arc::new(
UInt64Array::from_iter(
[<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator)
.map(|x| {
Expand All @@ -812,7 +812,7 @@ macro_rules! get_data_page_statistics {
})
.flatten()
))),
Some(DataType::Int8) => Ok(Arc::new(
DataType::Int8 => Ok(Arc::new(
Int8Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
Expand All @@ -823,7 +823,7 @@ macro_rules! get_data_page_statistics {
.flatten()
)
)),
Some(DataType::Int16) => Ok(Arc::new(
DataType::Int16 => Ok(Arc::new(
Int16Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
Expand All @@ -834,9 +834,9 @@ macro_rules! get_data_page_statistics {
.flatten()
)
)),
Some(DataType::Int32) => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Float16) => Ok(Arc::new(
DataType::Int32 => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
DataType::Int64 => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
DataType::Float16 => Ok(Arc::new(
Float16Array::from_iter(
[<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator)
.map(|x| {
Expand All @@ -847,11 +847,11 @@ macro_rules! get_data_page_statistics {
.flatten()
)
)),
Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Utf8) => {
DataType::Float32 => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))),
DataType::Float64 => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
DataType::Binary => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
DataType::Utf8 => {
let mut builder = StringBuilder::new();
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
Expand All @@ -871,7 +871,7 @@ macro_rules! get_data_page_statistics {
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::LargeUtf8) => {
DataType::LargeUtf8 => {
let mut builder = LargeStringBuilder::new();
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
Expand All @@ -891,10 +891,10 @@ macro_rules! get_data_page_statistics {
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::Dictionary(_, value_type)) => {
[<$stat_type_prefix:lower _ page_statistics>](Some(value_type), $iterator)
DataType::Dictionary(_, value_type) => {
[<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator)
},
Some(DataType::Timestamp(unit, timezone)) => {
DataType::Timestamp(unit, timezone) => {
let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten();
Ok(match unit {
TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
Expand All @@ -903,8 +903,8 @@ macro_rules! get_data_page_statistics {
TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
})
},
Some(DataType::Date32) => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Date64) => Ok(
DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
DataType::Date64 => Ok(
Arc::new(
Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
Expand All @@ -917,11 +917,11 @@ macro_rules! get_data_page_statistics {
)
)
),
Some(DataType::Decimal128(precision, scale)) => Ok(Arc::new(
DataType::Decimal128(precision, scale) => Ok(Arc::new(
Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
Some(DataType::Decimal256(precision, scale)) => Ok(Arc::new(
DataType::Decimal256(precision, scale) => Ok(Arc::new(
Decimal256Array::from_iter([<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
Some(DataType::Time32(unit)) => {
DataType::Time32(unit) => {
Ok(match unit {
TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(),
Expand All @@ -935,7 +935,7 @@ macro_rules! get_data_page_statistics {
}
})
}
Some(DataType::Time64(unit)) => {
DataType::Time64(unit) => {
Ok(match unit {
TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(
[<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(),
Expand All @@ -949,7 +949,7 @@ macro_rules! get_data_page_statistics {
}
})
},
Some(DataType::FixedSizeBinary(size)) => {
DataType::FixedSizeBinary(size) => {
let mut builder = FixedSizeBinaryBuilder::new(*size);
let iterator = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
Expand All @@ -962,18 +962,13 @@ macro_rules! get_data_page_statistics {
if x.len() == *size as usize {
let _ = builder.append_value(x.data());
} else {
// log::debug!(
// "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
// size,
// x.len(),
// );
builder.append_null();
}
}
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::Utf8View) => {
DataType::Utf8View => {
let mut builder = StringViewBuilder::new();
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
Expand All @@ -993,7 +988,7 @@ macro_rules! get_data_page_statistics {
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::BinaryView) => {
DataType::BinaryView => {
let mut builder = BinaryViewBuilder::new();
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
Expand All @@ -1008,23 +1003,22 @@ macro_rules! get_data_page_statistics {
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::Null) |
Some(DataType::Duration(_)) |
Some(DataType::Interval(_)) |
Some(DataType::List(_)) |
Some(DataType::ListView(_)) |
Some(DataType::FixedSizeList(_, _)) |
Some(DataType::LargeList(_)) |
Some(DataType::LargeListView(_)) |
Some(DataType::Struct(_)) |
Some(DataType::Union(_, _)) |
Some(DataType::Map(_, _)) |
Some(DataType::RunEndEncoded(_, _)) => {
DataType::Null |
DataType::Duration(_) |
DataType::Interval(_) |
DataType::List(_) |
DataType::ListView(_) |
DataType::FixedSizeList(_, _) |
DataType::LargeList(_) |
DataType::LargeListView(_) |
DataType::Struct(_) |
DataType::Union(_, _) |
DataType::Map(_, _) |
DataType::RunEndEncoded(_, _) => {
let len = $iterator.count();
// don't know how to extract statistics, so return a null array
Ok(new_null_array($data_type.unwrap(), len))
Ok(new_null_array($data_type, len))
},
None => unimplemented!() // not sure how to handle this
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of this PR is to remove this line

}
}
}
Expand Down Expand Up @@ -1052,10 +1046,7 @@ fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(

/// Extracts the min statistics from an iterator
/// of parquet page [`Index`]'es to an [`ArrayRef`]
pub(crate) fn min_page_statistics<'a, I>(
data_type: Option<&DataType>,
iterator: I,
) -> Result<ArrayRef>
pub(crate) fn min_page_statistics<'a, I>(data_type: &DataType, iterator: I) -> Result<ArrayRef>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a drive-by observation that this will generate a fair amount of code for every iterator, it might be worth just paying the cost of just forcing &[ParquetStatistics] or similar

Copy link
Contributor Author

@alamb alamb Aug 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right that &[ParquetStatistics] is likely to be the most commonly used iterator, and this generic formulation may be simply over engineering

I harbor some idea that we will be able to use the iterator API to quickly extract statistics for many files at once into a single large array (e.g. by reading the data using ParquetMetaDataReader) so we can prune across 100s of files very fast.

However, I will fully admit that no such code exists yet (either in our InfluxDB code or as an example) 🤔

where
I: Iterator<Item = (usize, &'a Index)>,
{
Expand All @@ -1064,10 +1055,7 @@ where

/// Extracts the max statistics from an iterator
/// of parquet page [`Index`]'es to an [`ArrayRef`]
pub(crate) fn max_page_statistics<'a, I>(
data_type: Option<&DataType>,
iterator: I,
) -> Result<ArrayRef>
pub(crate) fn max_page_statistics<'a, I>(data_type: &DataType, iterator: I) -> Result<ArrayRef>
where
I: Iterator<Item = (usize, &'a Index)>,
{
Expand Down Expand Up @@ -1437,7 +1425,7 @@ impl<'a> StatisticsConverter<'a> {
(*num_data_pages, column_page_index_per_row_group_per_column)
});

min_page_statistics(Some(data_type), iter)
min_page_statistics(data_type, iter)
}

/// Extract the maximum values from Data Page statistics.
Expand Down Expand Up @@ -1468,7 +1456,7 @@ impl<'a> StatisticsConverter<'a> {
(*num_data_pages, column_page_index_per_row_group_per_column)
});

max_page_statistics(Some(data_type), iter)
max_page_statistics(data_type, iter)
}

/// Returns a [`UInt64Array`] with null counts for each data page.
Expand Down
Loading