Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement DISTINCT ON from Postgres #7981

Merged
merged 23 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
deff43f
Initial DISTINT ON implementation
gruuya Oct 30, 2023
553f87a
Merge branch 'main' into distinct-on-implementation
gruuya Oct 30, 2023
04ce375
Add a couple more tests
gruuya Oct 30, 2023
148fee2
Add comments in the replace_distinct_aggregate optimizer
gruuya Oct 30, 2023
26e9082
Run cargo fmt to fix CI
gruuya Oct 30, 2023
a5cff4e
Make DISTINCT ON planning more robust to support arbitrary selection …
gruuya Oct 31, 2023
525523a
Merge branch 'main' into distinct-on-impl
gruuya Oct 31, 2023
5f0a26b
Merge branch 'main' into distinct-on-impl
gruuya Nov 1, 2023
2730bbd
Add DISTINCT ON + join SLT
gruuya Nov 1, 2023
9b41907
Handle no DISTINCT ON expressions and extend the docs for the replace…
gruuya Nov 2, 2023
7c232c8
Merge branch 'main' into distinct-on-impl
gruuya Nov 2, 2023
57ffea9
Merge branch 'main' into distinct-on-impl
gruuya Nov 3, 2023
5fdbef4
Remove misleading DISTINCT ON SLT comment
gruuya Nov 3, 2023
5f0cf19
Merge branch 'main' into distinct-on-impl
gruuya Nov 4, 2023
2ee64cd
Add an EXPLAIN SLT for a basic DISTINCT ON query
gruuya Nov 4, 2023
2bbcd7b
Merge branch 'main' into distinct-on-impl
gruuya Nov 6, 2023
5d89700
Revise comment in CommonSubexprEliminate::try_optimize_aggregate
gruuya Nov 6, 2023
8467736
Merge branch 'main' into distinct-on-impl
gruuya Nov 10, 2023
8ebdc13
Implement qualified expression alias and extend test coverage
gruuya Nov 10, 2023
d2bc2b4
Update datafusion/proto/proto/datafusion.proto
gruuya Nov 10, 2023
e74e1f8
Accompanying generated changes to alias proto tag revision
gruuya Nov 10, 2023
a912cad
Merge branch 'main' into distinct-on-impl
gruuya Nov 12, 2023
9f37d8f
Remove obsolete comment
gruuya Nov 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use crate::expr_rewriter::{
rewrite_sort_cols_by_aggs,
};
use crate::logical_plan::{
Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Aggregate, Analyze, CrossJoin, Distinct, DistinctOn, EmptyRelation, Explain, Filter,
Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
Window,
};
Expand Down Expand Up @@ -551,16 +551,29 @@ impl LogicalPlanBuilder {
let left_plan: LogicalPlan = self.plan;
let right_plan: LogicalPlan = plan;

Ok(Self::from(LogicalPlan::Distinct(Distinct {
input: Arc::new(union(left_plan, right_plan)?),
})))
Ok(Self::from(LogicalPlan::Distinct(Distinct::All(Arc::new(
union(left_plan, right_plan)?,
)))))
}

/// Apply deduplication: Only distinct (different) values are returned)
pub fn distinct(self) -> Result<Self> {
Ok(Self::from(LogicalPlan::Distinct(Distinct {
input: Arc::new(self.plan),
})))
Ok(Self::from(LogicalPlan::Distinct(Distinct::All(Arc::new(
self.plan,
)))))
}

/// Project first values of the specified expression list according to the provided
/// sorting expressions grouped by the `DISTINCT ON` clause expressions.
pub fn distinct_on(
self,
on_expr: Vec<Expr>,
select_expr: Vec<Expr>,
sort_expr: Option<Vec<Expr>>,
) -> Result<Self> {
Ok(Self::from(LogicalPlan::Distinct(Distinct::On(
DistinctOn::try_new(on_expr, select_expr, sort_expr, Arc::new(self.plan))?,
))))
}

