Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve merge into performance, Update Column Only Optimization #14429

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d477292
add more test
JackTan25 Jan 23, 2024
6c45f75
add map_columns field
JackTan25 Jan 23, 2024
47e1776
Merge branch 'main' into improve_merge_into_performance_test
JackTan25 Jan 23, 2024
0cdb3bd
fix lint
JackTan25 Jan 23, 2024
8ca5f08
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Jan 24, 2024
88cdcee
Merge branch 'improve_merge_into_performance_test' of https://github.…
JackTan25 Jan 24, 2024
1fd43d5
add update column only optimizations
JackTan25 Jan 24, 2024
d0825c7
fix lint
JackTan25 Jan 24, 2024
2df9348
Merge branch 'main' into improve_merge_into_performance_test
JackTan25 Jan 24, 2024
4db7045
fix test
JackTan25 Jan 24, 2024
dfd3709
Merge branch 'main' into improve_merge_into_performance_test
JackTan25 Jan 24, 2024
e6dc8c9
try_update_column_only_optimization,fix test
JackTan25 Jan 24, 2024
87f4d4a
Merge branch 'improve_merge_into_performance_test' of https://github.…
JackTan25 Jan 24, 2024
9f5313a
Merge branch 'main' into improve_merge_into_performance_test
JackTan25 Jan 24, 2024
7fc5d3e
fix test
JackTan25 Jan 24, 2024
351e021
Merge branch 'improve_merge_into_performance_test' of https://github.…
JackTan25 Jan 24, 2024
ee2d1d0
fix tests
JackTan25 Jan 25, 2024
754af44
Merge branch 'main' into improve_merge_into_performance_test
JackTan25 Jan 25, 2024
e2e752b
enable_update_column_only fix
JackTan25 Jan 25, 2024
6e23323
Merge branch 'improve_merge_into_performance_test' of https://github.…
JackTan25 Jan 25, 2024
4090c5e
fix test
JackTan25 Jan 25, 2024
2f211d2
fix test
JackTan25 Jan 25, 2024
ccdbc02
Merge branch 'main' into improve_merge_into_performance_test
JackTan25 Jan 25, 2024
62e75a2
Merge branch 'main' into improve_merge_into_performance_test
JackTan25 Jan 25, 2024
0ba2037
Merge branch 'main' into improve_merge_into_performance_test
JackTan25 Jan 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl MergeIntoInterpreter {
change_join_order,
split_idx,
row_id_index,
can_try_update_column_only,
..
} = &self.plan;
let mut columns_set = columns_set.clone();
Expand Down Expand Up @@ -363,6 +364,10 @@ impl MergeIntoInterpreter {
None
};

if *can_try_update_column_only {
assert!(condition.is_none());
}

// update
let update_list = if let Some(update_list) = &item.update {
// use update_plan to get exprs
Expand Down Expand Up @@ -411,6 +416,7 @@ impl MergeIntoInterpreter {
)
})
.collect_vec();
//
Some(update_list)
} else {
// delete
Expand Down Expand Up @@ -455,6 +461,7 @@ impl MergeIntoInterpreter {
merge_type: merge_type.clone(),
change_join_order: *change_join_order,
target_build_optimization,
can_try_update_column_only: *can_try_update_column_only,
}))
} else {
let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto {
Expand All @@ -481,6 +488,7 @@ impl MergeIntoInterpreter {
merge_type: merge_type.clone(),
change_join_order: *change_join_order,
target_build_optimization: false, // we don't support for distributed mode for now..
can_try_update_column_only: *can_try_update_column_only,
}));
// if change_join_order = true, it means the target is build side,
// in this way, we will do matched operation and not matched operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ impl PipelineBuilder {
distributed,
merge_type,
change_join_order,
can_try_update_column_only,
..
} = merge_into;

