Skip to content

Commit

Permalink
chore: only wrap logical get with distinct to eliminate duplicates ro…
Browse files Browse the repository at this point in the history
…ws for subquery in single mode (#14452)
  • Loading branch information
xudong963 authored Jan 24, 2024
1 parent ea5eaf0 commit 3ccec47
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl Binder {
scan.columns.insert(row_id_index.unwrap());
table_expr.plan = Arc::new(Scan(scan));
let filter_expr = SExpr::create_unary(Arc::new(filter.into()), Arc::new(table_expr));
let mut rewriter = SubqueryRewriter::new(self.metadata.clone());
let mut rewriter = SubqueryRewriter::new(self.ctx.clone(), self.metadata.clone());
let filter_expr = rewriter.rewrite(&filter_expr)?;

Ok(SubqueryDesc {
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl Binder {
let right_prop = RelExpr::with_s_expr(&right_child).derive_relational_prop()?;
if !right_prop.outer_columns.is_empty() {
// If there are outer columns in right child, then the join is a correlated lateral join
let mut decorrelator = SubqueryRewriter::new(self.metadata.clone());
let mut decorrelator = SubqueryRewriter::new(self.ctx.clone(), self.metadata.clone());
right_child = decorrelator.flatten_plan(
&right_child,
&right_prop.outer_columns,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashSet;
use std::sync::Arc;

use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_exception::Span;
use databend_common_expression::types::DataType;
Expand Down Expand Up @@ -53,8 +54,12 @@ use crate::MetadataRef;
/// Correlated exists subquery -> Marker join
///
/// More information can be found in the paper: Unnesting Arbitrary Queries
pub fn decorrelate_subquery(metadata: MetadataRef, s_expr: SExpr) -> Result<SExpr> {
let mut rewriter = SubqueryRewriter::new(metadata);
pub fn decorrelate_subquery(
ctx: Arc<dyn TableContext>,
metadata: MetadataRef,
s_expr: SExpr,
) -> Result<SExpr> {
let mut rewriter = SubqueryRewriter::new(ctx, metadata);
rewriter.rewrite(&s_expr)
}

Expand Down
68 changes: 35 additions & 33 deletions src/query/sql/src/planner/optimizer/decorrelate/flatten_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,46 +83,48 @@ impl SubqueryRewriter {
metadata.add_derived_column(name.to_string(), data_type),
);
}
let logical_get = SExpr::create_leaf(Arc::new(
let mut logical_get = SExpr::create_leaf(Arc::new(
Scan {
table_index,
columns: self.derived_columns.values().cloned().collect(),
..Default::default()
}
.into(),
));
// Wrap logical get with distinct to eliminate duplicates rows.
let mut group_items = Vec::with_capacity(self.derived_columns.len());
for (index, column_index) in self.derived_columns.values().cloned().enumerate() {
group_items.push(ScalarItem {
scalar: ScalarExpr::BoundColumnRef(BoundColumnRef {
span: None,
column: ColumnBindingBuilder::new(
"".to_string(),
column_index,
Box::new(data_types[index].clone()),
Visibility::Visible,
)
.table_index(Some(table_index))
.build(),
}),
index: column_index,
});
if self.ctx.get_cluster().is_empty() {
// Wrap logical get with distinct to eliminate duplicates rows.
let mut group_items = Vec::with_capacity(self.derived_columns.len());
for (index, column_index) in self.derived_columns.values().cloned().enumerate() {
group_items.push(ScalarItem {
scalar: ScalarExpr::BoundColumnRef(BoundColumnRef {
span: None,
column: ColumnBindingBuilder::new(
"".to_string(),
column_index,
Box::new(data_types[index].clone()),
Visibility::Visible,
)
.table_index(Some(table_index))
.build(),
}),
index: column_index,
});
}
logical_get = SExpr::create_unary(
Arc::new(
Aggregate {
mode: AggregateMode::Initial,
group_items,
aggregate_functions: vec![],
from_distinct: false,
limit: None,
grouping_sets: None,
}
.into(),
),
Arc::new(logical_get),
);
}
let duplicate_delete_get = SExpr::create_unary(
Arc::new(
Aggregate {
mode: AggregateMode::Initial,
group_items,
aggregate_functions: vec![],
from_distinct: false,
limit: None,
grouping_sets: None,
}
.into(),
),
Arc::new(logical_get),
);

let cross_join = Join {
left_conditions: vec![],
Expand All @@ -138,7 +140,7 @@ impl SubqueryRewriter {

return Ok(SExpr::create_binary(
Arc::new(cross_join),
Arc::new(duplicate_delete_get),
Arc::new(logical_get),
Arc::new(plan.clone()),
));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::vec;

use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
Expand Down Expand Up @@ -67,13 +68,15 @@ pub struct FlattenInfo {

/// Rewrite subquery into `Apply` operator
pub struct SubqueryRewriter {
pub(crate) ctx: Arc<dyn TableContext>,
pub(crate) metadata: MetadataRef,
pub(crate) derived_columns: HashMap<IndexType, IndexType>,
}

impl SubqueryRewriter {
pub fn new(metadata: MetadataRef) -> Self {
pub fn new(ctx: Arc<dyn TableContext>, metadata: MetadataRef) -> Self {
Self {
ctx,
metadata,
derived_columns: Default::default(),
}
Expand Down
12 changes: 10 additions & 2 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,11 @@ pub fn optimize_query(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Result<SE

// Decorrelate subqueries, after this step, there should be no subquery in the expression.
if s_expr.contain_subquery() {
s_expr = decorrelate_subquery(opt_ctx.metadata.clone(), s_expr.clone())?;
s_expr = decorrelate_subquery(
opt_ctx.table_ctx.clone(),
opt_ctx.metadata.clone(),
s_expr.clone(),
)?;
}

// Run default rewrite rules
Expand Down Expand Up @@ -285,7 +289,11 @@ fn get_optimized_memo(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Result<Me

// Decorrelate subqueries, after this step, there should be no subquery in the expression.
if s_expr.contain_subquery() {
s_expr = decorrelate_subquery(opt_ctx.metadata.clone(), s_expr.clone())?;
s_expr = decorrelate_subquery(
opt_ctx.table_ctx.clone(),
opt_ctx.metadata.clone(),
s_expr.clone(),
)?;
}

// Run default rewrite rules
Expand Down

0 comments on commit 3ccec47

Please sign in to comment.