diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index a5891e655a052..84b80c311245c 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -26,18 +26,20 @@ use crate::utils::{
     resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest,
 };
 
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
 use datafusion_common::{Column, UnnestOptions};
 use datafusion_expr::expr::Alias;
 use datafusion_expr::expr_rewriter::{
     normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols,
 };
+use datafusion_expr::logical_plan::tree_node::unwrap_arc;
 use datafusion_expr::utils::{
     expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns,
     find_aggregate_exprs, find_window_exprs,
 };
 use datafusion_expr::{
-    Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
+    Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
 };
 use sqlparser::ast::{
     Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
@@ -297,6 +299,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         input: LogicalPlan,
         select_exprs: Vec,
     ) -> Result {
+        // Try process group by unnest
+        let input = self.try_process_aggregate_unnest(input)?;
+
         let mut intermediate_plan = input;
         let mut intermediate_select_exprs = select_exprs;
         // Each expr in select_exprs can contains multiple unnest stage
@@ -354,6 +359,137 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .build()
     }
 
+    /// Process unnest expressions in the group-by of an aggregate found at the
+    /// top of `input`.
+    ///
+    /// * `LogicalPlan::Aggregate`: the input and group-by expressions are
+    ///   rewritten by `try_process_group_by_unnest`, and the aggregate is
+    ///   rebuilt on the new input with its original aggregate expressions.
+    /// * `LogicalPlan::Filter`: the filter is kept and its input is processed
+    ///   recursively.
+    /// * Any other plan is returned unchanged.
+    fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> {
+        match input {
+            LogicalPlan::Aggregate(agg) => {
+                // `agg` is moved into the call below, so copy its aggregate
+                // expressions first in order to rebuild the node afterwards.
+                let agg_expr = agg.aggr_expr.clone();
+                let (new_input, new_group_by_exprs) =
+                    self.try_process_group_by_unnest(agg)?;
+                LogicalPlanBuilder::from(new_input)
+                    .aggregate(new_group_by_exprs, agg_expr)?
+                    .build()
+            }
+            LogicalPlan::Filter(mut filter) => {
+                filter.input = Arc::new(
+                    self.try_process_aggregate_unnest(unwrap_arc(filter.input))?,
+                );
+                Ok(LogicalPlan::Filter(filter))
+            }
+            _ => Ok(input),
+        }
+    }
+
+    /// Try converting Unnest(Expr) of group by to Unnest/Projection.
+    ///
+    /// Returns the new input and group_by_exprs of Aggregate. When the first
+    /// pass finds no column to unnest, both are returned unchanged.
+    fn try_process_group_by_unnest(
+        &self,
+        agg: Aggregate,
+    ) -> Result<(LogicalPlan, Vec<Expr>)> {
+        // Columns referenced by the aggregate expressions; collected on the
+        // first unnest stage and reused by later stages.
+        let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;
+
+        let Aggregate {
+            input,
+            group_expr,
+            aggr_expr,
+            ..
+        } = agg;
+
+        // process unnest of group_by_exprs, and input of agg will be rewritten
+        // for example:
+        //
+        // ```
+        // Aggregate: groupBy=[[UNNEST(Column(Column { relation: Some(Bare { table: "tab" }), name: "array_col" }))]], aggr=[[]]
+        //   TableScan: tab
+        // ```
+        //
+        // will be transformed into
+        //
+        // ```
+        // Aggregate: groupBy=[[unnest(tab.array_col)]], aggr=[[]]
+        //   Unnest: lists[unnest(tab.array_col)] structs[]
+        //     Projection: tab.array_col AS unnest(tab.array_col)
+        //       TableScan: tab
+        // ```
+        let mut intermediate_plan = unwrap_arc(input);
+        let mut intermediate_select_exprs = group_expr;
+
+        // Repeat until a pass of `transform_bottom_unnest` reports no column to
+        // unnest.
+        loop {
+            let mut unnest_columns = vec![];
+            let mut inner_projection_exprs = vec![];
+
+            let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
+                .iter()
+                .map(|expr| {
+                    transform_bottom_unnest(
+                        &intermediate_plan,
+                        &mut unnest_columns,
+                        &mut inner_projection_exprs,
+                        expr,
+                    )
+                })
+                .collect::<Result<Vec<_>>>()?
+                .into_iter()
+                .flatten()
+                .collect();
+
+            if unnest_columns.is_empty() {
+                break;
+            }
+
+            let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
+            let unnest_options = UnnestOptions::new().with_preserve_nulls(false);
+
+            // The projection placed under the Unnest carries the columns read by
+            // the aggregate expressions plus this stage's inner projections, so
+            // the aggregate expressions can still be evaluated above it.
+            let mut projection_exprs = match &aggr_expr_using_columns {
+                Some(exprs) => exprs.clone(),
+                None => {
+                    let mut used_columns = HashSet::new();
+                    for expr in &aggr_expr {
+                        // The closure always returns Ok; `?` forwards any error
+                        // from the traversal instead of panicking.
+                        expr.apply(|expr| {
+                            if let Expr::Column(c) = expr {
+                                used_columns.insert(Expr::Column(c.clone()));
+                            }
+                            Ok(TreeNodeRecursion::Continue)
+                        })?;
+                    }
+                    aggr_expr_using_columns = Some(used_columns.clone());
+                    used_columns
+                }
+            };
+            projection_exprs.extend(inner_projection_exprs);
+
+            intermediate_plan = LogicalPlanBuilder::from(intermediate_plan)
+                .project(projection_exprs)?
+                .unnest_columns_with_options(columns, unnest_options)?
+                .build()?;
+
+            intermediate_select_exprs = outer_projection_exprs;
+        }
+
+        Ok((intermediate_plan, intermediate_select_exprs))
+    }
+
     fn plan_selection(
         &self,
         selection: Option,
diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt
index 698faf87c9b20..93146541e107b 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -500,8 +500,6 @@ select unnest(column1) from (select * from (values([1,2,3]), ([4,5,6])) limit 1
 
 query error DataFusion error: Error during planning: Projections require unique expression names but the expression "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 0 and "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 1 have the same name. Consider aliasing \("AS"\) one of them.
select unnest(column1), unnest(column1) from unnest_table; -statement ok -drop table unnest_table; ## unnest list followed by unnest struct query ??? @@ -557,3 +555,135 @@ physical_plan 06)----------UnnestExec 07)------------ProjectionExec: expr=[column3@0 as unnest(recursive_unnest_table.column3), column3@0 as column3] 08)--------------MemoryExec: partitions=1, partition_sizes=[1] + +## group by unnest + +### without agg exprs +query I +select unnest(column1) c1 from unnest_table group by c1 order by c1; +---- +1 +2 +3 +4 +5 +6 +12 + +query II +select unnest(column1) c1, unnest(column2) c2 from unnest_table group by c1, c2 order by c1, c2; +---- +1 7 +2 NULL +3 NULL +4 8 +5 9 +6 11 +12 NULL +NULL 10 +NULL 12 +NULL 42 +NULL NULL + +query III +select unnest(column1) c1, unnest(column2) c2, column3 c3 from unnest_table group by c1, c2, c3 order by c1, c2, c3; +---- +1 7 1 +2 NULL 1 +3 NULL 1 +4 8 2 +5 9 2 +6 11 3 +12 NULL NULL +NULL 10 2 +NULL 12 3 +NULL 42 NULL +NULL NULL NULL + +### with agg exprs + +query IIII +select unnest(column1) c1, unnest(column2) c2, column3 c3, count(1) from unnest_table group by c1, c2, c3 order by c1, c2, c3; +---- +1 7 1 1 +2 NULL 1 1 +3 NULL 1 1 +4 8 2 1 +5 9 2 1 +6 11 3 1 +12 NULL NULL 1 +NULL 10 2 1 +NULL 12 3 1 +NULL 42 NULL 1 +NULL NULL NULL 1 + +query IIII +select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4) from unnest_table group by c1, c2, c3 order by c1, c2, c3; +---- +1 7 1 1 +2 NULL 1 1 +3 NULL 1 1 +4 8 2 1 +5 9 2 1 +6 11 3 0 +12 NULL NULL 0 +NULL 10 2 1 +NULL 12 3 0 +NULL 42 NULL 0 +NULL NULL NULL 0 + +query IIIII +select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4), sum(column3) from unnest_table group by c1, c2, c3 order by c1, c2, c3; +---- +1 7 1 1 1 +2 NULL 1 1 1 +3 NULL 1 1 1 +4 8 2 1 2 +5 9 2 1 2 +6 11 3 0 3 +12 NULL NULL 0 NULL +NULL 10 2 1 2 +NULL 12 3 0 3 +NULL 42 NULL 0 NULL +NULL NULL NULL 0 NULL + +query II +select unnest(column1), count(*) from unnest_table group by 
unnest(column1) order by unnest(column1) desc; +---- +12 1 +6 1 +5 1 +4 1 +3 1 +2 1 +1 1 + +### group by recursive unnest list + +query ? +select unnest(unnest(column2)) c2 from recursive_unnest_table group by c2 order by c2; +---- +[1] +[1, 1] +[2] +[3, 4] +[5] +[7, 8] +[, 6] +NULL + +query ?I +select unnest(unnest(column2)) c2, count(column3) from recursive_unnest_table group by c2 order by c2; +---- +[1] 1 +[1, 1] 1 +[2] 1 +[3, 4] 1 +[5] 1 +[7, 8] 1 +[, 6] 1 +NULL 1 + +### TODO: group by unnest struct +query error DataFusion error: Error during planning: Projection references non\-aggregate values +select unnest(column1) c1 from nested_unnest_table group by c1.c0;