Skip to content

Commit

Permalink
feat: support unnest in GROUP BY clause (apache#11469)
Browse files Browse the repository at this point in the history
* feat: support group by unnest

* pass slt

* refactor: mv process_group_by_unnest into try_process_unnest

* chore: add some documentation comments and tests

* Avoid cloning input

* use consistent field names

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
2 people authored and wiedld committed Jul 31, 2024
1 parent 2472d1e commit 530bb0b
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 3 deletions.
118 changes: 117 additions & 1 deletion datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@ use crate::utils::{
resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest,
};

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_common::{Column, UnnestOptions};
use datafusion_expr::expr::Alias;
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols,
};
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::utils::{
expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns,
find_aggregate_exprs, find_window_exprs,
};
use datafusion_expr::{
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
use sqlparser::ast::{
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
Expand Down Expand Up @@ -297,6 +299,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input: LogicalPlan,
select_exprs: Vec<Expr>,
) -> Result<LogicalPlan> {
// Try process group by unnest
let input = self.try_process_aggregate_unnest(input)?;

let mut intermediate_plan = input;
let mut intermediate_select_exprs = select_exprs;
// Each expr in select_exprs can contains multiple unnest stage
Expand Down Expand Up @@ -354,6 +359,117 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.build()
}

fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> {
match input {
LogicalPlan::Aggregate(agg) => {
let agg_expr = agg.aggr_expr.clone();
let (new_input, new_group_by_exprs) =
self.try_process_group_by_unnest(agg)?;
LogicalPlanBuilder::from(new_input)
.aggregate(new_group_by_exprs, agg_expr)?
.build()
}
LogicalPlan::Filter(mut filter) => {
filter.input = Arc::new(
self.try_process_aggregate_unnest(unwrap_arc(filter.input))?,
);
Ok(LogicalPlan::Filter(filter))
}
_ => Ok(input),
}
}

/// Try converting Unnest(Expr) of group by to Unnest/Projection
/// Return the new input and group_by_exprs of Aggregate.
fn try_process_group_by_unnest(
&self,
agg: Aggregate,
) -> Result<(LogicalPlan, Vec<Expr>)> {
let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;

let Aggregate {
input,
group_expr,
aggr_expr,
..
} = agg;

// process unnest of group_by_exprs, and input of agg will be rewritten
// for example:
//
// ```
// Aggregate: groupBy=[[UNNEST(Column(Column { relation: Some(Bare { table: "tab" }), name: "array_col" }))]], aggr=[[]]
// TableScan: tab
// ```
//
// will be transformed into
//
// ```
// Aggregate: groupBy=[[unnest(tab.array_col)]], aggr=[[]]
// Unnest: lists[unnest(tab.array_col)] structs[]
// Projection: tab.array_col AS unnest(tab.array_col)
// TableScan: tab
// ```
let mut intermediate_plan = unwrap_arc(input);
let mut intermediate_select_exprs = group_expr;

loop {
let mut unnest_columns = vec![];
let mut inner_projection_exprs = vec![];

let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
.iter()
.map(|expr| {
transform_bottom_unnest(
&intermediate_plan,
&mut unnest_columns,
&mut inner_projection_exprs,
expr,
)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

if unnest_columns.is_empty() {
break;
} else {
let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
let unnest_options = UnnestOptions::new().with_preserve_nulls(false);

let mut projection_exprs = match &aggr_expr_using_columns {
Some(exprs) => (*exprs).clone(),
None => {
let mut columns = HashSet::new();
for expr in &aggr_expr {
expr.apply(|expr| {
if let Expr::Column(c) = expr {
columns.insert(Expr::Column(c.clone()));
}
Ok(TreeNodeRecursion::Continue)
})
// As the closure always returns Ok, this "can't" error
.expect("Unexpected error");
}
aggr_expr_using_columns = Some(columns.clone());
columns
}
};
projection_exprs.extend(inner_projection_exprs);

intermediate_plan = LogicalPlanBuilder::from(intermediate_plan)
.project(projection_exprs)?
.unnest_columns_with_options(columns, unnest_options)?
.build()?;

intermediate_select_exprs = outer_projection_exprs;
}
}

Ok((intermediate_plan, intermediate_select_exprs))
}

fn plan_selection(
&self,
selection: Option<SQLExpr>,
Expand Down
134 changes: 132 additions & 2 deletions datafusion/sqllogictest/test_files/unnest.slt
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,6 @@ select unnest(column1) from (select * from (values([1,2,3]), ([4,5,6])) limit 1
query error DataFusion error: Error during planning: Projections require unique expression names but the expression "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 0 and "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 1 have the same name. Consider aliasing \("AS"\) one of them.
select unnest(column1), unnest(column1) from unnest_table;

statement ok
drop table unnest_table;

## unnest list followed by unnest struct
query ???
Expand Down Expand Up @@ -557,3 +555,135 @@ physical_plan
06)----------UnnestExec
07)------------ProjectionExec: expr=[column3@0 as unnest(recursive_unnest_table.column3), column3@0 as column3]
08)--------------MemoryExec: partitions=1, partition_sizes=[1]

## group by unnest

### without agg exprs
query I
select unnest(column1) c1 from unnest_table group by c1 order by c1;
----
1
2
3
4
5
6
12

query II
select unnest(column1) c1, unnest(column2) c2 from unnest_table group by c1, c2 order by c1, c2;
----
1 7
2 NULL
3 NULL
4 8
5 9
6 11
12 NULL
NULL 10
NULL 12
NULL 42
NULL NULL

query III
select unnest(column1) c1, unnest(column2) c2, column3 c3 from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1
2 NULL 1
3 NULL 1
4 8 2
5 9 2
6 11 3
12 NULL NULL
NULL 10 2
NULL 12 3
NULL 42 NULL
NULL NULL NULL

### with agg exprs

query IIII
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(1) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1 1
2 NULL 1 1
3 NULL 1 1
4 8 2 1
5 9 2 1
6 11 3 1
12 NULL NULL 1
NULL 10 2 1
NULL 12 3 1
NULL 42 NULL 1
NULL NULL NULL 1

query IIII
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1 1
2 NULL 1 1
3 NULL 1 1
4 8 2 1
5 9 2 1
6 11 3 0
12 NULL NULL 0
NULL 10 2 1
NULL 12 3 0
NULL 42 NULL 0
NULL NULL NULL 0

query IIIII
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4), sum(column3) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1 1 1
2 NULL 1 1 1
3 NULL 1 1 1
4 8 2 1 2
5 9 2 1 2
6 11 3 0 3
12 NULL NULL 0 NULL
NULL 10 2 1 2
NULL 12 3 0 3
NULL 42 NULL 0 NULL
NULL NULL NULL 0 NULL

query II
select unnest(column1), count(*) from unnest_table group by unnest(column1) order by unnest(column1) desc;
----
12 1
6 1
5 1
4 1
3 1
2 1
1 1

### group by recursive unnest list

query ?
select unnest(unnest(column2)) c2 from recursive_unnest_table group by c2 order by c2;
----
[1]
[1, 1]
[2]
[3, 4]
[5]
[7, 8]
[, 6]
NULL

query ?I
select unnest(unnest(column2)) c2, count(column3) from recursive_unnest_table group by c2 order by c2;
----
[1] 1
[1, 1] 1
[2] 1
[3, 4] 1
[5] 1
[7, 8] 1
[, 6] 1
NULL 1

### TODO: group by unnest struct
query error DataFusion error: Error during planning: Projection references non\-aggregate values
select unnest(column1) c1 from nested_unnest_table group by c1.c0;

0 comments on commit 530bb0b

Please sign in to comment.