/// Apply a join to `right` using explicitly specified columns and an
Expand Down
8 changes: 4 additions & 4 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, EmptyRelation, Explain,
Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning,
PlanType, Prepare, Projection, Repartition, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, DistinctOn, EmptyRelation,
Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, Prepare, Projection, Repartition, Sort, StringifiedPlan,
Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
};
pub use statement::{
SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd,
Expand Down
170 changes: 155 additions & 15 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use std::sync::Arc;
use super::dml::CopyTo;
use super::DdlStatement;
use crate::dml::CopyOptions;
use crate::expr::{Alias, Exists, InSubquery, Placeholder};
use crate::expr_rewriter::create_col_from_scalar_expr;
use crate::expr::{Alias, Exists, InSubquery, Placeholder, Sort as SortExpr};
use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::logical_plan::{DmlStatement, Statement};
Expand Down Expand Up @@ -163,7 +163,8 @@ impl LogicalPlan {
}) => projected_schema,
LogicalPlan::Projection(Projection { schema, .. }) => schema,
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
LogicalPlan::Distinct(Distinct { input }) => input.schema(),
LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
Comment on lines +166 to +167
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add api-change label for this diff

LogicalPlan::Window(Window { schema, .. }) => schema,
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
Expand Down Expand Up @@ -367,6 +368,16 @@ impl LogicalPlan {
LogicalPlan::Unnest(Unnest { column, .. }) => {
f(&Expr::Column(column.clone()))
}
LogicalPlan::Distinct(Distinct::On(DistinctOn {
on_expr,
select_expr,
sort_expr,
..
})) => on_expr
.iter()
.chain(select_expr.iter())
.chain(sort_expr.clone().unwrap_or(vec![]).iter())
.try_for_each(f),
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Subquery(_)
Expand All @@ -377,7 +388,7 @@ impl LogicalPlan {
| LogicalPlan::Analyze(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Union(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Distinct(Distinct::All(_))
| LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
Expand Down Expand Up @@ -405,7 +416,9 @@ impl LogicalPlan {
LogicalPlan::Union(Union { inputs, .. }) => {
inputs.iter().map(|arc| arc.as_ref()).collect()
}
LogicalPlan::Distinct(Distinct { input }) => vec![input],
LogicalPlan::Distinct(
Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
) => vec![input],
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::Dml(write) => vec![&write.input],
Expand Down Expand Up @@ -461,8 +474,11 @@ impl LogicalPlan {
Ok(Some(agg.group_expr.as_slice()[0].clone()))
}
}
LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
Ok(Some(select_expr[0].clone()))
}
LogicalPlan::Filter(Filter { input, .. })
| LogicalPlan::Distinct(Distinct { input, .. })
| LogicalPlan::Distinct(Distinct::All(input))
| LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::Limit(Limit { input, .. })
| LogicalPlan::Repartition(Repartition { input, .. })
Expand Down Expand Up @@ -823,10 +839,29 @@ impl LogicalPlan {
inputs: inputs.iter().cloned().map(Arc::new).collect(),
schema: schema.clone(),
})),
LogicalPlan::Distinct(Distinct { .. }) => {
Ok(LogicalPlan::Distinct(Distinct {
input: Arc::new(inputs[0].clone()),
}))
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
Distinct::All(_) => Distinct::All(Arc::new(inputs[0].clone())),
Distinct::On(DistinctOn {
on_expr,
select_expr,
..
}) => {
let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
let select_expr = expr.split_off(on_expr.len());
Distinct::On(DistinctOn::try_new(
expr,
select_expr,
if !sort_expr.is_empty() {
Some(sort_expr)
} else {
None
},
Arc::new(inputs[0].clone()),
)?)
}
};
Ok(LogicalPlan::Distinct(distinct))
}
LogicalPlan::Analyze(a) => {
assert!(expr.is_empty());
Expand Down Expand Up @@ -1064,7 +1099,9 @@ impl LogicalPlan {
LogicalPlan::Subquery(_) => None,
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
LogicalPlan::Limit(Limit { fetch, .. }) => *fetch,
LogicalPlan::Distinct(Distinct { input }) => input.max_rows(),
LogicalPlan::Distinct(
Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
) => input.max_rows(),
LogicalPlan::Values(v) => Some(v.values.len()),
LogicalPlan::Unnest(_) => None,
LogicalPlan::Ddl(_)
Expand Down Expand Up @@ -1667,9 +1704,21 @@ impl LogicalPlan {
LogicalPlan::Statement(statement) => {
write!(f, "{}", statement.display())
}
LogicalPlan::Distinct(Distinct { .. }) => {
write!(f, "Distinct:")
}
LogicalPlan::Distinct(distinct) => match distinct {
Distinct::All(_) => write!(f, "Distinct:"),
Distinct::On(DistinctOn {
on_expr,
select_expr,
sort_expr,
..
}) => write!(
f,
"DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
expr_vec_fmt!(on_expr),
expr_vec_fmt!(select_expr),
if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
),
},
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union(_) => write!(f, "Union"),
Expand Down Expand Up @@ -2132,9 +2181,100 @@ pub struct Limit {

/// Removes duplicate rows from the input
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Distinct {
pub enum Distinct {
/// Plain `DISTINCT` referencing all selection expressions
All(Arc<LogicalPlan>),
/// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
On(DistinctOn),
}

/// Removes duplicate rows from the input
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
/// The `DISTINCT ON` clause expression list
pub on_expr: Vec<Expr>,
/// The selected projection expression list
pub select_expr: Vec<Expr>,
/// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the same list of exprs is included in both the select_list and sort_exprs I wonder if it would be less error prone to have references rather than keep three parallel lists:

    /// The selected projection expression list
    pub select_expr: Vec<Expr>,
    /// The sort expressions
    pub sort_expr: Vec<Expr>,
    /// the number of prefix columns from `sort_expr` that form the `ON` clause used for deduplicating

Copy link
Contributor Author

@gruuya gruuya Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, on_expr and sort_expr should have a (partial) overlap (select_expr and sort_expr are arbitrary and different in general).

Condensing on_expr and sort_expr into one is doable, though it seems to me it would introduce more complexity, and make the logic harder to grasp. For starters sort_expr is not an exact super-set of on_expr, but instead wraps them inside Expr::Sorts to capture asc and nulls_first, but it can also be omitted (unlike ON exprs). On the other hand, these two are parsed at different places: first the on_expr is parsed in select_to_plan, and then sort_expr is extracted in order_by.

So in order to keep them both in one vec we'd have to: a) add an additional field to DistinctOn to track the length of the ON expressions as you mention (e.g. on_expr_count), b) if there are no actual sort_expr provided keep the ON expressions in them, otherwise validate that the wrapped sorting expressions match the existing ON expressions and replace the vector with the sorting expressions, and then c) inside replace_distinct_aggregate deconstruct that. That would mean take on_expr_count first expressions from sort_expr, and unwrapping the underlying expressions from Expr::Sort in case sort expression length is > on_expr_count. In case sort expression length is equal to on_expr_count it could mean no ORDER BY was provided (no need for unwrapping), or the ORDER BY expressions match the ON expressions modulo the aforementioned sorting specifiers (asc and nulls_first).

This strikes me as less clear all in all, though if you'd prefer it I'll make that change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your explanation makes sense. Maybe we can take some of this rationale (e.g. that sort_exprs are not a superset as they are wrapped by Expr::Sort) and put them into the doc comments. We can do this as a follow on PR

pub sort_expr: Option<Vec<Expr>>,
/// The logical plan that is being DISTINCT'd
pub input: Arc<LogicalPlan>,
/// The schema description of the DISTINCT ON output
pub schema: DFSchemaRef,
}

impl DistinctOn {
/// Create a new `DistinctOn` struct.
pub fn try_new(
on_expr: Vec<Expr>,
select_expr: Vec<Expr>,
sort_expr: Option<Vec<Expr>>,
input: Arc<LogicalPlan>,
) -> Result<Self> {
if on_expr.is_empty() {
return plan_err!("No `ON` expressions provided");
}

let on_expr = normalize_cols(on_expr, input.as_ref())?;

// Create fields with any qualifier stuffed in the name itself
Copy link
Contributor Author

@gruuya gruuya Oct 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is sort of a hack, but the reason I was forced to do it is so that the plan schema remains unchanged after replace_distinct_aggregate. In particular, in that optimizer I alias the final projected expressions with the selection expression display name, but that always results in a schema without qualifiers (with any original qualifier crammed into the field name).

I feel like this case should be handled by Projection::try_new_with_schema over at replace_distinct_aggregate, but it seems that after #7919 the custom schema doesn't propagate through the optimization chain.

UPDATE: I've opened a related issue for this: #8008

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is sort of a hack, but the reason I was forced to do it is so that the plan schema remains unchanged after replace_distinct_aggregate.

Note #7997 may be related

let fields = exprlist_to_fields(&select_expr, &input)?
.iter()
.map(|f| {
DFField::new_unqualified(
&f.qualified_name(),
f.data_type().clone(),
f.is_nullable(),
)
})
.collect();
let schema =
DFSchema::new_with_metadata(fields, input.schema().metadata().clone())?;

let mut distinct_on = DistinctOn {
on_expr,
select_expr,
sort_expr: None,
input,
schema: Arc::new(schema),
};

if let Some(sort_expr) = sort_expr {
distinct_on = distinct_on.with_sort_expr(sort_expr)?;
}

Ok(distinct_on)
}

/// Try to update `self` with a new sort expressions.
///
/// Validates that the sort expressions are a super-set of the `ON` expressions.
pub fn with_sort_expr(mut self, sort_expr: Vec<Expr>) -> Result<Self> {
let sort_expr = normalize_cols(sort_expr, self.input.as_ref())?;

// Check that the left-most sort expressions are the same as the `ON` expressions.
let mut matched = true;
for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
match sort {
Expr::Sort(SortExpr { expr, .. }) => {
if on != &**expr {
matched = false;
break;
}
}
_ => return plan_err!("Not a sort expression: {sort}"),
}
}

if self.on_expr.len() > sort_expr.len() || !matched {
return plan_err!(
"SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
);
}

self.sort_expr = Some(sort_expr);
Ok(self)
}
}

/// Aggregates its input based on a set of grouping and aggregate
Expand Down
26 changes: 19 additions & 7 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,17 @@ impl CommonSubexprEliminate {
let rewritten = pop_expr(&mut rewritten)?;

if affected_id.is_empty() {
// Alias aggregation epxressions if they have changed
// TODO: This should have really been identified above and handled in the `else` branch
let aggr_exprs = new_aggr_expr
.iter()
.zip(aggr_expr.iter())
.map(|(new_expr, old_expr)| {
new_expr.clone().alias_if_changed(old_expr.display_name()?)
})
.collect::<Result<Vec<Expr>>>()?;
// Since group_epxr changes, schema changes also. Use try_new method.
Aggregate::try_new(Arc::new(new_input), new_group_expr, new_aggr_expr)
Aggregate::try_new(Arc::new(new_input), new_group_expr, aggr_exprs)
.map(LogicalPlan::Aggregate)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't yet understand why this wasn't identified/aliased automatically, but I found this part to be necessary in order to get DISTINCT ON with complex expressions to work.

Copy link
Contributor Author

@gruuya gruuya Nov 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I think I understand what is going on:

  • in general the root aggregation expressions need not be rewritten/aliased above since they can only appear once (i.e. they're not common), so this is ok
  • however, some of their child expressions may get rewritten; an example is the query select distinct on (c1 + 1) c2 from test order by c1 + 1, c3, where the initial aggregation of the sole projected field gets rewritten from FIRST_VALUE(test.column2) ORDER BY [test.column1 + Int64(1) ASC NULLS LAST, test.column3 ASC NULLS LAST] to FIRST_VALUE(test.column2) ORDER BY [test.column1 + Int64(1)Int64(1)test.column1 AS test.column1 + Int64(1) ASC NULLS LAST, test.column3 ASC NULLS LAST] (i.e. c1 + 1 is a common expression of it and the ON expression so it gets rewritten)
  • consequently the new schema will be different from the original one

I think this explains the behavior I've encountered here, and that this behavior is not anomalous. The only question is whether there is a better way to align the new aggregation schema with the old one besides aliasing as above.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe @waynexia has some thoughts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, I've now clarified this comment since it was misleading, i.e. the aliasing should really be done in build_recover_project_plan once qualified aliasing is enabled.

} else {
let mut agg_exprs = vec![];
Expand Down Expand Up @@ -367,7 +376,7 @@ impl OptimizerRule for CommonSubexprEliminate {
Ok(Some(build_recover_project_plan(
&original_schema,
optimized_plan,
)))
)?))
}
plan => Ok(plan),
}
Expand Down Expand Up @@ -458,16 +467,19 @@ fn build_common_expr_project_plan(
/// the "intermediate" projection plan built in [build_common_expr_project_plan].
///
/// This is for those plans who don't keep its own output schema like `Filter` or `Sort`.
fn build_recover_project_plan(schema: &DFSchema, input: LogicalPlan) -> LogicalPlan {
fn build_recover_project_plan(
schema: &DFSchema,
input: LogicalPlan,
) -> Result<LogicalPlan> {
let col_exprs = schema
.fields()
.iter()
.map(|field| Expr::Column(field.qualified_column()))
.collect();
LogicalPlan::Projection(
Projection::try_new(col_exprs, Arc::new(input))
.expect("Cannot build projection plan from an invalid schema"),
)
Ok(LogicalPlan::Projection(Projection::try_new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

col_exprs,
Arc::new(input),
)?))
}

fn extract_expressions(
Expand Down
12 changes: 6 additions & 6 deletions datafusion/optimizer/src/eliminate_nested_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,20 @@ impl OptimizerRule for EliminateNestedUnion {
schema: schema.clone(),
})))
}
LogicalPlan::Distinct(Distinct { input: plan }) => match plan.as_ref() {
LogicalPlan::Distinct(Distinct::All(plan)) => match plan.as_ref() {
LogicalPlan::Union(Union { inputs, schema }) => {
let inputs = inputs
.iter()
.map(extract_plan_from_distinct)
.flat_map(extract_plans_from_union)
.collect::<Vec<_>>();

Ok(Some(LogicalPlan::Distinct(Distinct {
input: Arc::new(LogicalPlan::Union(Union {
Ok(Some(LogicalPlan::Distinct(Distinct::All(Arc::new(
LogicalPlan::Union(Union {
inputs,
schema: schema.clone(),
})),
})))
}),
)))))
}
_ => Ok(None),
},
Expand Down Expand Up @@ -94,7 +94,7 @@ fn extract_plans_from_union(plan: &Arc<LogicalPlan>) -> Vec<Arc<LogicalPlan>> {

fn extract_plan_from_distinct(plan: &Arc<LogicalPlan>) -> &Arc<LogicalPlan> {
match plan.as_ref() {
LogicalPlan::Distinct(Distinct { input: plan }) => plan,
LogicalPlan::Distinct(Distinct::All(plan)) => plan,
_ => plan,
}
}
Expand Down
Loading