Commit
Optimize Parquet RowGroup pruning, update StatisticsExtractor API (ap…
alamb authored and findepi committed Jul 16, 2024
1 parent 28ea3a7 commit 3b81fce
Showing 8 changed files with 245 additions and 195 deletions.
20 changes: 8 additions & 12 deletions datafusion-examples/examples/parquet_index.rs
@@ -25,7 +25,7 @@ use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{
parquet::{RequestedStatistics, StatisticsConverter},
parquet::StatisticsConverter,
{FileScanConfig, ParquetExec},
};
use datafusion::datasource::TableProvider;
@@ -518,21 +518,17 @@ impl ParquetMetadataIndexBuilder {

// extract the parquet statistics from the file's footer
let metadata = reader.metadata();
let row_groups = metadata.row_groups();

// Extract the min/max values for each row group from the statistics
let row_counts = StatisticsConverter::row_counts(reader.metadata())?;
let value_column_mins = StatisticsConverter::try_new(
let converter = StatisticsConverter::try_new(
"value",
RequestedStatistics::Min,
reader.schema(),
)?
.extract(reader.metadata())?;
let value_column_maxes = StatisticsConverter::try_new(
"value",
RequestedStatistics::Max,
reader.schema(),
)?
.extract(reader.metadata())?;
reader.parquet_schema(),
)?;
let row_counts = StatisticsConverter::row_group_row_counts(row_groups.iter())?;
let value_column_mins = converter.row_group_mins(row_groups.iter())?;
let value_column_maxes = converter.row_group_maxes(row_groups.iter())?;

// In a real system you would have to handle nulls, which represent
// unknown statistics. All statistics are known in this example
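For reference, a minimal standalone sketch (not part of this commit) of the updated StatisticsConverter flow used above: the converter is built once per column from the Arrow schema and the Parquet SchemaDescriptor, then reused for every statistic, with each call returning one entry per row group. The file path and the "value" column name are illustrative, and error/return types are assumed to follow the hunks above.

use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
use std::fs::File;

fn extract_row_group_statistics() -> datafusion::error::Result<()> {
    // Hypothetical input file; any Parquet file with a "value" column works
    let file = File::open("data/example.parquet")?;
    let reader = ArrowReaderBuilder::try_new(file)?;
    let metadata = reader.metadata();
    let row_groups = metadata.row_groups();

    // New API: one converter per column, constructed from the Arrow schema and
    // the Parquet SchemaDescriptor (instead of RequestedStatistics + extract)
    let converter = StatisticsConverter::try_new(
        "value",
        reader.schema(),
        reader.parquet_schema(),
    )?;

    // Each call walks all row groups once and returns a single array
    let mins = converter.row_group_mins(row_groups.iter())?;
    let maxes = converter.row_group_maxes(row_groups.iter())?;
    let null_counts = converter.row_group_null_counts(row_groups.iter())?;
    let row_counts = StatisticsConverter::row_group_row_counts(row_groups.iter())?;

    println!("mins={mins:?} maxes={maxes:?} nulls={null_counts:?} rows={row_counts:?}");
    Ok(())
}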
35 changes: 9 additions & 26 deletions datafusion/core/benches/parquet_statistic.rs
@@ -24,9 +24,7 @@ use arrow_schema::{
Field, Schema,
};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use datafusion::datasource::physical_plan::parquet::{
RequestedStatistics, StatisticsConverter,
};
use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
use parquet::arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter};
use parquet::file::properties::WriterProperties;
use std::sync::Arc;
@@ -159,41 +157,26 @@ fn criterion_benchmark(c: &mut Criterion) {
let file = file.reopen().unwrap();
let reader = ArrowReaderBuilder::try_new(file).unwrap();
let metadata = reader.metadata();
let row_groups = metadata.row_groups();

let mut group =
c.benchmark_group(format!("Extract statistics for {}", dtype.clone()));
group.bench_function(
BenchmarkId::new("extract_statistics", dtype.clone()),
|b| {
b.iter(|| {
let _ = StatisticsConverter::try_new(
"col",
RequestedStatistics::Min,
reader.schema(),
)
.unwrap()
.extract(metadata)
.unwrap();

let _ = StatisticsConverter::try_new(
"col",
RequestedStatistics::Max,
reader.schema(),
)
.unwrap()
.extract(reader.metadata())
.unwrap();

let _ = StatisticsConverter::try_new(
let converter = StatisticsConverter::try_new(
"col",
RequestedStatistics::NullCount,
reader.schema(),
reader.parquet_schema(),
)
.unwrap()
.extract(reader.metadata())
.unwrap();

let _ = StatisticsConverter::row_counts(reader.metadata()).unwrap();
let _ = converter.row_group_mins(row_groups.iter()).unwrap();
let _ = converter.row_group_maxes(row_groups.iter()).unwrap();
let _ = converter.row_group_null_counts(row_groups.iter()).unwrap();
let _ = StatisticsConverter::row_group_row_counts(row_groups.iter())
.unwrap();
})
},
);
datafusion/core/src/datasource/physical_plan/parquet/metrics.rs
@@ -29,11 +29,11 @@ use crate::physical_plan::metrics::{
pub struct ParquetFileMetrics {
/// Number of times the predicate could not be evaluated
pub predicate_evaluation_errors: Count,
/// Number of row groups whose bloom filters were checked and matched
/// Number of row groups whose bloom filters were checked and matched (not pruned)
pub row_groups_matched_bloom_filter: Count,
/// Number of row groups pruned by bloom filters
pub row_groups_pruned_bloom_filter: Count,
/// Number of row groups whose statistics were checked and matched
/// Number of row groups whose statistics were checked and matched (not pruned)
pub row_groups_matched_statistics: Count,
/// Number of row groups pruned by statistics
pub row_groups_pruned_statistics: Count,
datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -64,7 +64,7 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use metrics::ParquetFileMetrics;
use opener::ParquetOpener;
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
pub use statistics::{RequestedStatistics, StatisticsConverter};
pub use statistics::StatisticsConverter;
pub use writer::plan_to_parquet;

/// Execution plan for reading one or more Parquet files.
109 changes: 59 additions & 50 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -17,26 +17,23 @@

use arrow::{array::ArrayRef, datatypes::Schema};
use arrow_array::BooleanArray;
use arrow_schema::FieldRef;
use datafusion_common::{Column, ScalarValue};
use datafusion_common::{Column, Result, ScalarValue};
use parquet::basic::Type;
use parquet::data_type::Decimal;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::SchemaDescriptor;
use parquet::{
arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
bloom_filter::Sbbf,
file::metadata::RowGroupMetaData,
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use crate::datasource::listing::FileRange;
use crate::datasource::physical_plan::parquet::statistics::{
max_statistics, min_statistics, parquet_column,
};
use crate::datasource::physical_plan::parquet::statistics::parquet_column;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use super::{ParquetAccessPlan, ParquetFileMetrics};
use super::{ParquetAccessPlan, ParquetFileMetrics, StatisticsConverter};

/// Reduces the [`ParquetAccessPlan`] based on row group level metadata.
///
@@ -113,32 +110,37 @@ impl RowGroupAccessPlanFilter {
metrics: &ParquetFileMetrics,
) {
assert_eq!(groups.len(), self.access_plan.len());
for (idx, metadata) in groups.iter().enumerate() {
if !self.access_plan.should_scan(idx) {
continue;
}
let pruning_stats = RowGroupPruningStatistics {
parquet_schema,
row_group_metadata: metadata,
arrow_schema,
};
match predicate.prune(&pruning_stats) {
Ok(values) => {
// NB: false means don't scan row group
if !values[0] {
// Indexes of row groups still to scan
let row_group_indexes = self.access_plan.row_group_indexes();
let row_group_metadatas = row_group_indexes
.iter()
.map(|&i| &groups[i])
.collect::<Vec<_>>();

let pruning_stats = RowGroupPruningStatistics {
parquet_schema,
row_group_metadatas,
arrow_schema,
};

// try to prune the row groups in a single call
match predicate.prune(&pruning_stats) {
Ok(values) => {
// values[i] is false means the predicate could not be true for row group i
for (idx, &value) in row_group_indexes.iter().zip(values.iter()) {
if !value {
self.access_plan.skip(*idx);
metrics.row_groups_pruned_statistics.add(1);
self.access_plan.skip(idx);
continue;
} else {
metrics.row_groups_matched_statistics.add(1);
}
}
// stats filter array could not be built
// don't prune this row group
Err(e) => {
log::debug!("Error evaluating row group predicate values {e}");
metrics.predicate_evaluation_errors.add(1);
}
}
metrics.row_groups_matched_statistics.add(1);
// stats filter array could not be built, so we can't prune
Err(e) => {
log::debug!("Error evaluating row group predicate values {e}");
metrics.predicate_evaluation_errors.add(1);
}
}
}
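To make the control flow above concrete, a small standalone illustration (hypothetical names, not DataFusion code) of how the single prune() result maps back onto the access plan: prune() returns one boolean per row group that was still scheduled for scanning, and a false entry means that row group can be skipped.

/// Hypothetical helper mirroring the loop above: `row_group_indexes` lists the
/// row groups still to be scanned, `values[i]` is the prune() verdict for the
/// i-th of those row groups, and the returned vector holds the indexes to skip.
fn indexes_to_skip(row_group_indexes: &[usize], values: &[bool]) -> Vec<usize> {
    let mut skipped = Vec::new();
    for (idx, keep) in row_group_indexes.iter().zip(values.iter()) {
        // false: the predicate can never be true for this row group
        if !*keep {
            skipped.push(*idx);
        }
    }
    skipped
}

fn main() {
    // Row groups 0, 2 and 3 were still scheduled; the predicate rules out 2.
    assert_eq!(indexes_to_skip(&[0, 2, 3], &[true, false, true]), vec![2]);
    println!("skip decisions map back to the expected row group indexes");
}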

@@ -337,49 +339,55 @@ impl PruningStatistics for BloomFilterStatistics {
}
}

/// Wraps [`RowGroupMetaData`] in a way that implements [`PruningStatistics`]
///
/// Note: This should be implemented for an array of [`RowGroupMetaData`] instead
/// of per row-group
/// Wraps a slice of [`RowGroupMetaData`] in a way that implements [`PruningStatistics`]
struct RowGroupPruningStatistics<'a> {
parquet_schema: &'a SchemaDescriptor,
row_group_metadata: &'a RowGroupMetaData,
row_group_metadatas: Vec<&'a RowGroupMetaData>,
arrow_schema: &'a Schema,
}

impl<'a> RowGroupPruningStatistics<'a> {
/// Lookups up the parquet column by name
fn column(&self, name: &str) -> Option<(&ColumnChunkMetaData, &FieldRef)> {
let (idx, field) = parquet_column(self.parquet_schema, self.arrow_schema, name)?;
Some((self.row_group_metadata.column(idx), field))
/// Return an iterator over the row group metadata
fn metadata_iter(&'a self) -> impl Iterator<Item = &'a RowGroupMetaData> + 'a {
self.row_group_metadatas.iter().copied()
}

fn statistics_converter<'b>(
&'a self,
column: &'b Column,
) -> Result<StatisticsConverter<'a>> {
StatisticsConverter::try_new(&column.name, self.arrow_schema, self.parquet_schema)
}
}

impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let (column, field) = self.column(&column.name)?;
min_statistics(field.data_type(), std::iter::once(column.statistics())).ok()
self.statistics_converter(column)
.and_then(|c| c.row_group_mins(self.metadata_iter()))
.ok()
}

fn max_values(&self, column: &Column) -> Option<ArrayRef> {
let (column, field) = self.column(&column.name)?;
max_statistics(field.data_type(), std::iter::once(column.statistics())).ok()
self.statistics_converter(column)
.and_then(|c| c.row_group_maxes(self.metadata_iter()))
.ok()
}

fn num_containers(&self) -> usize {
1
self.row_group_metadatas.len()
}

fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let (c, _) = self.column(&column.name)?;
let scalar = ScalarValue::UInt64(Some(c.statistics()?.null_count()));
scalar.to_array().ok()
self.statistics_converter(column)
.and_then(|c| c.row_group_null_counts(self.metadata_iter()))
.ok()
}

fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
let (c, _) = self.column(&column.name)?;
let scalar = ScalarValue::UInt64(Some(c.num_values() as u64));
scalar.to_array().ok()
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
// row counts are the same for all columns in a row group
StatisticsConverter::row_group_row_counts(self.metadata_iter())
.ok()
.map(|counts| Arc::new(counts) as ArrayRef)
}

fn contained(
@@ -406,6 +414,7 @@ mod tests {
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::basic::LogicalType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::{
basic::Type as PhysicalType, file::statistics::Statistics as ParquetStatistics,
schema::types::SchemaDescPtr,
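And a small standalone illustration (assumed shapes, not DataFusion code) of what the vectorized RowGroupPruningStatistics now supplies to PruningPredicate::prune: every accessor yields one entry per candidate row group, with nulls standing in for unknown statistics, and num_containers() must agree with those array lengths.

use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int64Array};

fn main() {
    // Hypothetical min/max statistics for one column across three row groups;
    // the middle row group wrote no statistics, so its entries are null.
    let mins: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), None, Some(100)]));
    let maxes: ArrayRef = Arc::new(Int64Array::from(vec![Some(50), None, Some(200)]));

    // One container per row group: array lengths and num_containers() must match.
    let num_containers = 3;
    assert_eq!(mins.len(), num_containers);
    assert_eq!(maxes.len(), num_containers);

    // A predicate such as `value > 60` could prune row group 0 (max = 50) but
    // must keep row group 1 (unknown statistics) and row group 2.
    println!("mins = {mins:?}, maxes = {maxes:?}");
}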
