From 8cffb68d0c4e7d3acd304096f326a157be20b81a Mon Sep 17 00:00:00 2001 From: Chunchun Ye <14298407+appletreeisyellow@users.noreply.github.com> Date: Wed, 22 May 2024 04:51:49 -0500 Subject: [PATCH] Stop copying LogicalPlan and Exprs in `SingleDistinctToGroupBy` (#10527) * chore: merge main and resolve conflict * chore: use less copy * chore: remove clone * remove more clones (#8) * refactor: use HashSet<&Expr> instead of HashSet * refactor: remove more cloning * chore: reduce string allocation Co-authored-by: Adam Curtis * chore: return internal error instead of panacing * chore: use arg display_name as hash key instead of a hashed value --------- Co-authored-by: Adam Curtis --- .../src/single_distinct_to_groupby.rs | 404 +++++++++--------- 1 file changed, 204 insertions(+), 200 deletions(-) diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 4f9c1ad645e3..4b1f9a0d1401 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -22,7 +22,9 @@ use std::sync::Arc; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::{qualified_name, Result}; +use datafusion_common::{ + internal_err, qualified_name, tree_node::Transformed, DataFusionError, Result, +}; use datafusion_expr::builder::project; use datafusion_expr::expr::AggregateFunctionDefinition; use datafusion_expr::{ @@ -64,63 +66,55 @@ impl SingleDistinctToGroupBy { } /// Check whether all aggregate exprs are distinct on a single field. -fn is_single_distinct_agg(plan: &LogicalPlan) -> Result { - match plan { - LogicalPlan::Aggregate(Aggregate { aggr_expr, .. }) => { - let mut fields_set = HashSet::new(); - let mut aggregate_count = 0; - for expr in aggr_expr { - if let Expr::AggregateFunction(AggregateFunction { - func_def: AggregateFunctionDefinition::BuiltIn(fun), - distinct, - args, - filter, - order_by, - null_treatment: _, - }) = expr - { - if filter.is_some() || order_by.is_some() { - return Ok(false); - } - aggregate_count += 1; - if *distinct { - for e in args { - fields_set.insert(e.canonical_name()); - } - } else if !matches!(fun, Sum | Min | Max) { - return Ok(false); - } - } else if let Expr::AggregateFunction(AggregateFunction { - func_def: AggregateFunctionDefinition::UDF(fun), - distinct, - args, - filter, - order_by, - null_treatment: _, - }) = expr - { - if filter.is_some() || order_by.is_some() { - return Ok(false); - } - aggregate_count += 1; - if *distinct { - for e in args { - fields_set.insert(e.canonical_name()); - } - } else if fun.name() != "SUM" - && fun.name() != "MIN" - && fun.name() != "MAX" - { - return Ok(false); - } - } else { - return Ok(false); +fn is_single_distinct_agg(aggr_expr: &[Expr]) -> Result { + let mut fields_set = HashSet::new(); + let mut aggregate_count = 0; + for expr in aggr_expr { + if let Expr::AggregateFunction(AggregateFunction { + func_def: AggregateFunctionDefinition::BuiltIn(fun), + distinct, + args, + filter, + order_by, + null_treatment: _, + }) = expr + { + if filter.is_some() || order_by.is_some() { + return Ok(false); + } + aggregate_count += 1; + if *distinct { + for e in args { + fields_set.insert(e); } + } else if !matches!(fun, Sum | Min | Max) { + return Ok(false); + } + } else if let Expr::AggregateFunction(AggregateFunction { + func_def: AggregateFunctionDefinition::UDF(fun), + distinct, + args, + filter, + order_by, + null_treatment: _, + }) = expr + { + if filter.is_some() || order_by.is_some() { + return Ok(false); } - Ok(aggregate_count == aggr_expr.len() && fields_set.len() == 1) + aggregate_count += 1; + if *distinct { + for e in args { + fields_set.insert(e); + } + } else if fun.name() != "SUM" && fun.name() != "MIN" && fun.name() != "MAX" { + return Ok(false); + } + } else { + return Ok(false); } - _ => Ok(false), } + Ok(aggregate_count == aggr_expr.len() && fields_set.len() == 1) } /// Check if the first expr is [Expr::GroupingSet]. @@ -131,9 +125,29 @@ fn contains_grouping_set(expr: &[Expr]) -> bool { impl OptimizerRule for SingleDistinctToGroupBy { fn try_optimize( &self, - plan: &LogicalPlan, + _plan: &LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result> { + internal_err!("Should have called SingleDistinctToGroupBy::rewrite") + } + + fn name(&self) -> &str { + "single_distinct_aggregation_to_group_by" + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result, DataFusionError> { match plan { LogicalPlan::Aggregate(Aggregate { input, @@ -141,167 +155,157 @@ impl OptimizerRule for SingleDistinctToGroupBy { schema, group_expr, .. - }) => { - if is_single_distinct_agg(plan)? && !contains_grouping_set(group_expr) { - // alias all original group_by exprs - let (mut inner_group_exprs, out_group_expr_with_alias): ( - Vec, - Vec<(Expr, Option)>, - ) = group_expr - .iter() - .enumerate() - .map(|(i, group_expr)| { - if let Expr::Column(_) = group_expr { - // For Column expressions we can use existing expression as is. - (group_expr.clone(), (group_expr.clone(), None)) - } else { - // For complex expression write is as alias, to be able to refer - // if from parent operators successfully. - // Consider plan below. - // - // Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\ - // --Aggregate: groupBy=[[test.a + Int32(1) AS group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ - // ----TableScan: test [a:UInt32, b:UInt32, c:UInt32] - // - // First aggregate(from bottom) refers to `test.a` column. - // Second aggregate refers to the `group_alias_0` column, Which is a valid field in the first aggregate. - // If we were to write plan above as below without alias - // - // Aggregate: groupBy=[[test.a + Int32(1)]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\ - // --Aggregate: groupBy=[[test.a + Int32(1), test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ - // ----TableScan: test [a:UInt32, b:UInt32, c:UInt32] - // - // Second aggregate refers to the `test.a + Int32(1)` expression However, its input do not have `test.a` expression in it. - let alias_str = format!("group_alias_{i}"); - let alias_expr = group_expr.clone().alias(&alias_str); - let (qualifier, field) = schema.qualified_field(i); + }) if is_single_distinct_agg(&aggr_expr)? + && !contains_grouping_set(&group_expr) => + { + let group_size = group_expr.len(); + // alias all original group_by exprs + let (mut inner_group_exprs, out_group_expr_with_alias): ( + Vec, + Vec<(Expr, Option)>, + ) = group_expr + .into_iter() + .enumerate() + .map(|(i, group_expr)| { + if let Expr::Column(_) = group_expr { + // For Column expressions we can use existing expression as is. + (group_expr.clone(), (group_expr, None)) + } else { + // For complex expression write is as alias, to be able to refer + // if from parent operators successfully. + // Consider plan below. + // + // Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\ + // --Aggregate: groupBy=[[test.a + Int32(1) AS group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ + // ----TableScan: test [a:UInt32, b:UInt32, c:UInt32] + // + // First aggregate(from bottom) refers to `test.a` column. + // Second aggregate refers to the `group_alias_0` column, Which is a valid field in the first aggregate. + // If we were to write plan above as below without alias + // + // Aggregate: groupBy=[[test.a + Int32(1)]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\ + // --Aggregate: groupBy=[[test.a + Int32(1), test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ + // ----TableScan: test [a:UInt32, b:UInt32, c:UInt32] + // + // Second aggregate refers to the `test.a + Int32(1)` expression However, its input do not have `test.a` expression in it. + let alias_str = format!("group_alias_{i}"); + let (qualifier, field) = schema.qualified_field(i); + ( + group_expr.alias(alias_str.clone()), ( - alias_expr, - ( - col(alias_str), - Some(qualified_name(qualifier, field.name())), - ), - ) - } - }) - .unzip(); - - // and they can be referenced by the alias in the outer aggr plan - let outer_group_exprs = out_group_expr_with_alias - .iter() - .map(|(out_group_expr, _)| out_group_expr.clone()) - .collect::>(); - - // replace the distinct arg with alias - let mut index = 1; - let mut group_fields_set = HashSet::new(); - let mut inner_aggr_exprs = vec![]; - let outer_aggr_exprs = aggr_expr - .iter() - .map(|aggr_expr| match aggr_expr { - Expr::AggregateFunction(AggregateFunction { - func_def: AggregateFunctionDefinition::BuiltIn(fun), - args, - distinct, - .. - }) => { - // is_single_distinct_agg ensure args.len=1 - if *distinct - && group_fields_set.insert(args[0].display_name()?) - { - inner_group_exprs.push( - args[0].clone().alias(SINGLE_DISTINCT_ALIAS), - ); + col(alias_str), + Some(qualified_name(qualifier, field.name())), + ), + ) + } + }) + .unzip(); + + // replace the distinct arg with alias + let mut index = 1; + let mut group_fields_set = HashSet::new(); + let mut inner_aggr_exprs = vec![]; + let outer_aggr_exprs = aggr_expr + .into_iter() + .map(|aggr_expr| match aggr_expr { + Expr::AggregateFunction(AggregateFunction { + func_def: AggregateFunctionDefinition::BuiltIn(fun), + mut args, + distinct, + .. + }) => { + if distinct { + if args.len() != 1 { + return internal_err!("DISTINCT aggregate should have exactly one argument"); } + let arg = args.swap_remove(0); + if group_fields_set.insert(arg.display_name()?) { + inner_group_exprs + .push(arg.alias(SINGLE_DISTINCT_ALIAS)); + } + Ok(Expr::AggregateFunction(AggregateFunction::new( + fun, + vec![col(SINGLE_DISTINCT_ALIAS)], + false, // intentional to remove distinct here + None, + None, + None, + ))) // if the aggregate function is not distinct, we need to rewrite it like two phase aggregation - if !(*distinct) { - index += 1; - let alias_str = format!("alias{}", index); - inner_aggr_exprs.push( - Expr::AggregateFunction(AggregateFunction::new( - fun.clone(), - args.clone(), - false, - None, - None, - None, - )) - .alias(&alias_str), - ); - Ok(Expr::AggregateFunction(AggregateFunction::new( + } else { + index += 1; + let alias_str = format!("alias{}", index); + inner_aggr_exprs.push( + Expr::AggregateFunction(AggregateFunction::new( fun.clone(), - vec![col(&alias_str)], + args, false, None, None, None, - ))) - } else { - Ok(Expr::AggregateFunction(AggregateFunction::new( - fun.clone(), - vec![col(SINGLE_DISTINCT_ALIAS)], - false, // intentional to remove distinct here - None, - None, - None, - ))) - } - } - _ => Ok(aggr_expr.clone()), - }) - .collect::>>()?; - - // construct the inner AggrPlan - let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new( - input.clone(), - inner_group_exprs, - inner_aggr_exprs, - )?); - - // so the aggregates are displayed in the same way even after the rewrite - // this optimizer has two kinds of alias: - // - group_by aggr - // - aggr expr - let group_size = group_expr.len(); - let alias_expr: Vec<_> = out_group_expr_with_alias - .into_iter() - .map(|(group_expr, original_field)| { - if let Some(name) = original_field { - group_expr.alias(name) - } else { - group_expr + )) + .alias(&alias_str), + ); + Ok(Expr::AggregateFunction(AggregateFunction::new( + fun, + vec![col(&alias_str)], + false, + None, + None, + None, + ))) } - }) - .chain(outer_aggr_exprs.iter().enumerate().map(|(idx, expr)| { + } + _ => Ok(aggr_expr), + }) + .collect::>>()?; + + // construct the inner AggrPlan + let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new( + input, + inner_group_exprs, + inner_aggr_exprs, + )?); + + let outer_group_exprs = out_group_expr_with_alias + .iter() + .map(|(expr, _)| expr.clone()) + .collect(); + + // so the aggregates are displayed in the same way even after the rewrite + // this optimizer has two kinds of alias: + // - group_by aggr + // - aggr expr + let alias_expr: Vec<_> = out_group_expr_with_alias + .into_iter() + .map(|(group_expr, original_field)| { + if let Some(name) = original_field { + group_expr.alias(name) + } else { + group_expr + } + }) + .chain(outer_aggr_exprs.iter().cloned().enumerate().map( + |(idx, expr)| { let idx = idx + group_size; let (qualifier, field) = schema.qualified_field(idx); let name = qualified_name(qualifier, field.name()); - expr.clone().alias(name) - })) - .collect(); - - let outer_aggr = LogicalPlan::Aggregate(Aggregate::try_new( - Arc::new(inner_agg), - outer_group_exprs, - outer_aggr_exprs, - )?); - Ok(Some(project(outer_aggr, alias_expr)?)) - } else { - Ok(None) - } + expr.alias(name) + }, + )) + .collect(); + + let outer_aggr = LogicalPlan::Aggregate(Aggregate::try_new( + Arc::new(inner_agg), + outer_group_exprs, + outer_aggr_exprs, + )?); + Ok(Transformed::yes(project(outer_aggr, alias_expr)?)) } - _ => Ok(None), + _ => Ok(Transformed::no(plan)), } } - - fn name(&self) -> &str { - "single_distinct_aggregation_to_group_by" - } - - fn apply_order(&self) -> Option { - Some(ApplyOrder::TopDown) - } } #[cfg(test)]