Expand Down Expand Up @@ -475,6 +476,7 @@ impl PipelineBuilder {
input.output_schema()?,
Arc::new(DataSchema::from(tbl.schema())),
merge_into.target_build_optimization,
*can_try_update_column_only,
)?;
pipe_items.push(matched_split_processor.into_pipe_item());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub struct MergeInto {
pub merge_type: MergeIntoType,
pub change_join_order: bool,
pub target_build_optimization: bool,
pub can_try_update_column_only: bool,
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
Expand Down
23 changes: 23 additions & 0 deletions src/query/sql/src/planner/binder/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use databend_common_ast::ast::Expr;
use databend_common_ast::ast::Join;
use databend_common_ast::ast::JoinCondition;
use databend_common_ast::ast::JoinOperator;
Expand Down Expand Up @@ -136,6 +137,27 @@ impl Binder {
Ok(Plan::MergeInto(Box::new(plan)))
}

fn can_try_update_column_only(&self, matched_clauses: &[MatchedClause]) -> bool {
if matched_clauses.len() == 1 {
let matched_clause = &matched_clauses[0];
if matched_clause.selection.is_none() {
if let MatchOperation::Update {
update_list,
is_star,
} = &matched_clause.operation
{
let mut is_column_only = true;
for update_expr in update_list {
is_column_only =
is_column_only && matches!(update_expr.expr, Expr::ColumnRef { .. });
}
return is_column_only || *is_star;
}
}
}
false
}

async fn bind_merge_into_with_join_type(
&mut self,
bind_context: &mut BindContext,
Expand Down Expand Up @@ -422,6 +444,7 @@ impl Binder {
change_join_order: false,
row_id_index: column_binding.index,
split_idx,
can_try_update_column_only: self.can_try_update_column_only(&matched_clauses),
})
}

Expand Down
7 changes: 6 additions & 1 deletion src/query/sql/src/planner/format/display_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ fn format_merge_into(merge_into: &MergeInto) -> Result<String> {
table_entry.database(),
table_entry.name(),
));

let target_build_optimization = matches!(merge_into.merge_type, MergeIntoType::FullOperation)
&& !merge_into.columns_set.contains(&merge_into.row_id_index);
let target_build_optimization_format = FormatTreeNode::new(FormatContext::Text(format!(
Expand All @@ -304,7 +305,10 @@ fn format_merge_into(merge_into: &MergeInto) -> Result<String> {
"distributed: {}",
merge_into.distributed
)));

let can_try_update_column_only_format = FormatTreeNode::new(FormatContext::Text(format!(
"can_try_update_column_only: {}",
merge_into.can_try_update_column_only
)));
// add macthed clauses
let mut matched_children = Vec::with_capacity(merge_into.matched_evaluators.len());
let taregt_schema = table_entry.table().schema();
Expand Down Expand Up @@ -368,6 +372,7 @@ fn format_merge_into(merge_into: &MergeInto) -> Result<String> {
let all_children = [
vec![distributed_format],
vec![target_build_optimization_format],
vec![can_try_update_column_only_format],
matched_children,
unmatched_children,
vec![input_format_child],
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box<MergeInto>) -> Resul
target_tbl_idx: plan.target_table_idx,
})
}

// try to optimize distributed join, only if
// - distributed optimization is enabled
// - no local table scan
Expand Down
12 changes: 11 additions & 1 deletion src/query/sql/src/planner/plans/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,15 @@ pub struct MergeInto {
pub merge_type: MergeIntoType,
pub distributed: bool,
pub change_join_order: bool,
// when we use target table as build side, we need to remove rowid columns.
// when we use target table as build side or insert only, we will remove rowid columns.
pub row_id_index: IndexType,
pub split_idx: IndexType,
// an optimization:
// if it's full_operation/mactehd only and we have only one update without condition here, we shouldn't run
// evaluator, we can just do projection to get the right columns.But the limitation is below:
// `update *`` or `update set t1.a = t2.a ...`, the right expr on the `=` must be only a column,
// we don't support complex expressions.
pub can_try_update_column_only: bool,
}

impl std::fmt::Debug for MergeInto {
Expand All @@ -84,6 +90,10 @@ impl std::fmt::Debug for MergeInto {
.field("matched", &self.matched_evaluators)
.field("unmatched", &self.unmatched_evaluators)
.field("distributed", &self.distributed)
.field(
"can_try_update_column_only",
&self.can_try_update_column_only,
)
.finish()
}
}
Expand Down
Loading
Loading