Skip to content

Commit

Permalink
Fix: StatisticsConverter counts for missing columns (#10946)
Browse files Browse the repository at this point in the history
* feat: add run_with_schema + add test_case

* fix: null_counts

* fix: row_counts

* refactor: change return type of data_page_row_counts

* refactor: shorten row_group_indices
  • Loading branch information
marvinlanhenke authored Jun 17, 2024
1 parent e1cfb48 commit 1cb0057
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
self.statistics_converter(column)
.and_then(|c| c.row_group_null_counts(self.metadata_iter()))
.ok()
.map(|counts| Arc::new(counts) as ArrayRef)
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
Expand Down
43 changes: 21 additions & 22 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ where
/// of parquet page [`Index`]es to an [`ArrayRef`]
///
/// The returned Array is a [`UInt64Array`]
pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<ArrayRef>
pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
where
I: Iterator<Item = (usize, &'a Index)>,
{
Expand All @@ -680,7 +680,7 @@ where
_ => unimplemented!(),
});

Ok(Arc::new(UInt64Array::from_iter(iter)))
Ok(UInt64Array::from_iter(iter))
}

/// Extracts Parquet statistics as Arrow arrays
Expand Down Expand Up @@ -874,21 +874,22 @@ impl<'a> StatisticsConverter<'a> {
/// Extract the null counts from row group statistics in [`RowGroupMetaData`]
///
/// See docs on [`Self::row_group_mins`] for details
pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<ArrayRef>
pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, metadatas));
let num_row_groups = metadatas.into_iter().count();
return Ok(UInt64Array::from_iter(
std::iter::repeat(None).take(num_row_groups),
));
};

let null_counts = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics())
.map(|s| s.map(|s| s.null_count()));
Ok(Arc::new(UInt64Array::from_iter(null_counts)))
Ok(UInt64Array::from_iter(null_counts))
}

/// Extract the minimum values from Data Page statistics.
Expand Down Expand Up @@ -1007,14 +1008,15 @@ impl<'a> StatisticsConverter<'a> {
column_page_index: &ParquetColumnIndex,
column_offset_index: &ParquetOffsetIndex,
row_group_indices: I,
) -> Result<ArrayRef>
) -> Result<UInt64Array>
where
I: IntoIterator<Item = &'a usize>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, row_group_indices));
let num_row_groups = row_group_indices.into_iter().count();
return Ok(UInt64Array::from_iter(
std::iter::repeat(None).take(num_row_groups),
));
};

let iter = row_group_indices.into_iter().map(|rg_index| {
Expand Down Expand Up @@ -1047,21 +1049,19 @@ impl<'a> StatisticsConverter<'a> {
pub fn data_page_row_counts<I>(
&self,
column_offset_index: &ParquetOffsetIndex,
row_group_metadatas: &[RowGroupMetaData],
row_group_metadatas: &'a [RowGroupMetaData],
row_group_indices: I,
) -> Result<ArrayRef>
) -> Result<Option<UInt64Array>>
where
I: IntoIterator<Item = &'a usize>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, row_group_indices));
// no matching column found in parquet_index;
// thus we cannot extract page_locations in order to determine
// the row count on a per DataPage basis.
return Ok(None);
};

// `offset_index[row_group_number][column_number][page_number]` holds
// the [`PageLocation`] corresponding to page `page_number` of column
// `column_number` of row group `row_group_number`.
let mut row_count_total = Vec::new();
for rg_idx in row_group_indices {
let page_locations = &column_offset_index[*rg_idx][parquet_index];
Expand All @@ -1070,9 +1070,8 @@ impl<'a> StatisticsConverter<'a> {
Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64)
});

let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();

// append the last page row count
let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();
let row_count_per_page = row_count_per_page
.chain(std::iter::once(Some(
*num_rows_in_row_group as u64
Expand All @@ -1083,7 +1082,7 @@ impl<'a> StatisticsConverter<'a> {
row_count_total.extend(row_count_per_page);
}

Ok(Arc::new(UInt64Array::from_iter(row_count_total)))
Ok(Some(UInt64Array::from_iter(row_count_total)))
}

/// Returns a null array of data_type with one element per row group
Expand Down
Loading

0 comments on commit 1cb0057

Please sign in to comment.