fix: incorrect schema may be used while executing replace-into (#14592)

* chore: rename to `fill_missing_and_reorder_columns`

* adjust test case

* rename to `fill_and_reorder_columns`

* fix

* add sql logic test

---------

Co-authored-by: dantengsky <dantengsky@gmail.com>
sundy-li and dantengsky committed Feb 4, 2024
1 parent 432ead0 commit 71241d1
Showing 8 changed files with 56 additions and 23 deletions.
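
For context, the substance of the fix as it appears in the diff below: the replace-into deduplicate processors built in builder_replace_into.rs were constructed from the schema carried in the plan (the field renamed here from `table_schema` to `target_schema`) instead of the table's current schema. After this commit those processors take `&table.schema()`, and the plan-derived schema is used only by `fill_and_reorder_columns` to shape the incoming block. A minimal sketch of why the two schemas must be kept apart, using hypothetical Rust types that are not the databend API, with the column layout from the new sqllogic test:

// Hypothetical stand-ins: the plan carries one schema, the table owns another.
#[derive(Clone, Debug, PartialEq)]
struct Schema {
    fields: Vec<String>,
}

struct ReplacePlan {
    // Whatever schema the plan was built with (`target_schema` after this commit).
    target_schema: Schema,
}

struct TargetTable {
    current: Schema,
}

impl TargetTable {
    // Stand-in for `table.schema()`: always the table's current layout.
    fn schema(&self) -> Schema {
        self.current.clone()
    }
}

fn main() {
    // Layout from the new test after the two DROP COLUMNs: (a, b, id).
    let table = TargetTable {
        current: Schema { fields: ["a", "b", "id"].map(String::from).to_vec() },
    };
    // Hypothetically, the plan carries the statement's column list (b, id, a).
    let plan = ReplacePlan {
        target_schema: Schema { fields: ["b", "id", "a"].map(String::from).to_vec() },
    };
    // The deduplicate processors compare against stored table data, so they are
    // now built from table.schema() rather than the plan's schema.
    assert_ne!(plan.target_schema, table.schema());
}
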
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_replace.rs
@@ -291,7 +291,7 @@ impl ReplaceInterpreter {
table_info: table_info.clone(),
catalog_info: catalog.info(),
select_ctx,
-table_schema: plan.schema.clone(),
+target_schema: plan.schema.clone(),
table_level_range_index,
need_insert: true,
delete_when,
14 changes: 2 additions & 12 deletions src/query/service/src/pipelines/builders/builder_append_table.rs
@@ -39,12 +39,7 @@ impl PipelineBuilder {
append_mode: AppendMode,
deduplicated_label: Option<String>,
) -> Result<()> {
-Self::build_fill_missing_columns_pipeline(
-ctx.clone(),
-main_pipeline,
-table.clone(),
-source_schema,
-)?;
+Self::fill_and_reorder_columns(ctx.clone(), main_pipeline, table.clone(), source_schema)?;

table.append_data(ctx.clone(), main_pipeline, append_mode)?;

@@ -68,12 +63,7 @@ impl PipelineBuilder {
source_schema: DataSchemaRef,
append_mode: AppendMode,
) -> Result<()> {
-Self::build_fill_missing_columns_pipeline(
-ctx.clone(),
-main_pipeline,
-table.clone(),
-source_schema,
-)?;
+Self::fill_and_reorder_columns(ctx.clone(), main_pipeline, table.clone(), source_schema)?;

table.append_data(ctx, main_pipeline, append_mode)?;

(changed file, path not shown)
@@ -58,7 +58,7 @@ impl PipelineBuilder {
)?;

let source_schema = insert_schema;
-Self::build_fill_missing_columns_pipeline(
+Self::fill_and_reorder_columns(
self.ctx.clone(),
&mut self.main_pipeline,
table.clone(),
(changed file, path not shown)
@@ -26,7 +26,9 @@ use crate::sessions::QueryContext;

/// This file implements append to table pipeline builder.
impl PipelineBuilder {
-pub fn build_fill_missing_columns_pipeline(
+// Fill missing columns with default or compute expr
+// ** Also reorder the block into table's schema order **
+pub fn fill_and_reorder_columns(
ctx: Arc<QueryContext>,
pipeline: &mut Pipeline,
table: Arc<dyn Table>,
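
The doc comment added above is the behavioral summary of the renamed helper: fill any column the incoming block is missing (from its default or computed expression) and reorder the block's columns into the table schema's order. A rough sketch of that per-block logic, using simplified hypothetical types rather than databend's real pipeline transform, and handling defaults only:

// Hypothetical stand-ins: a table field with a default value, and a block of
// named string columns in whatever order the source produced them.
struct Field {
    name: String,
    default: String,
}

struct Block {
    columns: Vec<(String, Vec<String>)>, // (column name, column values)
}

// Walk the table schema in order; take the matching source column when it
// exists, otherwise fill the column with the field's default value.
fn fill_and_reorder(table_schema: &[Field], block: &Block, rows: usize) -> Block {
    let columns = table_schema
        .iter()
        .map(|field| {
            let values = block
                .columns
                .iter()
                .find(|(name, _)| name == &field.name)
                .map(|(_, values)| values.clone())
                .unwrap_or_else(|| vec![field.default.clone(); rows]);
            (field.name.clone(), values)
        })
        .collect();
    Block { columns }
}

In the real builder this runs as a transform added to the main pipeline; the sketch only illustrates what "fill and reorder" means for a single block.
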
12 changes: 6 additions & 6 deletions src/query/service/src/pipelines/builders/builder_replace_into.rs
@@ -259,7 +259,7 @@ impl PipelineBuilder {
catalog_info,
select_ctx,
table_level_range_index,
-table_schema,
+target_schema,
need_insert,
delete_when,
} = deduplicate;
@@ -284,7 +284,7 @@
false,
)?;

-let mut target_schema: DataSchema = table_schema.clone().into();
+let mut target_schema: DataSchema = target_schema.clone().into();
if let Some((_, delete_column)) = delete_when {
delete_column_idx = select_schema.index_of(delete_column.as_str())?;
let delete_column = select_schema.field(delete_column_idx).clone();
@@ -314,11 +314,11 @@
}
}

-Self::build_fill_missing_columns_pipeline(
+Self::fill_and_reorder_columns(
self.ctx.clone(),
&mut self.main_pipeline,
tbl.clone(),
-Arc::new(table_schema.clone().into()),
+Arc::new(target_schema.clone().into()),
)?;

let _ = table.cluster_gen_for_append(
@@ -359,7 +359,7 @@
on_conflicts.clone(),
cluster_keys,
bloom_filter_column_indexes.clone(),
-table_schema.as_ref(),
+&table.schema(),
*table_is_empty,
table_level_range_index.clone(),
delete_when.map(|(expr, _)| (expr, delete_column_idx)),
@@ -372,7 +372,7 @@
on_conflicts.clone(),
cluster_keys,
bloom_filter_column_indexes.clone(),
-table_schema.as_ref(),
+&table.schema(),
*table_is_empty,
table_level_range_index.clone(),
delete_when.map(|_| delete_column_idx),
2 changes: 1 addition & 1 deletion src/query/service/src/test_kits/fixture.rs
@@ -687,7 +687,7 @@ impl TestFixture {
)?;

let data_schema: DataSchemaRef = Arc::new(source_schema.into());
-PipelineBuilder::build_fill_missing_columns_pipeline(
+PipelineBuilder::fill_and_reorder_columns(
ctx.clone(),
&mut build_res.main_pipeline,
table.clone(),
(changed file, path not shown)
@@ -35,7 +35,7 @@ pub struct ReplaceDeduplicate {
pub table_is_empty: bool,
pub table_info: TableInfo,
pub catalog_info: CatalogInfo,
-pub table_schema: TableSchemaRef,
+pub target_schema: TableSchemaRef,
pub select_ctx: Option<ReplaceSelectCtx>,
pub table_level_range_index: HashMap<ColumnId, ColumnStatistics>,
pub need_insert: bool,
@@ -0,0 +1,41 @@
statement ok
DROP DATABASE IF EXISTS issue_14593

statement ok
CREATE DATABASE issue_14593

statement ok
USE issue_14593

# https://github.com/datafuselabs/databend/issues/14593

statement ok
create table t (a string, b string, c string, id int, d string) cluster by (id);

statement ok
replace into t (b, id, a) on(id) values('b', 1, 'a');

query ITT
select id, a, b from t;
----
1 a b

statement ok
alter table t drop column c;

statement ok
alter table t drop column d;

statement ok
replace into t (b, id, a) on(id) values('bb', 1, 'aa');

query ITT
select id, a, b from t;
----
1 aa bb

statement ok
drop table t;

statement ok
DROP DATABASE issue_14593
