Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement DISTINCT ON from Postgres #7981

Merged
merged 23 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
deff43f
Initial DISTINT ON implementation
gruuya Oct 30, 2023
553f87a
Merge branch 'main' into distinct-on-implementation
gruuya Oct 30, 2023
04ce375
Add a couple more tests
gruuya Oct 30, 2023
148fee2
Add comments in the replace_distinct_aggregate optimizer
gruuya Oct 30, 2023
26e9082
Run cargo fmt to fix CI
gruuya Oct 30, 2023
a5cff4e
Make DISTINCT ON planning more robust to support arbitrary selection …
gruuya Oct 31, 2023
525523a
Merge branch 'main' into distinct-on-impl
gruuya Oct 31, 2023
5f0a26b
Merge branch 'main' into distinct-on-impl
gruuya Nov 1, 2023
2730bbd
Add DISTINCT ON + join SLT
gruuya Nov 1, 2023
9b41907
Handle no DISTINCT ON expressions and extend the docs for the replace…
gruuya Nov 2, 2023
7c232c8
Merge branch 'main' into distinct-on-impl
gruuya Nov 2, 2023
57ffea9
Merge branch 'main' into distinct-on-impl
gruuya Nov 3, 2023
5fdbef4
Remove misleading DISTINCT ON SLT comment
gruuya Nov 3, 2023
5f0cf19
Merge branch 'main' into distinct-on-impl
gruuya Nov 4, 2023
2ee64cd
Add an EXPLAIN SLT for a basic DISTINCT ON query
gruuya Nov 4, 2023
2bbcd7b
Merge branch 'main' into distinct-on-impl
gruuya Nov 6, 2023
5d89700
Revise comment in CommonSubexprEliminate::try_optimize_aggregate
gruuya Nov 6, 2023
8467736
Merge branch 'main' into distinct-on-impl
gruuya Nov 10, 2023
8ebdc13
Implement qualified expression alias and extend test coverage
gruuya Nov 10, 2023
d2bc2b4
Update datafusion/proto/proto/datafusion.proto
gruuya Nov 10, 2023
e74e1f8
Accompanying generated changes to alias proto tag revision
gruuya Nov 10, 2023
a912cad
Merge branch 'main' into distinct-on-impl
gruuya Nov 12, 2023
9f37d8f
Remove obsolete comment
gruuya Nov 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::Operator;
use crate::{aggregate_function, ExprSchemable};
use arrow::datatypes::DataType;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{internal_err, DFSchema};
use datafusion_common::{internal_err, DFSchema, OwnedTableReference};
use datafusion_common::{plan_err, Column, DataFusionError, Result, ScalarValue};
use std::collections::HashSet;
use std::fmt;
Expand Down Expand Up @@ -172,7 +172,7 @@ pub enum Expr {
/// plan into physical plan.
Wildcard,
/// Represents a reference to all available fields in a specific schema.
///
///
/// This expr has to be resolved to a list of columns before translating logical
/// plan into physical plan.
QualifiedWildcard { qualifier: String },
Expand All @@ -191,13 +191,20 @@ pub enum Expr {
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Alias {
pub expr: Box<Expr>,
pub relation: Option<OwnedTableReference>,
pub name: String,
}

impl Alias {
pub fn new(expr: Expr, name: impl Into<String>) -> Self {
/// Create an alias with an optional schema/field qualifier.
pub fn new(
expr: Expr,
relation: Option<impl Into<OwnedTableReference>>,
name: impl Into<String>,
) -> Self {
Self {
expr: Box::new(expr),
relation: relation.map(|r| r.into()),
name: name.into(),
}
}
Expand Down Expand Up @@ -849,7 +856,27 @@ impl Expr {
asc,
nulls_first,
}) => Expr::Sort(Sort::new(Box::new(expr.alias(name)), asc, nulls_first)),
_ => Expr::Alias(Alias::new(self, name.into())),
_ => Expr::Alias(Alias::new(self, None::<&str>, name.into())),
}
}

/// Return `self AS name` alias expression with a specific qualifier
pub fn alias_qualified(
self,
relation: Option<impl Into<OwnedTableReference>>,
name: impl Into<String>,
) -> Expr {
match self {
Expr::Sort(Sort {
expr,
asc,
nulls_first,
}) => Expr::Sort(Sort::new(
Box::new(expr.alias_qualified(relation, name)),
asc,
nulls_first,
)),
_ => Expr::Alias(Alias::new(self, relation, name.into())),
}
}

Expand Down
7 changes: 7 additions & 0 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,13 @@ impl ExprSchemable for Expr {
self.nullable(input_schema)?,
)
.with_metadata(self.metadata(input_schema)?)),
Expr::Alias(Alias { relation, name, .. }) => Ok(DFField::new(
relation.clone(),
name,
self.get_type(input_schema)?,
self.nullable(input_schema)?,
)
.with_metadata(self.metadata(input_schema)?)),
_ => Ok(DFField::new_unqualified(
&self.display_name()?,
self.get_type(input_schema)?,
Expand Down
29 changes: 21 additions & 8 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use crate::expr_rewriter::{
rewrite_sort_cols_by_aggs,
};
use crate::logical_plan::{
Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Aggregate, Analyze, CrossJoin, Distinct, DistinctOn, EmptyRelation, Explain, Filter,
Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
Window,
};
Expand Down Expand Up @@ -551,16 +551,29 @@ impl LogicalPlanBuilder {
let left_plan: LogicalPlan = self.plan;
let right_plan: LogicalPlan = plan;

Ok(Self::from(LogicalPlan::Distinct(Distinct {
input: Arc::new(union(left_plan, right_plan)?),
})))
Ok(Self::from(LogicalPlan::Distinct(Distinct::All(Arc::new(
union(left_plan, right_plan)?,
)))))
}

/// Apply deduplication: Only distinct (different) values are returned)
pub fn distinct(self) -> Result<Self> {
Ok(Self::from(LogicalPlan::Distinct(Distinct {
input: Arc::new(self.plan),
})))
Ok(Self::from(LogicalPlan::Distinct(Distinct::All(Arc::new(
self.plan,
)))))
}

/// Project first values of the specified expression list according to the provided
/// sorting expressions grouped by the `DISTINCT ON` clause expressions.
pub fn distinct_on(
self,
on_expr: Vec<Expr>,
select_expr: Vec<Expr>,
sort_expr: Option<Vec<Expr>>,
) -> Result<Self> {
Ok(Self::from(LogicalPlan::Distinct(Distinct::On(
DistinctOn::try_new(on_expr, select_expr, sort_expr, Arc::new(self.plan))?,
))))
}

/// Apply a join to `right` using explicitly specified columns and an
Expand Down
8 changes: 4 additions & 4 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, EmptyRelation, Explain,
Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning,
PlanType, Prepare, Projection, Repartition, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, DistinctOn, EmptyRelation,
Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, Prepare, Projection, Repartition, Sort, StringifiedPlan,
Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
};
pub use statement::{
SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd,
Expand Down
163 changes: 148 additions & 15 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use std::sync::Arc;
use super::dml::CopyTo;
use super::DdlStatement;
use crate::dml::CopyOptions;
use crate::expr::{Alias, Exists, InSubquery, Placeholder};
use crate::expr_rewriter::create_col_from_scalar_expr;
use crate::expr::{Alias, Exists, InSubquery, Placeholder, Sort as SortExpr};
use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::logical_plan::{DmlStatement, Statement};
Expand Down Expand Up @@ -163,7 +163,8 @@ impl LogicalPlan {
}) => projected_schema,
LogicalPlan::Projection(Projection { schema, .. }) => schema,
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
LogicalPlan::Distinct(Distinct { input }) => input.schema(),
LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
Comment on lines +166 to +167
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add api-change label for this diff

LogicalPlan::Window(Window { schema, .. }) => schema,
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
Expand Down Expand Up @@ -367,6 +368,16 @@ impl LogicalPlan {
LogicalPlan::Unnest(Unnest { column, .. }) => {
f(&Expr::Column(column.clone()))
}
LogicalPlan::Distinct(Distinct::On(DistinctOn {
on_expr,
select_expr,
sort_expr,
..
})) => on_expr
.iter()
.chain(select_expr.iter())
.chain(sort_expr.clone().unwrap_or(vec![]).iter())
.try_for_each(f),
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Subquery(_)
Expand All @@ -377,7 +388,7 @@ impl LogicalPlan {
| LogicalPlan::Analyze(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Union(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Distinct(Distinct::All(_))
| LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
Expand Down Expand Up @@ -405,7 +416,9 @@ impl LogicalPlan {
LogicalPlan::Union(Union { inputs, .. }) => {
inputs.iter().map(|arc| arc.as_ref()).collect()
}
LogicalPlan::Distinct(Distinct { input }) => vec![input],
LogicalPlan::Distinct(
Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
) => vec![input],
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::Dml(write) => vec![&write.input],
Expand Down Expand Up @@ -461,8 +474,11 @@ impl LogicalPlan {
Ok(Some(agg.group_expr.as_slice()[0].clone()))
}
}
LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
Ok(Some(select_expr[0].clone()))
}
LogicalPlan::Filter(Filter { input, .. })
| LogicalPlan::Distinct(Distinct { input, .. })
| LogicalPlan::Distinct(Distinct::All(input))
| LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::Limit(Limit { input, .. })
| LogicalPlan::Repartition(Repartition { input, .. })
Expand Down Expand Up @@ -823,10 +839,29 @@ impl LogicalPlan {
inputs: inputs.iter().cloned().map(Arc::new).collect(),
schema: schema.clone(),
})),
LogicalPlan::Distinct(Distinct { .. }) => {
Ok(LogicalPlan::Distinct(Distinct {
input: Arc::new(inputs[0].clone()),
}))
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
Distinct::All(_) => Distinct::All(Arc::new(inputs[0].clone())),
Distinct::On(DistinctOn {
on_expr,
select_expr,
..
}) => {
let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
let select_expr = expr.split_off(on_expr.len());
Distinct::On(DistinctOn::try_new(
expr,
select_expr,
if !sort_expr.is_empty() {
Some(sort_expr)
} else {
None
},
Arc::new(inputs[0].clone()),
)?)
}
};
Ok(LogicalPlan::Distinct(distinct))
}
LogicalPlan::Analyze(a) => {
assert!(expr.is_empty());
Expand Down Expand Up @@ -1064,7 +1099,9 @@ impl LogicalPlan {
LogicalPlan::Subquery(_) => None,
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
LogicalPlan::Limit(Limit { fetch, .. }) => *fetch,
LogicalPlan::Distinct(Distinct { input }) => input.max_rows(),
LogicalPlan::Distinct(
Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
) => input.max_rows(),
LogicalPlan::Values(v) => Some(v.values.len()),
LogicalPlan::Unnest(_) => None,
LogicalPlan::Ddl(_)
Expand Down Expand Up @@ -1667,9 +1704,21 @@ impl LogicalPlan {
LogicalPlan::Statement(statement) => {
write!(f, "{}", statement.display())
}
LogicalPlan::Distinct(Distinct { .. }) => {
write!(f, "Distinct:")
}
LogicalPlan::Distinct(distinct) => match distinct {
Distinct::All(_) => write!(f, "Distinct:"),
Distinct::On(DistinctOn {
on_expr,
select_expr,
sort_expr,
..
}) => write!(
f,
"DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
expr_vec_fmt!(on_expr),
expr_vec_fmt!(select_expr),
if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
),
},
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union(_) => write!(f, "Union"),
Expand Down Expand Up @@ -2132,9 +2181,93 @@ pub struct Limit {

/// Removes duplicate rows from the input
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Distinct {
pub enum Distinct {
/// Plain `DISTINCT` referencing all selection expressions
All(Arc<LogicalPlan>),
/// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
On(DistinctOn),
}

/// Removes duplicate rows from the input
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
/// The `DISTINCT ON` clause expression list
pub on_expr: Vec<Expr>,
/// The selected projection expression list
pub select_expr: Vec<Expr>,
/// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause when
/// present. Note that those matching expressions actually wrap the `ON` expressions with
/// additional info pertaining to the sorting procedure (i.e. ASC/DESC, and NULLS FIRST/LAST).
pub sort_expr: Option<Vec<Expr>>,
/// The logical plan that is being DISTINCT'd
pub input: Arc<LogicalPlan>,
/// The schema description of the DISTINCT ON output
pub schema: DFSchemaRef,
}

impl DistinctOn {
/// Create a new `DistinctOn` struct.
pub fn try_new(
on_expr: Vec<Expr>,
select_expr: Vec<Expr>,
sort_expr: Option<Vec<Expr>>,
input: Arc<LogicalPlan>,
) -> Result<Self> {
if on_expr.is_empty() {
return plan_err!("No `ON` expressions provided");
}

let on_expr = normalize_cols(on_expr, input.as_ref())?;

let schema = DFSchema::new_with_metadata(
exprlist_to_fields(&select_expr, &input)?,
input.schema().metadata().clone(),
)?;

let mut distinct_on = DistinctOn {
on_expr,
select_expr,
sort_expr: None,
input,
schema: Arc::new(schema),
};

if let Some(sort_expr) = sort_expr {
distinct_on = distinct_on.with_sort_expr(sort_expr)?;
}

Ok(distinct_on)
}

/// Try to update `self` with a new sort expressions.
///
/// Validates that the sort expressions are a super-set of the `ON` expressions.
pub fn with_sort_expr(mut self, sort_expr: Vec<Expr>) -> Result<Self> {
let sort_expr = normalize_cols(sort_expr, self.input.as_ref())?;

// Check that the left-most sort expressions are the same as the `ON` expressions.
let mut matched = true;
for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
match sort {
Expr::Sort(SortExpr { expr, .. }) => {
if on != &**expr {
matched = false;
break;
}
}
_ => return plan_err!("Not a sort expression: {sort}"),
}
}

if self.on_expr.len() > sort_expr.len() || !matched {
return plan_err!(
"SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
);
}

self.sort_expr = Some(sort_expr);
Ok(self)
}
}

/// Aggregates its input based on a set of grouping and aggregate
Expand Down
8 changes: 5 additions & 3 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,11 @@ impl TreeNode for Expr {
let mut transform = transform;

Ok(match self {
Expr::Alias(Alias { expr, name, .. }) => {
Expr::Alias(Alias::new(transform(*expr)?, name))
}
Expr::Alias(Alias {
expr,
relation,
name,
}) => Expr::Alias(Alias::new(transform(*expr)?, relation, name)),
Expr::Column(_) => self,
Expr::OuterReferenceColumn(_, _) => self,
Expr::Exists { .. } => self,
Expand Down
Loading