From 8c58b48074b909e3da464763de235c420ed3f0bf Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 2 Feb 2022 15:45:07 +0000 Subject: [PATCH] Prevent repartitioning of certain operator's direct children (#1731) --- .../src/physical_optimizer/repartition.rs | 187 +++++++++++------- datafusion/src/physical_plan/limit.rs | 4 + datafusion/src/physical_plan/mod.rs | 18 ++ datafusion/src/physical_plan/projection.rs | 4 + datafusion/src/physical_plan/union.rs | 4 + 5 files changed, 150 insertions(+), 67 deletions(-) diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 0926ed73ca9f..21b023efd576 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -19,10 +19,10 @@ use std::sync::Arc; use super::optimizer::PhysicalOptimizerRule; +use crate::physical_plan::Partitioning::*; use crate::physical_plan::{ empty::EmptyExec, repartition::RepartitionExec, ExecutionPlan, }; -use crate::physical_plan::{Distribution, Partitioning::*}; use crate::{error::Result, execution::context::ExecutionConfig}; /// Optimizer that introduces repartition to introduce more parallelism in the plan @@ -38,8 +38,8 @@ impl Repartition { fn optimize_partitions( target_partitions: usize, - requires_single_partition: bool, plan: Arc, + should_repartition: bool, ) -> Result> { // Recurse into children bottom-up (added nodes should be as deep as possible) @@ -47,17 +47,15 @@ fn optimize_partitions( // leaf node - don't replace children plan.clone() } else { + let should_repartition_children = plan.should_repartition_children(); let children = plan .children() .iter() .map(|child| { optimize_partitions( target_partitions, - matches!( - plan.required_child_distribution(), - Distribution::SinglePartition - ), child.clone(), + should_repartition_children, ) }) .collect::>()?; @@ -77,7 +75,7 @@ fn optimize_partitions( // But also not very useful to inlude let is_empty_exec = plan.as_any().downcast_ref::().is_some(); - if perform_repartition && !requires_single_partition && !is_empty_exec { + if perform_repartition && should_repartition && !is_empty_exec { Ok(Arc::new(RepartitionExec::try_new( new_plan, RoundRobinBatch(target_partitions), @@ -97,7 +95,7 @@ impl PhysicalOptimizerRule for Repartition { if config.target_partitions == 1 { Ok(plan) } else { - optimize_partitions(config.target_partitions, true, plan) + optimize_partitions(config.target_partitions, plan, false) } } @@ -107,93 +105,148 @@ impl PhysicalOptimizerRule for Repartition { } #[cfg(test)] mod tests { - use arrow::datatypes::Schema; + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use super::*; use crate::datasource::PartitionedFile; + use crate::physical_plan::expressions::col; use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; - use crate::physical_plan::projection::ProjectionExec; - use crate::physical_plan::Statistics; + use crate::physical_plan::filter::FilterExec; + use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; + use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; + use crate::physical_plan::{displayable, Statistics}; use crate::test::object_store::TestObjectStore; + fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)])) + } + + fn parquet_exec() -> Arc { + Arc::new(ParquetExec::new( + FileScanConfig { + object_store: TestObjectStore::new_arc(&[("x", 100)]), + file_schema: schema(), + file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + }, + None, + )) + } + + fn filter_exec(input: Arc) -> Arc { + Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap()) + } + + fn hash_aggregate(input: Arc) -> Arc { + let schema = schema(); + Arc::new( + HashAggregateExec::try_new( + AggregateMode::Final, + vec![], + vec![], + Arc::new( + HashAggregateExec::try_new( + AggregateMode::Partial, + vec![], + vec![], + input, + schema.clone(), + ) + .unwrap(), + ), + schema, + ) + .unwrap(), + ) + } + + fn limit_exec(input: Arc) -> Arc { + Arc::new(GlobalLimitExec::new( + Arc::new(LocalLimitExec::new(input, 100)), + 100, + )) + } + + fn trim_plan_display(plan: &str) -> Vec<&str> { + plan.split('\n') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect() + } + #[test] fn added_repartition_to_single_partition() -> Result<()> { - let file_schema = Arc::new(Schema::empty()); - let parquet_project = ProjectionExec::try_new( - vec![], - Arc::new(ParquetExec::new( - FileScanConfig { - object_store: TestObjectStore::new_arc(&[("x", 100)]), - file_schema, - file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]], - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - }, - None, - )), - )?; - let optimizer = Repartition {}; let optimized = optimizer.optimize( - Arc::new(parquet_project), + hash_aggregate(parquet_exec()), &ExecutionConfig::new().with_target_partitions(10), )?; - assert_eq!( - optimized.children()[0] - .output_partitioning() - .partition_count(), - 10 - ); + let plan = displayable(optimized.as_ref()).indent().to_string(); + + let expected = &[ + "HashAggregateExec: mode=Final, gby=[], aggr=[]", + "HashAggregateExec: mode=Partial, gby=[], aggr=[]", + "RepartitionExec: partitioning=RoundRobinBatch(10)", + "ParquetExec: limit=None, partitions=[x]", + ]; + assert_eq!(&trim_plan_display(&plan), &expected); Ok(()) } #[test] fn repartition_deepest_node() -> Result<()> { - let file_schema = Arc::new(Schema::empty()); - let parquet_project = ProjectionExec::try_new( - vec![], - Arc::new(ProjectionExec::try_new( - vec![], - Arc::new(ParquetExec::new( - FileScanConfig { - object_store: TestObjectStore::new_arc(&[("x", 100)]), - file_schema, - file_groups: vec![vec![PartitionedFile::new( - "x".to_string(), - 100, - )]], - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - }, - None, - )), - )?), + let optimizer = Repartition {}; + + let optimized = optimizer.optimize( + hash_aggregate(filter_exec(parquet_exec())), + &ExecutionConfig::new().with_target_partitions(10), )?; + let plan = displayable(optimized.as_ref()).indent().to_string(); + + let expected = &[ + "HashAggregateExec: mode=Final, gby=[], aggr=[]", + "HashAggregateExec: mode=Partial, gby=[], aggr=[]", + "FilterExec: c1@0", + "RepartitionExec: partitioning=RoundRobinBatch(10)", + "ParquetExec: limit=None, partitions=[x]", + ]; + + assert_eq!(&trim_plan_display(&plan), &expected); + Ok(()) + } + + #[test] + fn repartition_ignores() -> Result<()> { let optimizer = Repartition {}; let optimized = optimizer.optimize( - Arc::new(parquet_project), + hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec())))), &ExecutionConfig::new().with_target_partitions(10), )?; - // RepartitionExec is added to deepest node - assert!(optimized.children()[0] - .as_any() - .downcast_ref::() - .is_none()); - assert!(optimized.children()[0].children()[0] - .as_any() - .downcast_ref::() - .is_some()); - + let plan = displayable(optimized.as_ref()).indent().to_string(); + + let expected = &[ + "HashAggregateExec: mode=Final, gby=[], aggr=[]", + "HashAggregateExec: mode=Partial, gby=[], aggr=[]", + "RepartitionExec: partitioning=RoundRobinBatch(10)", + "GlobalLimitExec: limit=100", + "LocalLimitExec: limit=100", + "FilterExec: c1@0", + "RepartitionExec: partitioning=RoundRobinBatch(10)", + "GlobalLimitExec: limit=100", + "LocalLimitExec: limit=100", // Should not repartition for LocalLimitExec + "ParquetExec: limit=None, partitions=[x]", + ]; + + assert_eq!(&trim_plan_display(&plan), &expected); Ok(()) } } diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs index f0225579d5a6..977aaa898d2a 100644 --- a/datafusion/src/physical_plan/limit.rs +++ b/datafusion/src/physical_plan/limit.rs @@ -300,6 +300,10 @@ impl ExecutionPlan for LocalLimitExec { _ => Statistics::default(), } } + + fn should_repartition_children(&self) -> bool { + false + } } /// Truncate a RecordBatch to maximum of n rows diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 725e475335ca..97da069ea405 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -135,14 +135,32 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// Returns the execution plan as [`Any`](std::any::Any) so that it can be /// downcast to a specific implementation. fn as_any(&self) -> &dyn Any; + /// Get the schema for this execution plan fn schema(&self) -> SchemaRef; + /// Specifies the output partitioning scheme of this plan fn output_partitioning(&self) -> Partitioning; + /// Specifies the data distribution requirements of all the children for this operator fn required_child_distribution(&self) -> Distribution { Distribution::UnspecifiedDistribution } + + /// Returns `true` if the direct children of this `ExecutionPlan` should be repartitioned + /// to introduce greater concurrency to the plan + /// + /// The default implementation returns `true` unless `Self::request_child_distribution` + /// returns `Distribution::SinglePartition` + /// + /// Operators that do not benefit from additional partitioning may want to return `false` + fn should_repartition_children(&self) -> bool { + !matches!( + self.required_child_distribution(), + Distribution::SinglePartition + ) + } + /// Get a list of child execution plans that provide the input for this plan. The returned list /// will be empty for leaf nodes, will contain a single value for unary nodes, or two /// values for binary nodes (such as joins). diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs index 4d0cc61c99e2..88348d872f09 100644 --- a/datafusion/src/physical_plan/projection.rs +++ b/datafusion/src/physical_plan/projection.rs @@ -185,6 +185,10 @@ impl ExecutionPlan for ProjectionExec { self.expr.iter().map(|(e, _)| Arc::clone(e)), ) } + + fn should_repartition_children(&self) -> bool { + false + } } /// If e is a direct column reference, returns the field level diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs index 93ecf224b7b3..d2c170bc27f8 100644 --- a/datafusion/src/physical_plan/union.rs +++ b/datafusion/src/physical_plan/union.rs @@ -143,6 +143,10 @@ impl ExecutionPlan for UnionExec { .reduce(stats_union) .unwrap_or_default() } + + fn should_repartition_children(&self) -> bool { + false + } } /// Stream wrapper that records `BaselineMetrics` for a particular