From d7f2fbab66fe45343269dcb6ec23d4b615142815 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 27 Oct 2022 13:58:17 -0400
Subject: [PATCH 1/3] Log error building row filters

Inspired by @liukun4515 at
https://github.com/apache/arrow-datafusion/pull/3380/files#r970198755
---
 .../src/physical_plan/file_format/parquet.rs  | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 0dda94322619..9770e7892af2 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -450,15 +450,26 @@ impl FileOpener for ParquetOpener {
                 .then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
                 .flatten()
             {
-                if let Ok(Some(filter)) = build_row_filter(
+                let row_filter = build_row_filter(
                     predicate.clone(),
                     builder.schema().as_ref(),
                     table_schema.as_ref(),
                     builder.metadata(),
                     reorder_predicates,
-                ) {
-                    builder = builder.with_row_filter(filter);
-                }
+                );
+
+                match row_filter {
+                    Ok(Some(filter)) => {
+                        builder = builder.with_row_filter(filter);
+                    }
+                    Ok(None) => {}
+                    Err(e) => {
+                        debug!(
+                            "Ignoring error building row filter for '{:?}': {}",
+                            predicate, e
+                        );
+                    }
+                };
             };

             let file_metadata = builder.metadata();

From b7171b4d2dcb8e24274fac552188c09e2cd2a2f2 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 27 Oct 2022 14:24:33 -0400
Subject: [PATCH 2/3] Add parquet predicate pushdown metrics

---
 .../src/physical_plan/file_format/parquet.rs  | 167 +++++++++++++++---
 .../physical_plan/file_format/row_filter.rs   |  49 ++++-
 .../core/src/physical_plan/metrics/mod.rs     |  13 +-
 3 files changed, 193 insertions(+), 36 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 9770e7892af2..f9ec72ab0067 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -237,6 +237,10 @@ pub struct ParquetFileMetrics {
     pub row_groups_pruned: metrics::Count,
     /// Total number of bytes scanned
     pub bytes_scanned: metrics::Count,
+    /// Total rows filtered out by predicates pushed into parquet scan
+    pub pushdown_rows_filtered: metrics::Count,
+    /// Total time spent evaluating pushdown filters
+    pub pushdown_eval_time: metrics::Time,
 }

 impl ParquetFileMetrics {
@@ -258,10 +262,20 @@ impl ParquetFileMetrics {
             .with_new_label("filename", filename.to_string())
             .counter("bytes_scanned", partition);

+        let pushdown_rows_filtered = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("pushdown_rows_filtered", partition);
+
+        let pushdown_eval_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("pushdown_eval_time", partition);
+
         Self {
             predicate_evaluation_errors,
             row_groups_pruned,
             bytes_scanned,
+            pushdown_rows_filtered,
+            pushdown_eval_time,
         }
     }
 }
@@ -410,7 +424,7 @@ impl FileOpener for ParquetOpener {
     ) -> Result<FileOpenFuture> {
         let file_range = file_meta.range.clone();

-        let metrics = ParquetFileMetrics::new(
+        let file_metrics = ParquetFileMetrics::new(
             self.partition_index,
             file_meta.location().as_ref(),
             &self.metrics,
@@ -456,6 +470,8 @@ impl FileOpener for ParquetOpener {
                     table_schema.as_ref(),
                     builder.metadata(),
                     reorder_predicates,
+                    &file_metrics.pushdown_rows_filtered,
+                    &file_metrics.pushdown_eval_time,
                 );

                 match row_filter {
@@ -474,8 +490,12 @@ impl FileOpener for ParquetOpener {

             let file_metadata = builder.metadata();
             let groups = file_metadata.row_groups();
-            let row_groups =
-                prune_row_groups(groups, file_range, pruning_predicate.clone(), &metrics);
+            let row_groups = prune_row_groups(
+                groups,
+                file_range,
+                pruning_predicate.clone(),
+                &file_metrics,
+            );

             if enable_page_index && check_page_index_push_down_valid(&pruning_predicate) {
                 let file_offset_indexes = file_metadata.offset_indexes();
@@ -491,7 +511,7 @@ impl FileOpener for ParquetOpener {
                             pruning_predicate.clone(),
                             file_offset_indexes.get(*r),
                             file_page_indexes.get(*r),
-                            &metrics,
+                            &file_metrics,
                         )
                         .map_err(|e| {
                             ArrowError::ParquetError(format!(
@@ -575,7 +595,7 @@ impl DefaultParquetFileReaderFactory {
 struct ParquetFileReader {
     store: Arc<dyn ObjectStore>,
     meta: ObjectMeta,
-    metrics: ParquetFileMetrics,
+    file_metrics: ParquetFileMetrics,
     metadata_size_hint: Option<usize>,
 }

@@ -584,7 +604,7 @@ impl AsyncFileReader for ParquetFileReader {
         &mut self,
         range: Range<usize>,
     ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.metrics.bytes_scanned.add(range.end - range.start);
+        self.file_metrics.bytes_scanned.add(range.end - range.start);

         self.store
             .get_range(&self.meta.location, range)
@@ -602,7 +622,7 @@ impl AsyncFileReader for ParquetFileReader {
         Self: Send,
     {
         let total = ranges.iter().map(|r| r.end - r.start).sum();
-        self.metrics.bytes_scanned.add(total);
+        self.file_metrics.bytes_scanned.add(total);

         async move {
             self.store
@@ -647,7 +667,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
-        let parquet_file_metrics = ParquetFileMetrics::new(
+        let file_metrics = ParquetFileMetrics::new(
             partition_index,
             file_meta.location().as_ref(),
             metrics,
@@ -657,7 +677,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
             meta: file_meta.object_meta,
             store: Arc::clone(&self.store),
             metadata_size_hint,
-            metrics: parquet_file_metrics,
+            file_metrics,
         }))
     }
 }
@@ -1178,6 +1198,7 @@ mod tests {
     use crate::datasource::listing::{FileRange, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::options::CsvReadOptions;
+    use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
     use crate::{
@@ -1210,8 +1231,15 @@ mod tests {
     use std::io::Write;
     use tempfile::TempDir;

-    /// writes each RecordBatch as an individual parquet file and then
-    /// reads it back in to the named location.
+    struct RoundTripResult {
+        /// Data that was read back from ParquetFiles
+        batches: Result<Vec<RecordBatch>>,
+        /// The physical plan that was created (that has statistics, etc)
+        parquet_exec: Arc<ParquetExec>,
+    }
+
+    /// writes each RecordBatch as an individual parquet file and re-reads
+    /// the data back. Returns the data as [RecordBatch]es
     async fn round_trip_to_parquet(
         batches: Vec<RecordBatch>,
         projection: Option<Vec<usize>>,
@@ -1219,14 +1247,30 @@ mod tests {
         predicate: Option<Expr>,
         pushdown_predicate: bool,
     ) -> Result<Vec<RecordBatch>> {
+        round_trip(batches, projection, schema, predicate, pushdown_predicate)
+            .await
+            .batches
+    }
+
+    /// Writes each RecordBatch as an individual parquet file and then
+    /// reads them back. Returns the parquet exec as well as the data
+    /// as [RecordBatch]es
+    async fn round_trip(
+        batches: Vec<RecordBatch>,
+        projection: Option<Vec<usize>>,
+        schema: Option<SchemaRef>,
+        predicate: Option<Expr>,
+        pushdown_predicate: bool,
+    ) -> RoundTripResult {
         let file_schema = match schema {
             Some(schema) => schema,
-            None => Arc::new(Schema::try_merge(
-                batches.iter().map(|b| b.schema().as_ref().clone()),
-            )?),
+            None => Arc::new(
+                Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))
+                    .unwrap(),
+            ),
         };

-        let (meta, _files) = store_parquet(batches).await?;
+        let (meta, _files) = store_parquet(batches).await.unwrap();
         let file_groups = meta.into_iter().map(Into::into).collect();

         // prepare the scan
@@ -1253,7 +1297,11 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();

-        collect(Arc::new(parquet_exec), task_ctx).await
+        let parquet_exec = Arc::new(parquet_exec);
+        RoundTripResult {
+            batches: collect(parquet_exec.clone(), task_ctx).await,
+            parquet_exec,
+        }
     }

     // Add a new column with the specified field name to the RecordBatch
@@ -1464,10 +1512,7 @@ mod tests {
         let filter = col("c2").eq(lit(2_i64));

         // read/write them files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
-                .await
-                .unwrap();
+        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
         let expected = vec![
             "+----+----+----+",
             "| c1 | c3 | c2 |",
@@ -1475,7 +1520,10 @@ mod tests {
             "|    | 20 | 2  |",
             "+----+----+----+",
         ];
-        assert_batches_sorted_eq!(expected, &read);
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        // Note there were 6 rows in total (across two batches)
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
     }

     #[tokio::test]
@@ -1598,7 +1646,7 @@ mod tests {
     }

     #[tokio::test]
-    async fn evolved_schema_disjoint_schema_filter_with_pushdown() {
+    async fn evolved_schema_disjoint_schema_with_filter_pushdown() {
         let c1: ArrayRef =
             Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

@@ -1613,10 +1661,7 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64));

         // read/write them files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
-                .await
-                .unwrap();
+        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
         let expected = vec![
             "+----+----+",
             "| c1 | c2 |",
@@ -1625,7 +1670,10 @@ mod tests {
             "|    | 1  |",
             "+----+----+",
         ];
-        assert_batches_sorted_eq!(expected, &read);
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        // Note there were 6 rows in total (across two batches)
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
     }

     #[tokio::test]
@@ -1906,6 +1954,71 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn parquet_exec_metrics() {
+        let c1: ArrayRef = Arc::new(StringArray::from(vec![
+            Some("Foo"),
+            None,
+            Some("bar"),
+            Some("bar"),
+            Some("bar"),
+            Some("bar"),
+            Some("zzz"),
+        ]));
+
+        // batch1: c1(string)
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        // filter on c1
+        let filter = col("c1").not_eq(lit("bar"));
+
+        // read/write them files:
+        let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // assert the batches and some metrics
+        let expected = vec![
+            "+-----+", "| c1 |", "+-----+", "| Foo |", "| zzz |", "+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+
+        // pushdown predicates have eliminated all 4 bar rows and the
+        // null row for 5 rows total
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
+        assert!(
+            get_value(&metrics, "pushdown_eval_time") > 0,
+            "no eval time in metrics: {:#?}",
+            metrics
+        );
+    }
+
+    /// returns the sum of all the metrics with the specified name
+    /// in the given metric set.
+    ///
+    /// Count: returns value
+    /// Time: returns elapsed nanoseconds
+    ///
+    /// Panics if no such metric.
+    fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
+        let sum = metrics.sum(|m| match m.value() {
+            MetricValue::Count { name, .. } if name == metric_name => true,
+            MetricValue::Time { name, .. } if name == metric_name => true,
+            _ => false,
+        });
+
+        match sum {
+            Some(MetricValue::Count { count, .. }) => count.value(),
+            Some(MetricValue::Time { time, .. }) => time.value(),
+            _ => {
+                panic!(
+                    "Expected metric not found. Looking for '{}' in\n\n{:#?}",
+                    metric_name, metrics
+                );
+            }
+        }
+    }
+
     fn parquet_file_metrics() -> ParquetFileMetrics {
         let metrics = Arc::new(ExecutionPlanMetricsSet::new());
         ParquetFileMetrics::new(0, "file.parquet", &metrics)
diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs
index dd9c8fb650fd..c8d6c5e2d196 100644
--- a/datafusion/core/src/physical_plan/file_format/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -31,6 +31,8 @@ use parquet::arrow::ProjectionMask;
 use parquet::file::metadata::ParquetMetaData;
 use std::sync::Arc;

+use crate::physical_plan::metrics;
+
 /// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which
 /// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`.
 /// DataFusion will use a `ParquetRecordBatchStream` to read data from parquet into arrow `RecordBatch`es.
@@ -66,6 +68,10 @@ use std::sync::Arc;
 pub(crate) struct DatafusionArrowPredicate {
     physical_expr: Arc<dyn PhysicalExpr>,
     projection: ProjectionMask,
+    /// how many rows were filtered out by this predicate
+    rows_filtered: metrics::Count,
+    /// how long was spent evaluating this predicate
+    time: metrics::Time,
 }

 impl DatafusionArrowPredicate {
@@ -73,6 +79,8 @@ impl DatafusionArrowPredicate {
         candidate: FilterCandidate,
         schema: &Schema,
         metadata: &ParquetMetaData,
+        rows_filtered: metrics::Count,
+        time: metrics::Time,
     ) -> Result<Self> {
         let props = ExecutionProps::default();

@@ -88,6 +96,8 @@ impl DatafusionArrowPredicate {
                 metadata.file_metadata().schema_descr(),
                 candidate.projection,
             ),
+            rows_filtered,
+            time,
         })
     }
 }
@@ -98,6 +108,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
     }

     fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        // scoped timer updates on drop
+        let mut timer = self.time.timer();
         match self
             .physical_expr
             .evaluate(&batch)
@@ -105,7 +117,13 @@ impl ArrowPredicate for DatafusionArrowPredicate {
         {
             Ok(array) => {
                 if let Some(mask) = array.as_any().downcast_ref::<BooleanArray>() {
-                    Ok(BooleanArray::from(mask.data().clone()))
+                    let bool_arr = BooleanArray::from(mask.data().clone());
+                    // TODO is there a more efficient way to count the rows that are filtered?
+                    let num_filtered =
+                        bool_arr.iter().filter(|p| !matches!(p, Some(true))).count();
+                    self.rows_filtered.add(num_filtered);
+                    timer.stop();
+                    Ok(bool_arr)
                 } else {
                     Err(ArrowError::ComputeError(
                         "Unexpected result of predicate evaluation, expected BooleanArray".to_owned(),
@@ -252,6 +270,8 @@ pub fn build_row_filter(
     table_schema: &Schema,
     metadata: &ParquetMetaData,
     reorder_predicates: bool,
+    rows_filtered: &metrics::Count,
+    time: &metrics::Time,
 ) -> Result<Option<RowFilter>> {
     let predicates = split_conjunction_owned(expr);

@@ -280,15 +300,25 @@ pub fn build_row_filter(
         let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];

         for candidate in indexed_candidates {
-            let filter =
-                DatafusionArrowPredicate::try_new(candidate, file_schema, metadata)?;
+            let filter = DatafusionArrowPredicate::try_new(
+                candidate,
+                file_schema,
+                metadata,
+                rows_filtered.clone(),
+                time.clone(),
+            )?;

             filters.push(Box::new(filter));
         }

         for candidate in other_candidates {
-            let filter =
-                DatafusionArrowPredicate::try_new(candidate, file_schema, metadata)?;
+            let filter = DatafusionArrowPredicate::try_new(
+                candidate,
+                file_schema,
+                metadata,
+                rows_filtered.clone(),
+                time.clone(),
+            )?;

             filters.push(Box::new(filter));
         }
@@ -297,8 +327,13 @@ pub fn build_row_filter(
     } else {
         let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
         for candidate in candidates {
-            let filter =
-                DatafusionArrowPredicate::try_new(candidate, file_schema, metadata)?;
+            let filter = DatafusionArrowPredicate::try_new(
+                candidate,
+                file_schema,
+                metadata,
+                rows_filtered.clone(),
+                time.clone(),
+            )?;

             filters.push(Box::new(filter));
         }
diff --git a/datafusion/core/src/physical_plan/metrics/mod.rs b/datafusion/core/src/physical_plan/metrics/mod.rs
index dbbb8af4f6fb..7d6d56c70bf4 100644
--- a/datafusion/core/src/physical_plan/metrics/mod.rs
+++ b/datafusion/core/src/physical_plan/metrics/mod.rs
@@ -166,8 +166,7 @@ impl Metric {
     }
 }

-/// A snapshot of the metrics for a particular operator (`dyn
-/// ExecutionPlan`).
+/// A snapshot of the metrics for a particular operator ([`ExecutionPlan`]).
 #[derive(Default, Debug, Clone)]
 pub struct MetricsSet {
     metrics: Vec<Arc<Metric>>,
@@ -379,6 +378,16 @@ impl Label {
         let value = value.into();
         Self { name, value }
     }
+
+    /// Return the name of this label
+    pub fn name(&self) -> &str {
+        self.name.as_ref()
+    }
+
+    /// Return the value of this label
+    pub fn value(&self) -> &str {
+        self.value.as_ref()
+    }
 }

 impl Display for Label {

From 2b3f70cd09c7a016e80f7682cfbc3676f13278e0 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 28 Oct 2022 10:16:34 -0400
Subject: [PATCH 3/3] more efficient bit counting

---
 .../physical_plan/file_format/row_filter.rs   | 25 ++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs
index c8d6c5e2d196..49ec6b5caf3a 100644
--- a/datafusion/core/src/physical_plan/file_format/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -118,9 +118,7 @@ impl ArrowPredicate for DatafusionArrowPredicate {
             Ok(array) => {
                 if let Some(mask) = array.as_any().downcast_ref::<BooleanArray>() {
                     let bool_arr = BooleanArray::from(mask.data().clone());
-                    // TODO is there a more efficient way to count the rows that are filtered?
-                    let num_filtered =
-                        bool_arr.iter().filter(|p| !matches!(p, Some(true))).count();
+                    let num_filtered = bool_arr.len() - true_count(&bool_arr);
                     self.rows_filtered.add(num_filtered);
                     timer.stop();
                     Ok(bool_arr)
@@ -138,6 +136,27 @@ impl ArrowPredicate for DatafusionArrowPredicate {
     }
 }

+/// Return the number of non-null true values in an array
+// TODO remove when https://github.com/apache/arrow-rs/issues/2963 is released
+fn true_count(arr: &BooleanArray) -> usize {
+    match arr.data().null_buffer() {
+        Some(nulls) => {
+            let null_chunks = nulls.bit_chunks(arr.offset(), arr.len());
+            let value_chunks = arr.values().bit_chunks(arr.offset(), arr.len());
+            null_chunks
+                .iter()
+                .zip(value_chunks.iter())
+                .chain(std::iter::once((
+                    null_chunks.remainder_bits(),
+                    value_chunks.remainder_bits(),
+                )))
+                .map(|(a, b)| (a & b).count_ones() as usize)
+                .sum()
+        }
+        None => arr.values().count_set_bits_offset(arr.offset(), arr.len()),
+    }
+}
+
 /// A candidate expression for creating a `RowFilter` contains the
 /// expression as well as data to estimate the cost of evaluating
 /// the resulting expression.
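
For reference, a minimal sketch (not part of the patches) of what PATCH 3's `true_count` computes: the naive per-element count it replaces, written against the `arrow` crate that DataFusion re-exports. The bit-chunk version in the patch gives the same answer while processing the validity and value buffers a 64-bit word at a time; the hypothetical helper name `naive_true_count` is only for illustration.

// Illustrative sketch, assuming the `arrow` crate is available as a dependency.
use arrow::array::BooleanArray;

// Count entries that are both valid (non-null) and true, one element at a time.
fn naive_true_count(arr: &BooleanArray) -> usize {
    // iter() yields Option<bool>; only Some(true) counts
    arr.iter().filter(|v| matches!(v, Some(true))).count()
}

fn main() {
    let arr = BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]);
    assert_eq!(naive_true_count(&arr), 2);
    // rows filtered out by a pushdown predicate = total rows - true rows
    assert_eq!(arr.len() - naive_true_count(&arr), 2);
}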