From d4772923f8932fa1ff43444316f04d0294143650 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Tue, 23 Jan 2024 12:43:51 +0800 Subject: [PATCH 01/12] add more test --- ...merge_into_without_distributed_enable.test | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test index 9cc961bec218..9ad4f151385e 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test @@ -1091,5 +1091,127 @@ select * from tt1_; ---- 0 20 21 +## add more tests cases for distributed modes. +statement ok +CREATE TABLE IF NOT EXISTS lineitem_target_origin_200_blocks1 ( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +) CLUSTER BY(l_shipdate, l_orderkey); + +statement ok +CREATE TABLE IF NOT EXISTS lineitem_target_origin_400_blocks1 ( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +) CLUSTER BY(l_shipdate, l_orderkey); + +statement ok +CREATE TABLE IF NOT EXISTS lineitem_random( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +) engine = random; + +## add 400w rows +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; + +statement ok +insert into lineitem_target_origin_400_blocks1 select * 
from lineitem_random limit 500000; + +query T +select count(*) from lineitem_target_origin_400_blocks1; +---- +4000000 + +statement ok +insert into lineitem_target_origin_200_blocks1 select * from lineitem_target_origin_400_blocks1; + +query T +select count(*) from lineitem_target_origin_200_blocks1; +---- +4000000 + +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; + +query T +select count(*) from lineitem_target_origin_400_blocks1; +---- +4500000 + +## it maybe flaky test, but in most times, it's normal. +query TT +MERGE INTO lineitem_target_origin_400_blocks1 as t1 using lineitem_target_origin_200_blocks1 as t2 on +t1.l_orderkey = t2.l_orderkey and +t1.l_partkey = t2.l_partkey +and t1.l_suppkey = t2.l_suppkey and +t1.l_linenumber = t2.l_linenumber and +t1.l_quantity = t2.l_quantity and +t1.l_extendedprice = t2.l_extendedprice and +t1.l_discount = t2.l_discount +when matched then update * +when not matched then insert *; +---- +0 4000000 + statement ok set enable_experimental_merge_into = 0; From 6c45f75215d84d73ad93ebe9242fc55f42577539 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Tue, 23 Jan 2024 13:41:35 +0800 Subject: [PATCH 02/12] add map_columns field --- src/query/sql/src/planner/binder/merge_into.rs | 1 + src/query/sql/src/planner/optimizer/optimizer.rs | 1 + src/query/sql/src/planner/plans/merge_into.rs | 8 +++++++- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs index 9bc0254f4547..d1ce6000680e 100644 --- a/src/query/sql/src/planner/binder/merge_into.rs +++ b/src/query/sql/src/planner/binder/merge_into.rs @@ -422,6 +422,7 @@ impl Binder { change_join_order: false, row_id_index: column_binding.index, split_idx, + map_columns: Default::default(), }) } diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index f5f02c7c3dec..c11655a97321 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -387,6 +387,7 @@ fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box) -> Resul target_tbl_idx: plan.target_table_idx, }) } + // try to optimize distributed join, only if // - distributed optimization is enabled // - no local table scan diff --git a/src/query/sql/src/planner/plans/merge_into.rs b/src/query/sql/src/planner/plans/merge_into.rs index dc60b21221f2..c71f5cbd8488 100644 --- a/src/query/sql/src/planner/plans/merge_into.rs +++ b/src/query/sql/src/planner/plans/merge_into.rs @@ -68,9 +68,15 @@ pub struct MergeInto { pub merge_type: MergeIntoType, pub distributed: bool, pub change_join_order: bool, - // when we use target table as build side, we need to remove rowid columns. + // when we use target table as build side or insert only, we will remove rowid columns. pub row_id_index: IndexType, pub split_idx: IndexType, + // an optimization: + // if it's full_operation and we have only one update without condition here, we shouldn't run + // evalutaor, we can just do projection to get the right columns.But the limitation is below: + // `update *`` or `update set t1.a = t2.a ...`, the right expr on the `=` must be only a column, + // we don't support complex expressions. 
+ pub map_columns: HashMap, } impl std::fmt::Debug for MergeInto { From 0cdb3bdfe18b01d5b8539ef7d3442c471084d760 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Tue, 23 Jan 2024 14:04:44 +0800 Subject: [PATCH 03/12] fix lint --- src/query/sql/src/planner/plans/merge_into.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/sql/src/planner/plans/merge_into.rs b/src/query/sql/src/planner/plans/merge_into.rs index c71f5cbd8488..877a3d89f8e2 100644 --- a/src/query/sql/src/planner/plans/merge_into.rs +++ b/src/query/sql/src/planner/plans/merge_into.rs @@ -72,8 +72,8 @@ pub struct MergeInto { pub row_id_index: IndexType, pub split_idx: IndexType, // an optimization: - // if it's full_operation and we have only one update without condition here, we shouldn't run - // evalutaor, we can just do projection to get the right columns.But the limitation is below: + // if it's full_operation/mactehd only and we have only one update without condition here, we shouldn't run + // evaluator, we can just do projection to get the right columns.But the limitation is below: // `update *`` or `update set t1.a = t2.a ...`, the right expr on the `=` must be only a column, // we don't support complex expressions. pub map_columns: HashMap, From 1fd43d5a1da00b36a9a7d6c06be2687e7c4a0e5a Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Wed, 24 Jan 2024 17:26:12 +0800 Subject: [PATCH 04/12] add update column only optimizations --- .../interpreters/interpreter_merge_into.rs | 8 + .../pipelines/builders/builder_merge_into.rs | 2 + .../physical_plans/physical_merge_into.rs | 1 + .../sql/src/planner/binder/merge_into.rs | 24 ++- .../sql/src/planner/format/display_plan.rs | 7 +- src/query/sql/src/planner/plans/merge_into.rs | 6 +- .../processor_merge_into_matched_and_split.rs | 178 ++++++++++++------ ...merge_into_without_distributed_enable.test | 69 +++++++ ...39_target_build_merge_into_standalone.test | 4 + .../mode/standalone/explain/merge_into.test | 119 ++++++++++++ 10 files changed, 353 insertions(+), 65 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 930a24c5cdc5..39f970cfbe6d 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -143,6 +143,7 @@ impl MergeIntoInterpreter { change_join_order, split_idx, row_id_index, + is_update_column_only, .. } = &self.plan; let mut columns_set = columns_set.clone(); @@ -364,6 +365,10 @@ impl MergeIntoInterpreter { None }; + if *is_update_column_only { + assert!(condition.is_none()); + } + // update let update_list = if let Some(update_list) = &item.update { // use update_plan to get exprs @@ -412,6 +417,7 @@ impl MergeIntoInterpreter { ) }) .collect_vec(); + // Some(update_list) } else { // delete @@ -456,6 +462,7 @@ impl MergeIntoInterpreter { merge_type: merge_type.clone(), change_join_order: *change_join_order, target_build_optimization, + is_update_column_only: *is_update_column_only, })) } else { let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto { @@ -482,6 +489,7 @@ impl MergeIntoInterpreter { merge_type: merge_type.clone(), change_join_order: *change_join_order, target_build_optimization: false, // we don't support for distributed mode for now.. 
+ is_update_column_only: *is_update_column_only, })); // if change_join_order = true, it means the target is build side, // in this way, we will do matched operation and not matched operation diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs index 0b7f554f5979..897f4de8ca64 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -395,6 +395,7 @@ impl PipelineBuilder { distributed, merge_type, change_join_order, + is_update_column_only, .. } = merge_into; @@ -475,6 +476,7 @@ impl PipelineBuilder { input.output_schema()?, Arc::new(DataSchema::from(tbl.schema())), merge_into.target_build_optimization, + *is_update_column_only, )?; pipe_items.push(matched_split_processor.into_pipe_item()); } diff --git a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs index 9c44a11d4853..1185eb55ed84 100644 --- a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs +++ b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs @@ -54,6 +54,7 @@ pub struct MergeInto { pub merge_type: MergeIntoType, pub change_join_order: bool, pub target_build_optimization: bool, + pub is_update_column_only: bool, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs index d1ce6000680e..a3bc99b9ec81 100644 --- a/src/query/sql/src/planner/binder/merge_into.rs +++ b/src/query/sql/src/planner/binder/merge_into.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; +use databend_common_ast::ast::Expr; use databend_common_ast::ast::Join; use databend_common_ast::ast::JoinCondition; use databend_common_ast::ast::JoinOperator; @@ -136,6 +137,27 @@ impl Binder { Ok(Plan::MergeInto(Box::new(plan))) } + fn is_update_column_only(&self, matched_clauses: &Vec) -> bool { + if matched_clauses.len() == 1 { + let matched_clause = &matched_clauses[0]; + if matched_clause.selection.is_none() { + if let MatchOperation::Update { + update_list, + is_star, + } = &matched_clause.operation + { + let mut is_column_only = true; + for update_expr in update_list { + is_column_only = + is_column_only && matches!(update_expr.expr, Expr::ColumnRef { .. 
}); + } + return is_column_only || *is_star; + } + } + } + false + } + async fn bind_merge_into_with_join_type( &mut self, bind_context: &mut BindContext, @@ -422,7 +444,7 @@ impl Binder { change_join_order: false, row_id_index: column_binding.index, split_idx, - map_columns: Default::default(), + is_update_column_only: self.is_update_column_only(&matched_clauses), }) } diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs index e2af5aba1e87..8570e428151c 100644 --- a/src/query/sql/src/planner/format/display_plan.rs +++ b/src/query/sql/src/planner/format/display_plan.rs @@ -294,6 +294,7 @@ fn format_merge_into(merge_into: &MergeInto) -> Result { table_entry.database(), table_entry.name(), )); + let target_build_optimization = matches!(merge_into.merge_type, MergeIntoType::FullOperation) && !merge_into.columns_set.contains(&merge_into.row_id_index); let target_build_optimization_format = FormatTreeNode::new(FormatContext::Text(format!( @@ -304,7 +305,10 @@ fn format_merge_into(merge_into: &MergeInto) -> Result { "distributed: {}", merge_into.distributed ))); - + let update_column_only_optimization_format = FormatTreeNode::new(FormatContext::Text(format!( + "update_column_only_optimization: {}", + merge_into.is_update_column_only + ))); // add macthed clauses let mut matched_children = Vec::with_capacity(merge_into.matched_evaluators.len()); let taregt_schema = table_entry.table().schema(); @@ -368,6 +372,7 @@ fn format_merge_into(merge_into: &MergeInto) -> Result { let all_children = [ vec![distributed_format], vec![target_build_optimization_format], + vec![update_column_only_optimization_format], matched_children, unmatched_children, vec![input_format_child], diff --git a/src/query/sql/src/planner/plans/merge_into.rs b/src/query/sql/src/planner/plans/merge_into.rs index 877a3d89f8e2..e6125cd63eb5 100644 --- a/src/query/sql/src/planner/plans/merge_into.rs +++ b/src/query/sql/src/planner/plans/merge_into.rs @@ -76,7 +76,7 @@ pub struct MergeInto { // evaluator, we can just do projection to get the right columns.But the limitation is below: // `update *`` or `update set t1.a = t2.a ...`, the right expr on the `=` must be only a column, // we don't support complex expressions. 
- pub map_columns: HashMap, + pub is_update_column_only: bool, } impl std::fmt::Debug for MergeInto { @@ -90,6 +90,10 @@ impl std::fmt::Debug for MergeInto { .field("matched", &self.matched_evaluators) .field("unmatched", &self.unmatched_evaluators) .field("distributed", &self.distributed) + .field( + "update_column_only_optimization", + &self.is_update_column_only, + ) .finish() } } diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs index 66bfc5e4c840..f6e1cbc309ba 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs @@ -27,6 +27,7 @@ use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; use databend_common_expression::FieldIndex; use databend_common_expression::RawExpr; +use databend_common_expression::RemoteExpr; use databend_common_expression::Value; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_metrics::storage::*; @@ -38,6 +39,7 @@ use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_pipeline_core::PipeItem; use databend_common_sql::evaluator::BlockOperator; use databend_common_sql::executor::physical_plans::MatchExpr; +use databend_common_sql::IndexType; use databend_common_storage::MergeStatus; use crate::operations::common::MutationLogs; @@ -124,6 +126,7 @@ pub struct MatchedSplitProcessor { output_data_updated_data: Option, target_table_schema: DataSchemaRef, target_build_optimization: bool, + is_update_column_only: bool, } impl MatchedSplitProcessor { @@ -135,43 +138,91 @@ impl MatchedSplitProcessor { input_schema: DataSchemaRef, target_table_schema: DataSchemaRef, target_build_optimization: bool, + is_update_column_only: bool, ) -> Result { + let mut update_projections = Vec::with_capacity(field_index_of_input_schema.len()); let mut ops = Vec::::new(); - for item in matched.iter() { - // delete - if item.1.is_none() { - let filter = item.0.as_ref().map(|expr| expr.as_expr(&BUILTIN_FUNCTIONS)); - ops.push(MutationKind::Delete(DeleteDataBlockMutation { - delete_mutator: DeleteByExprMutator::create( - filter.clone(), - ctx.get_function_context()?, - row_id_idx, - input_schema.num_fields(), - target_build_optimization, - ), - })) - } else { - let update_lists = item.1.as_ref().unwrap(); - let filter = item - .0 - .as_ref() - .map(|condition| condition.as_expr(&BUILTIN_FUNCTIONS)); - - ops.push(MutationKind::Update(UpdateDataBlockMutation { - update_mutator: UpdateByExprMutator::create( - filter, - ctx.get_function_context()?, - field_index_of_input_schema.clone(), - update_lists.clone(), - input_schema.num_fields(), - ), - })) + if is_update_column_only { + assert_eq!(matched.len(), 1); + let item = &matched[0]; + // there is no condition + assert!(item.0.is_none()); + // it's update not delete. + assert!(item.1.is_some()); + let update_exprs = item.1.as_ref().unwrap(); + let update_field_indexes: HashMap = update_exprs + .iter() + .map(|item| { + let cast_update_set_expr = + if let RemoteExpr::FunctionCall { id, args, .. } = &item.1 { + assert_eq!(id.name(), "if"); + // the predcate is always true. + &args[1] + } else { + unreachable!() + }; + let update_set_expr = + if let RemoteExpr::Cast { expr, .. 
} = cast_update_set_expr { + expr.as_ref() + } else { + unreachable!() + }; + if let RemoteExpr::ColumnRef { id, .. } = update_set_expr { + // (field_index,project_idx) + (item.0, *id) + } else { + unreachable!() + } + }) + .collect(); + // `field_index_of_input_schema` contains all columns of target_table + for field_index in 0..field_index_of_input_schema.len() { + if update_field_indexes.contains_key(&field_index) { + update_projections.push(*update_field_indexes.get(&field_index).unwrap()); + } else { + update_projections + .push(*field_index_of_input_schema.get(&field_index).unwrap()); + } + } + } else { + for item in matched.iter() { + // delete + if item.1.is_none() { + let filter = item.0.as_ref().map(|expr| expr.as_expr(&BUILTIN_FUNCTIONS)); + ops.push(MutationKind::Delete(DeleteDataBlockMutation { + delete_mutator: DeleteByExprMutator::create( + filter.clone(), + ctx.get_function_context()?, + row_id_idx, + input_schema.num_fields(), + target_build_optimization, + ), + })) + } else { + let update_lists = item.1.as_ref().unwrap(); + let filter = item + .0 + .as_ref() + .map(|condition| condition.as_expr(&BUILTIN_FUNCTIONS)); + + ops.push(MutationKind::Update(UpdateDataBlockMutation { + update_mutator: UpdateByExprMutator::create( + filter, + ctx.get_function_context()?, + field_index_of_input_schema.clone(), + update_lists.clone(), + input_schema.num_fields(), + ), + })) + } + } + + // `field_index_of_input_schema` contains all columns of target_table + for field_index in 0..field_index_of_input_schema.len() { + update_projections.push(*field_index_of_input_schema.get(&field_index).unwrap()); } } - let mut update_projections = Vec::with_capacity(field_index_of_input_schema.len()); - for field_index in 0..field_index_of_input_schema.len() { - update_projections.push(*field_index_of_input_schema.get(&field_index).unwrap()); - } + let input_port = InputPort::create(); let output_port_row_id = OutputPort::create(); let output_port_updated = OutputPort::create(); @@ -188,6 +239,7 @@ impl MatchedSplitProcessor { update_projections, target_table_schema, target_build_optimization, + is_update_column_only, }) } @@ -282,46 +334,48 @@ impl Processor for MatchedSplitProcessor { return Ok(()); } // insert-only, we need to remove this pipeline according to strategy. 
- if self.ops.is_empty() { + if self.ops.is_empty() && !self.is_update_column_only { return Ok(()); } let start = Instant::now(); let mut current_block = data_block; - - for op in self.ops.iter() { - match op { - MutationKind::Update(update_mutation) => { - let stage_block = update_mutation - .update_mutator - .update_by_expr(current_block)?; - current_block = stage_block; - } - - MutationKind::Delete(delete_mutation) => { - let (stage_block, mut row_ids) = delete_mutation - .delete_mutator - .delete_by_expr(current_block)?; - - // delete all - if !row_ids.is_empty() { - row_ids = row_ids.add_meta(Some(Box::new(RowIdKind::Delete)))?; - self.output_data_row_id_data.push(row_ids); + if !self.is_update_column_only { + for op in self.ops.iter() { + match op { + MutationKind::Update(update_mutation) => { + let stage_block = update_mutation + .update_mutator + .update_by_expr(current_block)?; + current_block = stage_block; } - if stage_block.is_empty() { - return Ok(()); + MutationKind::Delete(delete_mutation) => { + let (stage_block, mut row_ids) = delete_mutation + .delete_mutator + .delete_by_expr(current_block)?; + + // delete all + if !row_ids.is_empty() { + row_ids = row_ids.add_meta(Some(Box::new(RowIdKind::Delete)))?; + self.output_data_row_id_data.push(row_ids); + } + + if stage_block.is_empty() { + return Ok(()); + } + current_block = stage_block; } - current_block = stage_block; } } + + let filter: Value = current_block + .get_by_offset(current_block.num_columns() - 1) + .value + .try_downcast() + .unwrap(); + current_block = current_block.filter_boolean_value(&filter)?; } - let filter: Value = current_block - .get_by_offset(current_block.num_columns() - 1) - .value - .try_downcast() - .unwrap(); - current_block = current_block.filter_boolean_value(&filter)?; if !current_block.is_empty() { // add updated row_ids self.ctx.add_merge_status(MergeStatus { diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test index 9ad4f151385e..8e8f16d8eb8b 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test @@ -1213,5 +1213,74 @@ when not matched then insert *; ---- 0 4000000 +## test update column only optimization +statement ok +drop table if exists column_only_optimization_target; + +statement ok +drop table if exists column_only_optimization_source; + +statement ok +create table column_only_optimization_target(a int,b string); + +statement ok +create table column_only_optimization_source(a int,b string); + +statement ok +insert into column_only_optimization_target values(1,'a1'),(2,'a2'); + +statement ok +insert into column_only_optimization_target values(3,'a3'),(4,'a4'); + +statement ok +insert into column_only_optimization_target values(5,'a5'),(6,'a6'); + +statement ok +insert into column_only_optimization_target values(7,'a7'),(8,'a8'); + +query TT +select * from column_only_optimization_target order by a,b; +---- +1 a1 +2 a2 +3 a3 +4 a4 +5 a5 +6 a6 +7 a7 +8 a8 + +statement ok +insert into column_only_optimization_source values(1,'b1'),(2,'b2'); + +statement ok +insert into column_only_optimization_source values(3,'b3'),(4,'b4'); + +query TT +select * from column_only_optimization_source order by a,b; +---- +1 b1 +2 b2 +3 b3 +4 b4 + +query TT +merge into 
column_only_optimization_target as t1 using column_only_optimization_source as t2 on +t1.a = t2.a when matched then update t1.b = t2.b when not macted then insert *; +---- +0 4 + +query TT +select * from column_only_optimization_target order by a,b; +---- +1 b1 +2 b2 +3 b3 +4 b4 +5 a5 +6 a6 +7 a7 +8 a8 + statement ok set enable_experimental_merge_into = 0; diff --git a/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test b/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test index e9cdadf2b9a9..75080a96be6e 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test @@ -52,6 +52,7 @@ MergeInto: target_table: default.default.target_build_optimization ├── distributed: false ├── target_build_optimization: true +├── update_column_only_optimization: true ├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)] ├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))] └── HashJoin: LEFT OUTER @@ -112,6 +113,7 @@ MergeInto: target_table: default.default.target_build_optimization ├── distributed: false ├── target_build_optimization: true +├── update_column_only_optimization: true ├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)] ├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))] └── HashJoin: LEFT OUTER @@ -186,6 +188,7 @@ MergeInto: target_table: default.default.target_build_optimization ├── distributed: false ├── target_build_optimization: true +├── update_column_only_optimization: true ├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)] ├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))] └── HashJoin: LEFT OUTER @@ -267,6 +270,7 @@ MergeInto: target_table: default.default.target_build_optimization ├── distributed: false ├── target_build_optimization: true +├── update_column_only_optimization: true ├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)] ├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))] └── HashJoin: LEFT OUTER diff --git a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test index e372a9a329e6..fd6d1c282388 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test @@ -25,6 +25,7 @@ MergeInto: target_table: default.default.salaries2 ├── distributed: false ├── target_build_optimization: false +├── update_column_only_optimization: false ├── matched update: [condition: eq(employees2.department (#2), 'HR'),update set salary = plus(salaries2.salary (#4), 1000.00)] ├── matched update: [condition: None,update set salary = plus(salaries2.salary (#4), 500.00)] ├── unmatched insert: [condition: None,insert into (employee_id,salary) values(CAST(employees2.employee_id (#0) AS Int32 NULL),CAST(55000.00 AS Decimal(10, 2) NULL))] @@ -54,6 +55,7 
@@ MergeInto: target_table: default.default.salaries2 ├── distributed: false ├── target_build_optimization: false +├── update_column_only_optimization: false ├── matched update: [condition: eq(employees2.department (#2), 'HR'),update set salary = plus(salaries2.salary (#4), 1000.00)] ├── matched update: [condition: None,update set salary = plus(salaries2.salary (#4), 500.00)] ├── unmatched insert: [condition: None,insert into (employee_id,salary) values(CAST(employees2.employee_id (#0) AS Int32 NULL),CAST(55000.00 AS Decimal(10, 2) NULL))] @@ -73,6 +75,123 @@ target_table: default.default.salaries2 ├── order by: [] └── limit: NONE +## test update column only optimization +statement ok +drop table if exists column_only_optimization_target; + +statement ok +drop table if exists column_only_optimization_source; + +statement ok +create table column_only_optimization_target(a int,b string); + +statement ok +create table column_only_optimization_source(a int,b string); + +query T +explain MERGE INTO column_only_optimization_target as t1 using column_only_optimization_source as t2 +on t1.a = t2.a when matched then update set t1.b = t2.b when not matched then insert *; +----- +MergeInto: +target_table: default.default.column_only_optimization_target +├── distributed: false +├── target_build_optimization: true +├── update_column_only_optimization: true +├── matched update: [condition: None,update set b = t2.b (#1)] +├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] +└── HashJoin: LEFT OUTER + ├── equi conditions: [eq(t2.a (#0), t1.a (#2))] + ├── non-equi conditions: [] + ├── Exchange(Merge) + │ └── LogicalGet + │ ├── table: default.default.column_only_optimization_source + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE + └── LogicalGet + ├── table: default.default.column_only_optimization_target + ├── filters: [] + ├── order by: [] + └── limit: NONE + +query T +explain MERGE INTO column_only_optimization_target as t1 using column_only_optimization_source as t2 +on t1.a = t2.a when matched then update * when not matched then insert *; +---- +MergeInto: +target_table: default.default.column_only_optimization_target +├── distributed: false +├── target_build_optimization: true +├── update_column_only_optimization: true +├── matched update: [condition: None,update set a = a (#0),b = b (#1)] +├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] +└── HashJoin: LEFT OUTER + ├── equi conditions: [eq(t2.a (#0), t1.a (#2))] + ├── non-equi conditions: [] + ├── Exchange(Merge) + │ └── LogicalGet + │ ├── table: default.default.column_only_optimization_source + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE + └── LogicalGet + ├── table: default.default.column_only_optimization_target + ├── filters: [] + ├── order by: [] + └── limit: NONE + +query T +explain MERGE INTO column_only_optimization_target as t1 using column_only_optimization_source as t2 +on t1.a = t2.a when matched then update set t1.b = 'test' when not matched then insert *; +---- +MergeInto: +target_table: default.default.column_only_optimization_target +├── distributed: false +├── target_build_optimization: true +├── update_column_only_optimization: false +├── matched update: [condition: None,update set b = 'test'] +├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] +└── HashJoin: LEFT OUTER + ├── equi conditions: 
[eq(t2.a (#0), t1.a (#2))] + ├── non-equi conditions: [] + ├── Exchange(Merge) + │ └── LogicalGet + │ ├── table: default.default.column_only_optimization_source + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE + └── LogicalGet + ├── table: default.default.column_only_optimization_target + ├── filters: [] + ├── order by: [] + └── limit: NONE + +query T +explain MERGE INTO column_only_optimization_target as t1 using column_only_optimization_source as t2 +on t1.a = t2.a when matched then update set t1.b = concat(t2.b,'test') when not matched then insert *; +---- +MergeInto: +target_table: default.default.column_only_optimization_target +├── distributed: false +├── target_build_optimization: true +├── update_column_only_optimization: false +├── matched update: [condition: None,update set b = concat(t2.b (#1), 'test')] +├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] +└── HashJoin: LEFT OUTER + ├── equi conditions: [eq(t2.a (#0), t1.a (#2))] + ├── non-equi conditions: [] + ├── Exchange(Merge) + │ └── LogicalGet + │ ├── table: default.default.column_only_optimization_source + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE + └── LogicalGet + ├── table: default.default.column_only_optimization_target + ├── filters: [] + ├── order by: [] + └── limit: NONE + statement ok set enable_experimental_merge_into = 0; From d0825c759bc085be89b6db39c668eb6e3e68cdd8 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Wed, 24 Jan 2024 18:01:38 +0800 Subject: [PATCH 05/12] fix lint --- src/query/sql/src/planner/binder/merge_into.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs index a3bc99b9ec81..632b5ba66e72 100644 --- a/src/query/sql/src/planner/binder/merge_into.rs +++ b/src/query/sql/src/planner/binder/merge_into.rs @@ -137,7 +137,7 @@ impl Binder { Ok(Plan::MergeInto(Box::new(plan))) } - fn is_update_column_only(&self, matched_clauses: &Vec) -> bool { + fn is_update_column_only(&self, matched_clauses: &[MatchedClause]) -> bool { if matched_clauses.len() == 1 { let matched_clause = &matched_clauses[0]; if matched_clause.selection.is_none() { From 4db70451a594749174300720acd899d2b02b755a Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Wed, 24 Jan 2024 23:11:17 +0800 Subject: [PATCH 06/12] fix test --- .../processor_merge_into_matched_and_split.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs index f6e1cbc309ba..946709c4fb0f 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs @@ -153,7 +153,7 @@ impl MatchedSplitProcessor { let update_field_indexes: HashMap = update_exprs .iter() .map(|item| { - let cast_update_set_expr = + let mut update_set_expr = if let RemoteExpr::FunctionCall { id, args, .. } = &item.1 { assert_eq!(id.name(), "if"); // the predcate is always true. @@ -161,12 +161,13 @@ impl MatchedSplitProcessor { } else { unreachable!() }; - let update_set_expr = - if let RemoteExpr::Cast { expr, .. 
} = cast_update_set_expr { - expr.as_ref() - } else { - unreachable!() - }; + // in `generate_update_list` we will do `wrap_cast_expr` to cast `left` into dest_type, + // but after that, we will do a `type_check` in `scalar.as_expr`, if the cast's dest_type + // is the same, we will deref the `cast`. + if let RemoteExpr::Cast { expr, .. } = update_set_expr { + update_set_expr = expr.as_ref(); + } + assert!(matches!(update_set_expr, RemoteExpr::ColumnRef { .. })); if let RemoteExpr::ColumnRef { id, .. } = update_set_expr { // (field_index,project_idx) (item.0, *id) From e6dc8c924452add565563127add7d7af892ac2c1 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 25 Jan 2024 02:44:26 +0800 Subject: [PATCH 07/12] try_update_column_only_optimization,fix test --- .../interpreters/interpreter_merge_into.rs | 8 +- .../pipelines/builders/builder_merge_into.rs | 4 +- .../physical_plans/physical_merge_into.rs | 2 +- .../sql/src/planner/binder/merge_into.rs | 4 +- .../sql/src/planner/format/display_plan.rs | 2 +- src/query/sql/src/planner/plans/merge_into.rs | 4 +- .../processor_merge_into_matched_and_split.rs | 113 ++++++++++-------- .../mode/standalone/explain/merge_into.test | 2 +- 8 files changed, 77 insertions(+), 62 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 39f970cfbe6d..249a7d263ffe 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -143,7 +143,7 @@ impl MergeIntoInterpreter { change_join_order, split_idx, row_id_index, - is_update_column_only, + can_try_update_column_only, .. } = &self.plan; let mut columns_set = columns_set.clone(); @@ -365,7 +365,7 @@ impl MergeIntoInterpreter { None }; - if *is_update_column_only { + if *can_try_update_column_only { assert!(condition.is_none()); } @@ -462,7 +462,7 @@ impl MergeIntoInterpreter { merge_type: merge_type.clone(), change_join_order: *change_join_order, target_build_optimization, - is_update_column_only: *is_update_column_only, + can_try_update_column_only: *can_try_update_column_only, })) } else { let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto { @@ -489,7 +489,7 @@ impl MergeIntoInterpreter { merge_type: merge_type.clone(), change_join_order: *change_join_order, target_build_optimization: false, // we don't support for distributed mode for now.. - is_update_column_only: *is_update_column_only, + can_try_update_column_only: *can_try_update_column_only, })); // if change_join_order = true, it means the target is build side, // in this way, we will do matched operation and not matched operation diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs index 897f4de8ca64..9590fe9b8e23 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -395,7 +395,7 @@ impl PipelineBuilder { distributed, merge_type, change_join_order, - is_update_column_only, + can_try_update_column_only, .. 
} = merge_into; @@ -476,7 +476,7 @@ impl PipelineBuilder { input.output_schema()?, Arc::new(DataSchema::from(tbl.schema())), merge_into.target_build_optimization, - *is_update_column_only, + *can_try_update_column_only, )?; pipe_items.push(matched_split_processor.into_pipe_item()); } diff --git a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs index 1185eb55ed84..fd5c4e6c9a5d 100644 --- a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs +++ b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs @@ -54,7 +54,7 @@ pub struct MergeInto { pub merge_type: MergeIntoType, pub change_join_order: bool, pub target_build_optimization: bool, - pub is_update_column_only: bool, + pub can_try_update_column_only: bool, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs index 632b5ba66e72..928a36e8be33 100644 --- a/src/query/sql/src/planner/binder/merge_into.rs +++ b/src/query/sql/src/planner/binder/merge_into.rs @@ -137,7 +137,7 @@ impl Binder { Ok(Plan::MergeInto(Box::new(plan))) } - fn is_update_column_only(&self, matched_clauses: &[MatchedClause]) -> bool { + fn can_try_update_column_only(&self, matched_clauses: &[MatchedClause]) -> bool { if matched_clauses.len() == 1 { let matched_clause = &matched_clauses[0]; if matched_clause.selection.is_none() { @@ -444,7 +444,7 @@ impl Binder { change_join_order: false, row_id_index: column_binding.index, split_idx, - is_update_column_only: self.is_update_column_only(&matched_clauses), + can_try_update_column_only: self.can_try_update_column_only(&matched_clauses), }) } diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs index 8570e428151c..0b9c48f0de1d 100644 --- a/src/query/sql/src/planner/format/display_plan.rs +++ b/src/query/sql/src/planner/format/display_plan.rs @@ -307,7 +307,7 @@ fn format_merge_into(merge_into: &MergeInto) -> Result { ))); let update_column_only_optimization_format = FormatTreeNode::new(FormatContext::Text(format!( "update_column_only_optimization: {}", - merge_into.is_update_column_only + merge_into.can_try_update_column_only ))); // add macthed clauses let mut matched_children = Vec::with_capacity(merge_into.matched_evaluators.len()); diff --git a/src/query/sql/src/planner/plans/merge_into.rs b/src/query/sql/src/planner/plans/merge_into.rs index e6125cd63eb5..06e451313ac3 100644 --- a/src/query/sql/src/planner/plans/merge_into.rs +++ b/src/query/sql/src/planner/plans/merge_into.rs @@ -76,7 +76,7 @@ pub struct MergeInto { // evaluator, we can just do projection to get the right columns.But the limitation is below: // `update *`` or `update set t1.a = t2.a ...`, the right expr on the `=` must be only a column, // we don't support complex expressions. 
- pub is_update_column_only: bool, + pub can_try_update_column_only: bool, } impl std::fmt::Debug for MergeInto { @@ -92,7 +92,7 @@ impl std::fmt::Debug for MergeInto { .field("distributed", &self.distributed) .field( "update_column_only_optimization", - &self.is_update_column_only, + &self.can_try_update_column_only, ) .finish() } diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs index 946709c4fb0f..912fa84b4038 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs @@ -126,10 +126,58 @@ pub struct MatchedSplitProcessor { output_data_updated_data: Option, target_table_schema: DataSchemaRef, target_build_optimization: bool, - is_update_column_only: bool, + can_try_update_column_only: bool, } impl MatchedSplitProcessor { + pub fn try_update_column_only( + update_projections: &mut Vec, + matched: MatchExpr, + field_index_of_input_schema: HashMap, + ) -> bool { + assert_eq!(matched.len(), 1); + let item = &matched[0]; + // there is no condition + assert!(item.0.is_none()); + // it's update not delete. + assert!(item.1.is_some()); + let update_exprs = item.1.as_ref().unwrap(); + let mut update_field_indexes: HashMap = + HashMap::with_capacity(update_exprs.len()); + for item in update_exprs.iter() { + let mut update_set_expr = if let RemoteExpr::FunctionCall { id, args, .. } = &item.1 { + assert_eq!(id.name(), "if"); + // the predcate is always true. + &args[1] + } else { + unreachable!() + }; + // in `generate_update_list` we will do `wrap_cast_expr` to cast `left` into dest_type, + // but after that, we will do a `type_check` in `scalar.as_expr`, if the cast's dest_type + // is the same, we will deref the `cast`. + if let RemoteExpr::Cast { expr, .. } = update_set_expr { + update_set_expr = expr.as_ref(); + } + + if let RemoteExpr::ColumnRef { id, .. } = update_set_expr { + // (field_index,project_idx) + update_field_indexes.insert(item.0, *id); + } else { + return false; + } + } + + // `field_index_of_input_schema` contains all columns of target_table + for field_index in 0..field_index_of_input_schema.len() { + if update_field_indexes.contains_key(&field_index) { + update_projections.push(*update_field_indexes.get(&field_index).unwrap()); + } else { + update_projections.push(*field_index_of_input_schema.get(&field_index).unwrap()); + } + } + true + } + pub fn create( ctx: Arc, row_id_idx: usize, @@ -138,54 +186,21 @@ impl MatchedSplitProcessor { input_schema: DataSchemaRef, target_table_schema: DataSchemaRef, target_build_optimization: bool, - is_update_column_only: bool, + can_try_update_column_only: bool, ) -> Result { let mut update_projections = Vec::with_capacity(field_index_of_input_schema.len()); let mut ops = Vec::::new(); - if is_update_column_only { - assert_eq!(matched.len(), 1); - let item = &matched[0]; - // there is no condition - assert!(item.0.is_none()); - // it's update not delete. - assert!(item.1.is_some()); - let update_exprs = item.1.as_ref().unwrap(); - let update_field_indexes: HashMap = update_exprs - .iter() - .map(|item| { - let mut update_set_expr = - if let RemoteExpr::FunctionCall { id, args, .. } = &item.1 { - assert_eq!(id.name(), "if"); - // the predcate is always true. 
- &args[1] - } else { - unreachable!() - }; - // in `generate_update_list` we will do `wrap_cast_expr` to cast `left` into dest_type, - // but after that, we will do a `type_check` in `scalar.as_expr`, if the cast's dest_type - // is the same, we will deref the `cast`. - if let RemoteExpr::Cast { expr, .. } = update_set_expr { - update_set_expr = expr.as_ref(); - } - assert!(matches!(update_set_expr, RemoteExpr::ColumnRef { .. })); - if let RemoteExpr::ColumnRef { id, .. } = update_set_expr { - // (field_index,project_idx) - (item.0, *id) - } else { - unreachable!() - } - }) - .collect(); - // `field_index_of_input_schema` contains all columns of target_table - for field_index in 0..field_index_of_input_schema.len() { - if update_field_indexes.contains_key(&field_index) { - update_projections.push(*update_field_indexes.get(&field_index).unwrap()); - } else { - update_projections - .push(*field_index_of_input_schema.get(&field_index).unwrap()); - } - } - } else { + let mut enable_update_column_only = false; + + if can_try_update_column_only { + enable_update_column_only = MatchedSplitProcessor::try_update_column_only( + &mut update_projections, + matched.clone(), + field_index_of_input_schema.clone(), + ); + } + + if !enable_update_column_only { for item in matched.iter() { // delete if item.1.is_none() { @@ -240,7 +255,7 @@ impl MatchedSplitProcessor { update_projections, target_table_schema, target_build_optimization, - is_update_column_only, + can_try_update_column_only, }) } @@ -335,12 +350,12 @@ impl Processor for MatchedSplitProcessor { return Ok(()); } // insert-only, we need to remove this pipeline according to strategy. - if self.ops.is_empty() && !self.is_update_column_only { + if self.ops.is_empty() && !self.can_try_update_column_only { return Ok(()); } let start = Instant::now(); let mut current_block = data_block; - if !self.is_update_column_only { + if !self.can_try_update_column_only { for op in self.ops.iter() { match op { MutationKind::Update(update_mutation) => { diff --git a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test index fd6d1c282388..23c7b02e2cc2 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test @@ -91,7 +91,7 @@ create table column_only_optimization_source(a int,b string); query T explain MERGE INTO column_only_optimization_target as t1 using column_only_optimization_source as t2 on t1.a = t2.a when matched then update set t1.b = t2.b when not matched then insert *; ------ +---- MergeInto: target_table: default.default.column_only_optimization_target ├── distributed: false From 7fc5d3e75576c42e5a9b501a20e76a3ab5272c9b Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 25 Jan 2024 03:07:08 +0800 Subject: [PATCH 08/12] fix test --- .../sql/src/planner/format/display_plan.rs | 6 ++-- src/query/sql/src/planner/plans/merge_into.rs | 2 +- ...39_target_build_merge_into_standalone.test | 8 ++--- .../mode/standalone/explain/merge_into.test | 34 +++++++++---------- 4 files changed, 24 insertions(+), 26 deletions(-) diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs index 0b9c48f0de1d..e21b3eac2c26 100644 --- a/src/query/sql/src/planner/format/display_plan.rs +++ b/src/query/sql/src/planner/format/display_plan.rs @@ -305,8 +305,8 @@ fn format_merge_into(merge_into: &MergeInto) -> Result { "distributed: {}", 
merge_into.distributed ))); - let update_column_only_optimization_format = FormatTreeNode::new(FormatContext::Text(format!( - "update_column_only_optimization: {}", + let can_try_update_column_only_format = FormatTreeNode::new(FormatContext::Text(format!( + "can_try_update_column_only: {}", merge_into.can_try_update_column_only ))); // add macthed clauses @@ -372,7 +372,7 @@ fn format_merge_into(merge_into: &MergeInto) -> Result { let all_children = [ vec![distributed_format], vec![target_build_optimization_format], - vec![update_column_only_optimization_format], + vec![can_try_update_column_only_format], matched_children, unmatched_children, vec![input_format_child], diff --git a/src/query/sql/src/planner/plans/merge_into.rs b/src/query/sql/src/planner/plans/merge_into.rs index 06e451313ac3..9986e5f56143 100644 --- a/src/query/sql/src/planner/plans/merge_into.rs +++ b/src/query/sql/src/planner/plans/merge_into.rs @@ -91,7 +91,7 @@ impl std::fmt::Debug for MergeInto { .field("unmatched", &self.unmatched_evaluators) .field("distributed", &self.distributed) .field( - "update_column_only_optimization", + "can_try_update_column_only", &self.can_try_update_column_only, ) .finish() diff --git a/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test b/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test index 75080a96be6e..2937825d4836 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test @@ -52,7 +52,7 @@ MergeInto: target_table: default.default.target_build_optimization ├── distributed: false ├── target_build_optimization: true -├── update_column_only_optimization: true +├── can_try_update_column_only: true ├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)] ├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))] └── HashJoin: LEFT OUTER @@ -113,7 +113,7 @@ MergeInto: target_table: default.default.target_build_optimization ├── distributed: false ├── target_build_optimization: true -├── update_column_only_optimization: true +├── can_try_update_column_only: true ├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)] ├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))] └── HashJoin: LEFT OUTER @@ -188,7 +188,7 @@ MergeInto: target_table: default.default.target_build_optimization ├── distributed: false ├── target_build_optimization: true -├── update_column_only_optimization: true +├── can_try_update_column_only: true ├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)] ├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))] └── HashJoin: LEFT OUTER @@ -270,7 +270,7 @@ MergeInto: target_table: default.default.target_build_optimization ├── distributed: false ├── target_build_optimization: true -├── update_column_only_optimization: true +├── can_try_update_column_only: true ├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)] ├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 
NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))] └── HashJoin: LEFT OUTER diff --git a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test index 23c7b02e2cc2..c363dc1a2b1d 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test @@ -25,7 +25,7 @@ MergeInto: target_table: default.default.salaries2 ├── distributed: false ├── target_build_optimization: false -├── update_column_only_optimization: false +├── can_try_update_column_only: false ├── matched update: [condition: eq(employees2.department (#2), 'HR'),update set salary = plus(salaries2.salary (#4), 1000.00)] ├── matched update: [condition: None,update set salary = plus(salaries2.salary (#4), 500.00)] ├── unmatched insert: [condition: None,insert into (employee_id,salary) values(CAST(employees2.employee_id (#0) AS Int32 NULL),CAST(55000.00 AS Decimal(10, 2) NULL))] @@ -55,7 +55,7 @@ MergeInto: target_table: default.default.salaries2 ├── distributed: false ├── target_build_optimization: false -├── update_column_only_optimization: false +├── can_try_update_column_only: false ├── matched update: [condition: eq(employees2.department (#2), 'HR'),update set salary = plus(salaries2.salary (#4), 1000.00)] ├── matched update: [condition: None,update set salary = plus(salaries2.salary (#4), 500.00)] ├── unmatched insert: [condition: None,insert into (employee_id,salary) values(CAST(employees2.employee_id (#0) AS Int32 NULL),CAST(55000.00 AS Decimal(10, 2) NULL))] @@ -96,7 +96,7 @@ MergeInto: target_table: default.default.column_only_optimization_target ├── distributed: false ├── target_build_optimization: true -├── update_column_only_optimization: true +├── can_try_update_column_only: true ├── matched update: [condition: None,update set b = t2.b (#1)] ├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] └── HashJoin: LEFT OUTER @@ -122,7 +122,7 @@ MergeInto: target_table: default.default.column_only_optimization_target ├── distributed: false ├── target_build_optimization: true -├── update_column_only_optimization: true +├── can_try_update_column_only: true ├── matched update: [condition: None,update set a = a (#0),b = b (#1)] ├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] └── HashJoin: LEFT OUTER @@ -148,18 +148,17 @@ MergeInto: target_table: default.default.column_only_optimization_target ├── distributed: false ├── target_build_optimization: true -├── update_column_only_optimization: false +├── can_try_update_column_only: false ├── matched update: [condition: None,update set b = 'test'] ├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] └── HashJoin: LEFT OUTER ├── equi conditions: [eq(t2.a (#0), t1.a (#2))] ├── non-equi conditions: [] - ├── Exchange(Merge) - │ └── LogicalGet - │ ├── table: default.default.column_only_optimization_source - │ ├── filters: [] - │ ├── order by: [] - │ └── limit: NONE + ├── LogicalGet + │ ├── table: default.default.column_only_optimization_source + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE └── LogicalGet ├── table: default.default.column_only_optimization_target ├── filters: [] @@ -174,18 +173,17 @@ MergeInto: target_table: default.default.column_only_optimization_target ├── distributed: false ├── 
target_build_optimization: true -├── update_column_only_optimization: false +├── can_try_update_column_only: false ├── matched update: [condition: None,update set b = concat(t2.b (#1), 'test')] ├── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))] └── HashJoin: LEFT OUTER ├── equi conditions: [eq(t2.a (#0), t1.a (#2))] ├── non-equi conditions: [] - ├── Exchange(Merge) - │ └── LogicalGet - │ ├── table: default.default.column_only_optimization_source - │ ├── filters: [] - │ ├── order by: [] - │ └── limit: NONE + ├── LogicalGet + │ ├── table: default.default.column_only_optimization_source + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE └── LogicalGet ├── table: default.default.column_only_optimization_target ├── filters: [] From ee2d1d003416b6da555fb6ef52698f80307faa5c Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 25 Jan 2024 16:06:26 +0800 Subject: [PATCH 09/12] fix tests --- ...merge_into_without_distributed_enable.test | 28 +++++++++---------- .../merge_into_non_equal_distributed.test | 1 + .../mode/standalone/explain/merge_into.test | 22 +++++++-------- 3 files changed, 25 insertions(+), 26 deletions(-) diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test index 8e8f16d8eb8b..cbcec14b228e 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test @@ -1152,30 +1152,30 @@ CREATE TABLE IF NOT EXISTS lineitem_random( l_comment STRING not null ) engine = random; -## add 400w rows +## add 40w rows statement ok -insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 50000; statement ok -insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 50000; statement ok -insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 50000; statement ok -insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 50000; statement ok -insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 50000; statement ok -insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 50000; statement ok -insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 50000; statement ok -insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 50000; query T select count(*) from lineitem_target_origin_400_blocks1; @@ -1188,15 +1188,15 @@ insert into lineitem_target_origin_200_blocks1 select * from lineitem_target_ori 
query T select count(*) from lineitem_target_origin_200_blocks1; ---- -4000000 +400000 statement ok -insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 500000; +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 50000; query T select count(*) from lineitem_target_origin_400_blocks1; ---- -4500000 +450000 ## it maybe flaky test, but in most times, it's normal. query TT @@ -1211,7 +1211,7 @@ t1.l_discount = t2.l_discount when matched then update * when not matched then insert *; ---- -0 4000000 +0 400000 ## test update column only optimization statement ok @@ -1266,7 +1266,7 @@ select * from column_only_optimization_source order by a,b; query TT merge into column_only_optimization_target as t1 using column_only_optimization_source as t2 on -t1.a = t2.a when matched then update t1.b = t2.b when not macted then insert *; +t1.a = t2.a when matched then update set t1.b = t2.b when not macted then insert *; ---- 0 4 diff --git a/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test b/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test index 2d8c9c1af6c2..54dd979609ac 100644 --- a/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test +++ b/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test @@ -54,6 +54,7 @@ MergeInto: target_table: default.default.t1 ├── distributed: false ├── target_build_optimization: false +├── can_try_update_column_only: true ├── matched update: [condition: None,update set a = a (#0)] ├── unmatched insert: [condition: None,insert into (a) values(CAST(a (#0) AS Int32 NULL))] └── HashJoin: RIGHT OUTER diff --git a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test index c363dc1a2b1d..77f8d124fe10 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test @@ -102,12 +102,11 @@ target_table: default.default.column_only_optimization_target └── HashJoin: LEFT OUTER ├── equi conditions: [eq(t2.a (#0), t1.a (#2))] ├── non-equi conditions: [] - ├── Exchange(Merge) - │ └── LogicalGet - │ ├── table: default.default.column_only_optimization_source - │ ├── filters: [] - │ ├── order by: [] - │ └── limit: NONE + ├── LogicalGet + │ ├── table: default.default.column_only_optimization_source + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE └── LogicalGet ├── table: default.default.column_only_optimization_target ├── filters: [] @@ -128,12 +127,11 @@ target_table: default.default.column_only_optimization_target └── HashJoin: LEFT OUTER ├── equi conditions: [eq(t2.a (#0), t1.a (#2))] ├── non-equi conditions: [] - ├── Exchange(Merge) - │ └── LogicalGet - │ ├── table: default.default.column_only_optimization_source - │ ├── filters: [] - │ ├── order by: [] - │ └── limit: NONE + ├── LogicalGet + │ ├── table: default.default.column_only_optimization_source + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE └── LogicalGet ├── table: default.default.column_only_optimization_target ├── filters: [] From e2e752ba13aa5c7c680877a80d7289dd4d0365bb Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 25 Jan 2024 16:39:05 +0800 Subject: [PATCH 10/12] enable_update_column_only fix --- .../processors/processor_merge_into_matched_and_split.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git 
a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs index 912fa84b4038..e2850dc8209d 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs @@ -126,7 +126,7 @@ pub struct MatchedSplitProcessor { output_data_updated_data: Option, target_table_schema: DataSchemaRef, target_build_optimization: bool, - can_try_update_column_only: bool, + enable_update_column_only: bool, } impl MatchedSplitProcessor { @@ -255,7 +255,7 @@ impl MatchedSplitProcessor { update_projections, target_table_schema, target_build_optimization, - can_try_update_column_only, + enable_update_column_only, }) } @@ -350,12 +350,12 @@ impl Processor for MatchedSplitProcessor { return Ok(()); } // insert-only, we need to remove this pipeline according to strategy. - if self.ops.is_empty() && !self.can_try_update_column_only { + if self.ops.is_empty() && !self.enable_update_column_only { return Ok(()); } let start = Instant::now(); let mut current_block = data_block; - if !self.can_try_update_column_only { + if !self.enable_update_column_only { for op in self.ops.iter() { match op { MutationKind::Update(update_mutation) => { From 4090c5ea41add81c8bc2a305ef929b82493d167a Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 25 Jan 2024 17:26:26 +0800 Subject: [PATCH 11/12] fix test --- .../09_0036_merge_into_without_distributed_enable.test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test index cbcec14b228e..3f3b6080bb3c 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test @@ -1180,7 +1180,7 @@ insert into lineitem_target_origin_400_blocks1 select * from lineitem_random lim query T select count(*) from lineitem_target_origin_400_blocks1; ---- -4000000 +400000 statement ok insert into lineitem_target_origin_200_blocks1 select * from lineitem_target_origin_400_blocks1; From 2f211d218ff8826c519de23c0029cb1dd7e9061b Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 25 Jan 2024 18:05:43 +0800 Subject: [PATCH 12/12] fix test --- .../09_0036_merge_into_without_distributed_enable.test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test index 3f3b6080bb3c..9c7c72603d40 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test @@ -1266,7 +1266,7 @@ select * from column_only_optimization_source order by a,b; query TT merge into column_only_optimization_target as t1 using column_only_optimization_source as t2 on -t1.a = t2.a when matched then update set t1.b = t2.b when not macted then insert *; +t1.a = t2.a when matched then update set t1.b = t2.b when not matched then insert *; ---- 0 4
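To make the flag renamed across these patches easier to follow, here is a toy model of the fast path it gates. Nothing below is databend code: the types, names, and column layout are invented purely for illustration. The reading suggested by the explain output and the processor change is that `can_try_update_column_only` is reported true only when the single matched clause assigns bare source columns (update set a = a (#0)) and false as soon as an expression such as concat(t2.b (#1), 'test') appears, and that when the execution-side `enable_update_column_only` is set, MatchedSplitProcessor skips its MutationKind::Update evaluation loop so the updated rows can be produced by projection alone.

// Hypothetical, simplified sketch of the "update column only" fast path.
// None of these types exist in databend; they only model the idea.

/// A column is just a vector of values in this toy model.
type Column = Vec<i64>;

/// A joined block: source columns followed by target columns.
struct JoinedBlock {
    columns: Vec<Column>,
}

/// One `update set <target_col> = <expr>` item.
enum UpdateExpr {
    /// Right-hand side is a bare column reference (its index in the joined block).
    ColumnRef(usize),
    /// Anything else (function call, arithmetic, ...), e.g. `concat(t2.b, 'test')`.
    Complex,
}

/// Planner-side check, mirroring what `can_try_update_column_only` seems to report:
/// the optimization applies only when every assignment is a bare column reference.
fn can_try_update_column_only(exprs: &[UpdateExpr]) -> bool {
    exprs.iter().all(|e| matches!(e, UpdateExpr::ColumnRef(_)))
}

/// Execution-side fast path: build the updated columns by projection only,
/// instead of evaluating each update expression row by row.
/// Returns None when any assignment needs the general evaluator.
fn project_updated_block(block: &JoinedBlock, exprs: &[UpdateExpr]) -> Option<Vec<Column>> {
    exprs
        .iter()
        .map(|e| match e {
            UpdateExpr::ColumnRef(idx) => Some(block.columns[*idx].clone()),
            UpdateExpr::Complex => None, // must fall back to expression evaluation
        })
        .collect()
}

fn main() {
    let block = JoinedBlock {
        // one source column and one target column, three matched rows
        columns: vec![vec![1, 2, 3], vec![10, 20, 30]],
    };

    // `update set t1.b = t2.b` style clause: qualifies, projection is enough.
    let simple = vec![UpdateExpr::ColumnRef(0)];
    assert!(can_try_update_column_only(&simple));
    assert_eq!(project_updated_block(&block, &simple), Some(vec![vec![1, 2, 3]]));

    // `update set b = concat(t2.b, 'test')` style clause: does not qualify.
    let complex = vec![UpdateExpr::Complex];
    assert!(!can_try_update_column_only(&complex));
    assert_eq!(project_updated_block(&block, &complex), None);
}

Under that reading, matched rows that only copy source columns never pay per-row expression evaluation, which is presumably why the processor bypasses its ops loop when the flag is set; any clause with a real expression falls back to the existing evaluator path.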