From a9444d83aed6afd33575ccebde5f0fbf5ba71e74 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 May 2024 16:07:36 -0400 Subject: [PATCH 1/4] Refactor parquet row group pruning into a struct --- datafusion/core/src/datasource/listing/mod.rs | 7 + .../datasource/physical_plan/parquet/mod.rs | 53 ++-- .../physical_plan/parquet/page_filter.rs | 13 +- .../physical_plan/parquet/row_groups.rs | 235 +++++++++++------- 4 files changed, 185 insertions(+), 123 deletions(-) diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index d0361d7b32c1..04aec9d77d58 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -48,6 +48,13 @@ pub struct FileRange { pub end: i64, } +impl FileRange { + /// returns true if this file range contains the specified offset + pub fn contains(&self, offset: i64) -> bool { + offset >= self.start && offset < self.end + } +} + #[derive(Debug, Clone)] /// A single file or part of a file that should be read, along with its schema, statistics /// and partition column values that need to be appended to each row. diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index dd953878df49..902a284c230e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -70,6 +70,7 @@ mod row_groups; mod schema_adapter; mod statistics; +use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet; pub use metrics::ParquetFileMetrics; pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; pub use statistics::{RequestedStatistics, StatisticsConverter}; @@ -556,32 +557,36 @@ impl FileOpener for ParquetOpener { }; }; - // Row group pruning by statistics: attempt to skip entire row_groups - // using metadata on the row groups + // Determine which row groups to actually read. The idea is to skip + // as many row groups as possible based on the metadata and query let file_metadata = builder.metadata().clone(); let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); - let mut row_groups = row_groups::prune_row_groups_by_statistics( - &file_schema, - builder.parquet_schema(), - file_metadata.row_groups(), - file_range, - predicate, - &file_metrics, - ); + let rg_metadata = file_metadata.row_groups(); + // track which row groups to actually read + let mut row_groups = RowGroupSet::new(rg_metadata.len()); + // if there is a range restricting what parts of the file to read + if let Some(range) = file_range.as_ref() { + row_groups.prune_by_range(rg_metadata, range); + } + // If there is a predicate that can be evaluated against the metadata + if let Some(predicate) = predicate.as_ref() { + row_groups.prune_by_statistics( + &file_schema, + builder.parquet_schema(), + rg_metadata, + predicate, + &file_metrics, + ); - // Bloom filter pruning: if bloom filters are enabled and then attempt to skip entire row_groups - // using bloom filters on the row groups - if enable_bloom_filter && !row_groups.is_empty() { - if let Some(predicate) = predicate { - row_groups = row_groups::prune_row_groups_by_bloom_filters( - &file_schema, - &mut builder, - &row_groups, - file_metadata.row_groups(), - predicate, - &file_metrics, - ) - .await; + if enable_bloom_filter && !row_groups.is_empty() { + row_groups + .prune_by_bloom_filters( + &file_schema, + &mut builder, + predicate, + &file_metrics, + ) + .await; } } @@ -610,7 +615,7 @@ impl FileOpener for ParquetOpener { let stream = builder .with_projection(mask) .with_batch_size(batch_size) - .with_row_groups(row_groups) + .with_row_groups(row_groups.indexes()) .build()?; let adapted = stream diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index 402cc106492e..13e9bfb2a63d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -42,6 +42,7 @@ use std::collections::HashSet; use std::sync::Arc; use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type; +use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet; use crate::datasource::physical_plan::parquet::statistics::{ from_bytes_to_i128, parquet_column, }; @@ -133,7 +134,7 @@ impl PagePruningPredicate { &self, arrow_schema: &Schema, parquet_schema: &SchemaDescriptor, - row_groups: &[usize], + row_groups: &RowGroupSet, file_metadata: &ParquetMetaData, file_metrics: &ParquetFileMetrics, ) -> Result> { @@ -172,10 +173,10 @@ impl PagePruningPredicate { let col_idx = find_column_index(predicate, arrow_schema, parquet_schema); let mut selectors = Vec::with_capacity(row_groups.len()); for r in row_groups.iter() { - let row_group_metadata = &groups[*r]; + let row_group_metadata = &groups[r]; - let rg_offset_indexes = file_offset_indexes.get(*r); - let rg_page_indexes = file_page_indexes.get(*r); + let rg_offset_indexes = file_offset_indexes.get(r); + let rg_page_indexes = file_page_indexes.get(r); if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) = (rg_page_indexes, rg_offset_indexes, col_idx) { @@ -185,7 +186,7 @@ impl PagePruningPredicate { predicate, rg_offset_indexes.get(col_idx), rg_page_indexes.get(col_idx), - groups[*r].column(col_idx).column_descr(), + groups[r].column(col_idx).column_descr(), file_metrics, ) .map_err(|e| { @@ -201,7 +202,7 @@ impl PagePruningPredicate { ); // fallback select all rows let all_selected = - vec![RowSelector::select(groups[*r].num_rows() as usize)]; + vec![RowSelector::select(groups[r].num_rows() as usize)]; selectors.push(all_selected); } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index bcd9e1fa4479..d9f868b9bebb 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -38,42 +38,98 @@ use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::ParquetFileMetrics; -/// Prune row groups based on statistics +/// Tracks which RowGroupsw within a parquet file should be scanned. /// -/// Returns a vector of indexes into `groups` which should be scanned. -/// -/// If an index is NOT present in the returned Vec it means the -/// predicate filtered all the row group. -/// -/// If an index IS present in the returned Vec it means the predicate -/// did not filter out that row group. -/// -/// Note: This method currently ignores ColumnOrder -/// -pub(crate) fn prune_row_groups_by_statistics( - arrow_schema: &Schema, - parquet_schema: &SchemaDescriptor, - groups: &[RowGroupMetaData], - range: Option, - predicate: Option<&PruningPredicate>, - metrics: &ParquetFileMetrics, -) -> Vec { - let mut filtered = Vec::with_capacity(groups.len()); - for (idx, metadata) in groups.iter().enumerate() { - if let Some(range) = &range { - // figure out where the first dictionary page (or first data page are) +/// This struct encapsulates the various types of pruning that can be applied to +/// a set of row groups within a parquet file. +#[derive(Debug)] +pub(crate) struct RowGroupSet { + /// row_groups[i] is true if the i-th row group should be scanned + row_groups: Vec, +} + +impl RowGroupSet { + /// Create a new RowGroupSet with all row groups set to true (will be scanned) + pub fn new(num_row_groups: usize) -> Self { + Self { + row_groups: vec![true; num_row_groups], + } + } + + /// Set the i-th row group to false (should not be scanned) + pub fn do_not_scan(&mut self, idx: usize) { + self.row_groups[idx] = false; + } + + /// return true if the i-th row group should be scanned + fn should_scan(&self, idx: usize) -> bool { + self.row_groups[idx] + } + + pub fn len(&self) -> usize { + self.row_groups.len() + } + + pub fn is_empty(&self) -> bool { + self.row_groups.is_empty() + } + + /// Return an iterator over the row group indexes that should be scanned + pub fn iter(&self) -> impl Iterator + '_ { + self.row_groups + .iter() + .enumerate() + .filter_map(|(idx, &b)| if b { Some(idx) } else { None }) + } + + /// Return a vector with the row group indexes that should be scanned + pub fn indexes(&self) -> Vec { + self.iter().collect() + } + + /// Prune remaining row groups so that only those row groups within the + /// specified range are scanned. + /// + /// Updates this set to mark row groups that should not be scanned + pub fn prune_by_range( + &mut self, + groups: &[RowGroupMetaData], + range: &FileRange, + ) { + for (idx, metadata) in groups.iter().enumerate() { + // Skip the row group if the first dictionary/data page are not + // within the range. + // // note don't use the location of metadata // let col = metadata.column(0); let offset = col .dictionary_page_offset() .unwrap_or_else(|| col.data_page_offset()); - if offset < range.start || offset >= range.end { - continue; + if !range.contains(offset) { + self.do_not_scan(idx); } } - - if let Some(predicate) = predicate { + } + /// Prune remaining row groups based using min/max/null_count statistics and + /// the [`PruningPredicate`]. + /// + /// Updates this set to mark row groups that should not be scanned + /// + /// Note: This method currently ignores ColumnOrder + /// + pub fn prune_by_statistics( + &mut self, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, + groups: &[RowGroupMetaData], + predicate: &PruningPredicate, + metrics: &ParquetFileMetrics, + ) { + for (idx, metadata) in groups.iter().enumerate() { + if !self.should_scan(idx) { + continue; + } let pruning_stats = RowGroupPruningStatistics { parquet_schema, row_group_metadata: metadata, @@ -84,6 +140,7 @@ pub(crate) fn prune_row_groups_by_statistics( // NB: false means don't scan row group if !values[0] { metrics.row_groups_pruned_statistics.add(1); + self.do_not_scan(idx); continue; } } @@ -96,86 +153,77 @@ pub(crate) fn prune_row_groups_by_statistics( } metrics.row_groups_matched_statistics.add(1); } - - filtered.push(idx) } - filtered -} -/// Prune row groups by bloom filters -/// -/// Returns a vector of indexes into `groups` which should be scanned. -/// -/// If an index is NOT present in the returned Vec it means the -/// predicate filtered all the row group. -/// -/// If an index IS present in the returned Vec it means the predicate -/// did not filter out that row group. -pub(crate) async fn prune_row_groups_by_bloom_filters< - T: AsyncFileReader + Send + 'static, ->( - arrow_schema: &Schema, - builder: &mut ParquetRecordBatchStreamBuilder, - row_groups: &[usize], - groups: &[RowGroupMetaData], - predicate: &PruningPredicate, - metrics: &ParquetFileMetrics, -) -> Vec { - let mut filtered = Vec::with_capacity(groups.len()); - for idx in row_groups { - // get all columns in the predicate that we could use a bloom filter with - let literal_columns = predicate.literal_columns(); - let mut column_sbbf = HashMap::with_capacity(literal_columns.len()); - - for column_name in literal_columns { - let Some((column_idx, _field)) = - parquet_column(builder.parquet_schema(), arrow_schema, &column_name) - else { + /// Prune remaining row groups using any available bloom filters and the + /// [`PruningPredicate`] + /// + /// Updates this set with row groups that should not be scanned + pub async fn prune_by_bloom_filters( + &mut self, + arrow_schema: &Schema, + builder: &mut ParquetRecordBatchStreamBuilder, + predicate: &PruningPredicate, + metrics: &ParquetFileMetrics, + ) { + for idx in 0..self.len() { + // already filtered out + if !self.should_scan(idx) { continue; - }; + } - let bf = match builder - .get_row_group_column_bloom_filter(*idx, column_idx) - .await - { - Ok(Some(bf)) => bf, - Ok(None) => continue, // no bloom filter for this column - Err(e) => { - log::debug!("Ignoring error reading bloom filter: {e}"); - metrics.predicate_evaluation_errors.add(1); + // Attempt to find bloom filters for filtering this row group + let literal_columns = predicate.literal_columns(); + let mut column_sbbf = HashMap::with_capacity(literal_columns.len()); + + for column_name in literal_columns { + let Some((column_idx, _field)) = + parquet_column(builder.parquet_schema(), arrow_schema, &column_name) + else { continue; - } - }; - let physical_type = - builder.parquet_schema().column(column_idx).physical_type(); + }; + + let bf = match builder + .get_row_group_column_bloom_filter(idx, column_idx) + .await + { + Ok(Some(bf)) => bf, + Ok(None) => continue, // no bloom filter for this column + Err(e) => { + log::debug!("Ignoring error reading bloom filter: {e}"); + metrics.predicate_evaluation_errors.add(1); + continue; + } + }; + let physical_type = + builder.parquet_schema().column(column_idx).physical_type(); - column_sbbf.insert(column_name.to_string(), (bf, physical_type)); - } + column_sbbf.insert(column_name.to_string(), (bf, physical_type)); + } - let stats = BloomFilterStatistics { column_sbbf }; + let stats = BloomFilterStatistics { column_sbbf }; - // Can this group be pruned? - let prune_group = match predicate.prune(&stats) { - Ok(values) => !values[0], - Err(e) => { - log::debug!("Error evaluating row group predicate on bloom filter: {e}"); - metrics.predicate_evaluation_errors.add(1); - false - } - }; + // Can this group be pruned? + let prune_group = match predicate.prune(&stats) { + Ok(values) => !values[0], + Err(e) => { + log::debug!( + "Error evaluating row group predicate on bloom filter: {e}" + ); + metrics.predicate_evaluation_errors.add(1); + false + } + }; - if prune_group { - metrics.row_groups_pruned_bloom_filter.add(1); - } else { - if !stats.column_sbbf.is_empty() { + if prune_group { + metrics.row_groups_pruned_bloom_filter.add(1); + self.do_not_scan(idx) + } else if !stats.column_sbbf.is_empty() { metrics.row_groups_matched_bloom_filter.add(1); } - filtered.push(*idx); } } - filtered } - /// Implements `PruningStatistics` for Parquet Split Block Bloom Filters (SBBF) struct BloomFilterStatistics { /// Maps column name to the parquet bloom filter and parquet physical type @@ -1297,6 +1345,7 @@ mod tests { } } + /// Evaluates the pruning predicate on the specified row groups and returns the row groups that are left async fn test_row_group_bloom_filter_pruning_predicate( file_name: &str, data: bytes::Bytes, From f6f958681bedd2a86b93947f17202a94589742d0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 21 May 2024 08:49:12 -0400 Subject: [PATCH 2/4] Port tests --- .../physical_plan/parquet/row_groups.rs | 341 +++++++++--------- 1 file changed, 179 insertions(+), 162 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index d9f868b9bebb..de32bb210b10 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -42,7 +42,7 @@ use super::ParquetFileMetrics; /// /// This struct encapsulates the various types of pruning that can be applied to /// a set of row groups within a parquet file. -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub(crate) struct RowGroupSet { /// row_groups[i] is true if the i-th row group should be scanned row_groups: Vec, @@ -91,11 +91,7 @@ impl RowGroupSet { /// specified range are scanned. /// /// Updates this set to mark row groups that should not be scanned - pub fn prune_by_range( - &mut self, - groups: &[RowGroupMetaData], - range: &FileRange, - ) { + pub fn prune_by_range(&mut self, groups: &[RowGroupMetaData], range: &FileRange) { for (idx, metadata) in groups.iter().enumerate() { // Skip the row group if the first dictionary/data page are not // within the range. @@ -487,17 +483,15 @@ mod tests { ); let metrics = parquet_file_metrics(); - assert_eq!( - prune_row_groups_by_statistics( - &schema, - &schema_descr, - &[rgm1, rgm2], - None, - Some(&pruning_predicate), - &metrics - ), - vec![1] + let mut row_groups = RowGroupSet::new(2); + row_groups.prune_by_statistics( + &schema, + &schema_descr, + &[rgm1, rgm2], + &pruning_predicate, + &metrics, ); + assert_pruned(row_groups, ExpectedPruning::Some(vec![1])) } #[test] @@ -523,17 +517,15 @@ mod tests { let metrics = parquet_file_metrics(); // missing statistics for first row group mean that the result from the predicate expression // is null / undefined so the first row group can't be filtered out - assert_eq!( - prune_row_groups_by_statistics( - &schema, - &schema_descr, - &[rgm1, rgm2], - None, - Some(&pruning_predicate), - &metrics - ), - vec![0, 1] + let mut row_groups = RowGroupSet::new(2); + row_groups.prune_by_statistics( + &schema, + &schema_descr, + &[rgm1, rgm2], + &pruning_predicate, + &metrics, ); + assert_pruned(row_groups, ExpectedPruning::None); } #[test] @@ -572,17 +564,15 @@ mod tests { let groups = &[rgm1, rgm2]; // the first row group is still filtered out because the predicate expression can be partially evaluated // when conditions are joined using AND - assert_eq!( - prune_row_groups_by_statistics( - &schema, - &schema_descr, - groups, - None, - Some(&pruning_predicate), - &metrics - ), - vec![1] + let mut row_groups = RowGroupSet::new(2); + row_groups.prune_by_statistics( + &schema, + &schema_descr, + groups, + &pruning_predicate, + &metrics, ); + assert_pruned(row_groups, ExpectedPruning::Some(vec![1])); // if conditions in predicate are joined with OR and an unsupported expression is used // this bypasses the entire predicate expression and no row groups are filtered out @@ -592,17 +582,15 @@ mod tests { // if conditions in predicate are joined with OR and an unsupported expression is used // this bypasses the entire predicate expression and no row groups are filtered out - assert_eq!( - prune_row_groups_by_statistics( - &schema, - &schema_descr, - groups, - None, - Some(&pruning_predicate), - &metrics - ), - vec![0, 1] + let mut row_groups = RowGroupSet::new(2); + row_groups.prune_by_statistics( + &schema, + &schema_descr, + groups, + &pruning_predicate, + &metrics, ); + assert_pruned(row_groups, ExpectedPruning::None); } #[test] @@ -650,17 +638,15 @@ mod tests { let groups = &[rgm1, rgm2]; // the first row group should be left because c1 is greater than zero // the second should be filtered out because c1 is less than zero - assert_eq!( - prune_row_groups_by_statistics( - &file_schema, // NB must be file schema, not table_schema - &schema_descr, - groups, - None, - Some(&pruning_predicate), - &metrics - ), - vec![0] + let mut row_groups = RowGroupSet::new(2); + row_groups.prune_by_statistics( + &file_schema, + &schema_descr, + groups, + &pruning_predicate, + &metrics, ); + assert_pruned(row_groups, ExpectedPruning::Some(vec![0])); } fn gen_row_group_meta_data_for_pruning_predicate() -> Vec { @@ -701,17 +687,15 @@ mod tests { let metrics = parquet_file_metrics(); // First row group was filtered out because it contains no null value on "c2". - assert_eq!( - prune_row_groups_by_statistics( - &schema, - &schema_descr, - &groups, - None, - Some(&pruning_predicate), - &metrics - ), - vec![1] + let mut row_groups = RowGroupSet::new(2); + row_groups.prune_by_statistics( + &schema, + &schema_descr, + &groups, + &pruning_predicate, + &metrics, ); + assert_pruned(row_groups, ExpectedPruning::Some(vec![1])); } #[test] @@ -735,17 +719,15 @@ mod tests { let metrics = parquet_file_metrics(); // bool = NULL always evaluates to NULL (and thus will not // pass predicates. Ideally these should both be false - assert_eq!( - prune_row_groups_by_statistics( - &schema, - &schema_descr, - &groups, - None, - Some(&pruning_predicate), - &metrics - ), - vec![1] + let mut row_groups = RowGroupSet::new(groups.len()); + row_groups.prune_by_statistics( + &schema, + &schema_descr, + &groups, + &pruning_predicate, + &metrics, ); + assert_pruned(row_groups, ExpectedPruning::Some(vec![1])); } #[test] @@ -797,18 +779,19 @@ mod tests { vec![ParquetStatistics::int32(Some(100), None, None, 0, false)], ); let metrics = parquet_file_metrics(); - assert_eq!( - prune_row_groups_by_statistics( - &schema, - &schema_descr, - &[rgm1, rgm2, rgm3], - None, - Some(&pruning_predicate), - &metrics - ), - vec![0, 2] + let mut row_groups = RowGroupSet::new(3); + row_groups.prune_by_statistics( + &schema, + &schema_descr, + &[rgm1, rgm2, rgm3], + &pruning_predicate, + &metrics, ); + assert_pruned(row_groups, ExpectedPruning::Some(vec![0, 2])); + } + #[test] + fn row_group_pruning_predicate_decimal_type2() { // INT32: c1 > 5, but parquet decimal type has different precision or scale to arrow decimal // The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2). // We should convert all type to the coercion type, which is decimal(11,2) @@ -864,18 +847,18 @@ mod tests { vec![ParquetStatistics::int32(None, Some(2), None, 0, false)], ); let metrics = parquet_file_metrics(); - assert_eq!( - prune_row_groups_by_statistics( - &schema, - &schema_descr, - &[rgm1, rgm2, rgm3, rgm4], - None, - Some(&pruning_predicate), - &metrics - ), - vec![0, 1, 3] + let mut row_groups = RowGroupSet::new(4); + row_groups.prune_by_statistics( + &schema, + &schema_descr, + &[rgm1, rgm2, rgm3, rgm4], + &pruning_predicate, + &metrics, ); - + assert_pruned(row_groups, ExpectedPruning::Some(vec![0, 1, 3])); + } + #[test] + fn row_group_pruning_predicate_decimal_type3() { // INT64: c1 < 5, the c1 is decimal(18,2) let schema = Arc::new(Schema::new(vec![Field::new( "c1", @@ -915,18 +898,18 @@ mod tests { vec![ParquetStatistics::int64(None, None, None, 0, false)], ); let metrics = parquet_file_metrics(); - assert_eq!( - prune_row_groups_by_statistics( - &schema, - &schema_descr, - &[rgm1, rgm2, rgm3], - None, - Some(&pruning_predicate), - &metrics - ), - vec![1, 2] + let mut row_groups = RowGroupSet::new(3); + row_groups.prune_by_statistics( + &schema, + &schema_descr, + &[rgm1, rgm2, rgm3], + &pruning_predicate, + &metrics, ); - + assert_pruned(row_groups, ExpectedPruning::Some(vec![1, 2])); + } + #[test] + fn row_group_pruning_predicate_decimal_type4() { // FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2) // the type of parquet is decimal(18,2) let schema = Arc::new(Schema::new(vec![Field::new( @@ -989,18 +972,18 @@ mod tests { )], ); let metrics = parquet_file_metrics(); - assert_eq!( - prune_row_groups_by_statistics( - &schema, - &schema_descr, - &[rgm1, rgm2, rgm3], - None, - Some(&pruning_predicate), - &metrics - ), - vec![1, 2] + let mut row_groups = RowGroupSet::new(3); + row_groups.prune_by_statistics( + &schema, + &schema_descr, + &[rgm1, rgm2, rgm3], + &pruning_predicate, + &metrics, ); - + assert_pruned(row_groups, ExpectedPruning::Some(vec![1, 2])); + } + #[test] + fn row_group_pruning_predicate_decimal_type5() { // BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2) // the type of parquet is decimal(18,2) let schema = Arc::new(Schema::new(vec![Field::new( @@ -1052,17 +1035,15 @@ mod tests { vec![ParquetStatistics::byte_array(None, None, None, 0, false)], ); let metrics = parquet_file_metrics(); - assert_eq!( - prune_row_groups_by_statistics( - &schema, - &schema_descr, - &[rgm1, rgm2, rgm3], - None, - Some(&pruning_predicate), - &metrics - ), - vec![1, 2] + let mut row_groups = RowGroupSet::new(3); + row_groups.prune_by_statistics( + &schema, + &schema_descr, + &[rgm1, rgm2, rgm3], + &pruning_predicate, + &metrics, ); + assert_pruned(row_groups, ExpectedPruning::Some(vec![1, 2])); } fn get_row_group_meta_data( @@ -1174,16 +1155,14 @@ mod tests { let pruning_predicate = PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - let row_groups = vec![0]; let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( file_name, data, &pruning_predicate, - &row_groups, ) .await .unwrap(); - assert!(pruned_row_groups.is_empty()); + assert!(pruned_row_groups.indexes().is_empty()); } #[tokio::test] @@ -1244,14 +1223,60 @@ mod tests { .await } + // What row groups are expected to be left after pruning + #[derive(Debug)] + enum ExpectedPruning { + All, + /// Only the specified row groups are expected to REMAIN (not what is pruned) + Some(Vec), + None, + } + + impl ExpectedPruning { + /// asserts that the pruned row group match this expectation + fn assert(&self, row_groups: &RowGroupSet) { + let num_row_groups = row_groups.len(); + assert!(num_row_groups > 0); + let num_pruned = (0..num_row_groups) + .filter_map(|i| { + if row_groups.should_scan(i) { + None + } else { + Some(1) + } + }) + .sum::(); + + match self { + Self::All => { + assert_eq!( + num_row_groups, num_pruned, + "Expected all row groups to be pruned, but got {row_groups:?}" + ); + } + ExpectedPruning::None => { + assert_eq!( + num_pruned, 0, + "Expected no row groups to be pruned, but got {row_groups:?}" + ); + } + ExpectedPruning::Some(expected) => { + let actual = row_groups.indexes(); + assert_eq!(expected, &actual, "Unexpected row groups pruned. Expected {expected:?}, got {actual:?}"); + } + } + } + } + + fn assert_pruned(row_groups: RowGroupSet, expected: ExpectedPruning) { + expected.assert(&row_groups); + } + struct BloomFilterTest { file_name: String, schema: Schema, - // which row groups should be attempted to prune - row_groups: Vec, - // which row groups are expected to be left after pruning. Must be set - // otherwise will panic on run() - post_pruning_row_groups: Option>, + // which row groups are expected to be left after pruning + post_pruning_row_groups: ExpectedPruning, } impl BloomFilterTest { @@ -1282,8 +1307,7 @@ mod tests { Self { file_name: String::from("data_index_bloom_encoding_stats.parquet"), schema: Schema::new(vec![Field::new("String", DataType::Utf8, false)]), - row_groups: vec![0], - post_pruning_row_groups: None, + post_pruning_row_groups: ExpectedPruning::None, } } @@ -1296,20 +1320,19 @@ mod tests { DataType::Utf8, false, )]), - row_groups: vec![0], - post_pruning_row_groups: None, + post_pruning_row_groups: ExpectedPruning::None, } } /// Expect all row groups to be pruned pub fn with_expect_all_pruned(mut self) -> Self { - self.post_pruning_row_groups = Some(vec![]); + self.post_pruning_row_groups = ExpectedPruning::All; self } /// Expect all row groups not to be pruned pub fn with_expect_none_pruned(mut self) -> Self { - self.post_pruning_row_groups = Some(self.row_groups.clone()); + self.post_pruning_row_groups = ExpectedPruning::None; self } @@ -1318,13 +1341,9 @@ mod tests { let Self { file_name, schema, - row_groups, post_pruning_row_groups, } = self; - let post_pruning_row_groups = - post_pruning_row_groups.expect("post_pruning_row_groups must be set"); - let testdata = datafusion_common::test_util::parquet_test_data(); let path = format!("{testdata}/{file_name}"); let data = bytes::Bytes::from(std::fs::read(path).unwrap()); @@ -1337,11 +1356,11 @@ mod tests { &file_name, data, &pruning_predicate, - &row_groups, ) .await .unwrap(); - assert_eq!(pruned_row_groups, post_pruning_row_groups); + + post_pruning_row_groups.assert(&pruned_row_groups); } } @@ -1350,8 +1369,7 @@ mod tests { file_name: &str, data: bytes::Bytes, pruning_predicate: &PruningPredicate, - row_groups: &[usize], - ) -> Result> { + ) -> Result { use object_store::{ObjectMeta, ObjectStore}; let object_meta = ObjectMeta { @@ -1376,17 +1394,16 @@ mod tests { }; let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); - let metadata = builder.metadata().clone(); - let pruned_row_group = prune_row_groups_by_bloom_filters( - pruning_predicate.schema(), - &mut builder, - row_groups, - metadata.row_groups(), - pruning_predicate, - &file_metrics, - ) - .await; + let mut pruned_row_groups = RowGroupSet::new(builder.metadata().num_row_groups()); + pruned_row_groups + .prune_by_bloom_filters( + pruning_predicate.schema(), + &mut builder, + pruning_predicate, + &file_metrics, + ) + .await; - Ok(pruned_row_group) + Ok(pruned_row_groups) } } From 0805226569ef51d49a33395473520fb121f2f669 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 21 May 2024 15:00:10 -0400 Subject: [PATCH 3/4] improve docs --- .../physical_plan/parquet/row_groups.rs | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index de32bb210b10..a3fa27298176 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -38,18 +38,19 @@ use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::ParquetFileMetrics; -/// Tracks which RowGroupsw within a parquet file should be scanned. +/// Tracks which RowGroups within a parquet file should be scanned. /// /// This struct encapsulates the various types of pruning that can be applied to -/// a set of row groups within a parquet file. +/// a set of row groups within a parquet file, progressively narrowing down the +/// set of row groups that should be scanned. #[derive(Debug, PartialEq)] pub(crate) struct RowGroupSet { - /// row_groups[i] is true if the i-th row group should be scanned + /// `row_groups[i]` is true if the i-th row group should be scanned row_groups: Vec, } impl RowGroupSet { - /// Create a new RowGroupSet with all row groups set to true (will be scanned) + /// Create a new `RowGroupSet` with all row groups set to true (will be scanned) pub fn new(num_row_groups: usize) -> Self { Self { row_groups: vec![true; num_row_groups], @@ -61,15 +62,17 @@ impl RowGroupSet { self.row_groups[idx] = false; } - /// return true if the i-th row group should be scanned + /// Return true if the i-th row group should be scanned fn should_scan(&self, idx: usize) -> bool { self.row_groups[idx] } + /// Return the total number of row groups (not the total number to be scanned) pub fn len(&self) -> usize { self.row_groups.len() } + /// Return true if there are no row groups pub fn is_empty(&self) -> bool { self.row_groups.is_empty() } @@ -82,17 +85,20 @@ impl RowGroupSet { .filter_map(|(idx, &b)| if b { Some(idx) } else { None }) } - /// Return a vector with the row group indexes that should be scanned + /// Return a `Vec` of row group indices that should be scanned pub fn indexes(&self) -> Vec { self.iter().collect() } - /// Prune remaining row groups so that only those row groups within the - /// specified range are scanned. + /// Prune remaining row groups to only those within the specified range. /// /// Updates this set to mark row groups that should not be scanned pub fn prune_by_range(&mut self, groups: &[RowGroupMetaData], range: &FileRange) { for (idx, metadata) in groups.iter().enumerate() { + if !self.should_scan(idx) { + continue; + } + // Skip the row group if the first dictionary/data page are not // within the range. // @@ -107,8 +113,8 @@ impl RowGroupSet { } } } - /// Prune remaining row groups based using min/max/null_count statistics and - /// the [`PruningPredicate`]. + /// Prune remaining row groups using min/max/null_count statistics and + /// the [`PruningPredicate`] to determine if the predicate can not be true. /// /// Updates this set to mark row groups that should not be scanned /// @@ -141,7 +147,7 @@ impl RowGroupSet { } } // stats filter array could not be built - // return a closure which will not filter out any row groups + // don't prune this row group Err(e) => { log::debug!("Error evaluating row group predicate values {e}"); metrics.predicate_evaluation_errors.add(1); @@ -151,8 +157,8 @@ impl RowGroupSet { } } - /// Prune remaining row groups using any available bloom filters and the - /// [`PruningPredicate`] + /// Prune remaining row groups using available bloom filters and the + /// [`PruningPredicate`]. /// /// Updates this set with row groups that should not be scanned pub async fn prune_by_bloom_filters( @@ -163,7 +169,6 @@ impl RowGroupSet { metrics: &ParquetFileMetrics, ) { for idx in 0..self.len() { - // already filtered out if !self.should_scan(idx) { continue; } @@ -220,7 +225,7 @@ impl RowGroupSet { } } } -/// Implements `PruningStatistics` for Parquet Split Block Bloom Filters (SBBF) +/// Implements [`PruningStatistics`] for Parquet Split Block Bloom Filters (SBBF) struct BloomFilterStatistics { /// Maps column name to the parquet bloom filter and parquet physical type column_sbbf: HashMap, From 4160326dbf6aeae591b138babd66eafad55ca83c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 21 May 2024 15:39:04 -0400 Subject: [PATCH 4/4] fix msrv --- .../core/src/datasource/physical_plan/parquet/page_filter.rs | 2 +- .../core/src/datasource/physical_plan/parquet/row_groups.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index 13e9bfb2a63d..d47d5c56bdf9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -100,7 +100,7 @@ use super::metrics::ParquetFileMetrics; /// /// Using `A > 35`: can rule out all of values in Page 1 (rows 0 -> 199) /// -/// Using `B = 'F'`: can rule out all vaues in Page 3 and Page 5 (rows 0 -> 99, and 250 -> 299) +/// Using `B = 'F'`: can rule out all values in Page 3 and Page 5 (rows 0 -> 99, and 250 -> 299) /// /// So we can entirely skip rows 0->199 and 250->299 as we know they /// can not contain rows that match the predicate. diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index a3fa27298176..2da3cb30727d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -44,7 +44,7 @@ use super::ParquetFileMetrics; /// a set of row groups within a parquet file, progressively narrowing down the /// set of row groups that should be scanned. #[derive(Debug, PartialEq)] -pub(crate) struct RowGroupSet { +pub struct RowGroupSet { /// `row_groups[i]` is true if the i-th row group should be scanned row_groups: Vec, }