Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support multi-table insert #15002

Merged
merged 68 commits into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
4eb64f3
parse insert first
SkyFan2002 Mar 9, 2024
c54d8cf
fix parse
SkyFan2002 Mar 9, 2024
40a3195
init interpreter
SkyFan2002 Mar 12, 2024
965ff8d
part of pipeline
SkyFan2002 Mar 19, 2024
633c6b8
fix bind subquery
SkyFan2002 Mar 19, 2024
a7c20ce
save before clippy fix
SkyFan2002 Mar 22, 2024
b46ac6d
fix clippy
SkyFan2002 Mar 22, 2024
88b6b7b
add parser ut
SkyFan2002 Mar 23, 2024
3b74fdb
add binder test
SkyFan2002 Mar 23, 2024
40b4308
fmt
SkyFan2002 Mar 23, 2024
d23f32b
bind source columns as projection
SkyFan2002 Mar 23, 2024
d6ab708
make DuplicateProcessor more general
SkyFan2002 Mar 23, 2024
bf8ff7a
add Duplicat plan
SkyFan2002 Mar 23, 2024
3859d87
add Shuffle plan
SkyFan2002 Mar 23, 2024
14b6cd1
add several physical plan
SkyFan2002 Mar 24, 2024
79f69b4
unconditional multi table insert pipeline
SkyFan2002 Mar 25, 2024
fae2213
add table id in CommitMeta
SkyFan2002 Mar 25, 2024
9e5bb2c
finish unconditional pipeline
SkyFan2002 Mar 25, 2024
891610d
lint
SkyFan2002 Mar 26, 2024
8c15684
clippy
SkyFan2002 Mar 26, 2024
6230f94
Merge remote-tracking branch 'upstream/main' into insert_first
SkyFan2002 Mar 26, 2024
83b017f
fix compile error
SkyFan2002 Mar 26, 2024
eeb407d
fix privilege check
SkyFan2002 Mar 26, 2024
6b6d7e1
fix typos
SkyFan2002 Mar 26, 2024
ce02210
Merge branch 'main' into insert_first
SkyFan2002 Mar 26, 2024
8311cc4
fix slt
SkyFan2002 Mar 26, 2024
dc1dbc3
add test
SkyFan2002 Mar 26, 2024
5e9a47b
fix compile error
SkyFan2002 Mar 26, 2024
4c9a107
conditional insert all
SkyFan2002 Mar 26, 2024
ea4374b
support insert first
SkyFan2002 Mar 26, 2024
fab07b5
support source columns
SkyFan2002 Mar 26, 2024
9eaab73
Merge branch 'main' into insert_first
SkyFan2002 Mar 26, 2024
397e631
Merge branch 'main' into insert_first
SkyFan2002 Mar 27, 2024
3c911f0
support cast schema
SkyFan2002 Mar 27, 2024
6c5199e
support fill default and reorder
SkyFan2002 Mar 27, 2024
cf6d297
support cluster key
SkyFan2002 Mar 27, 2024
0554f17
support multi stmt txn
SkyFan2002 Mar 27, 2024
08c497f
make lint
SkyFan2002 Mar 27, 2024
c844154
Merge remote-tracking branch 'upstream/main' into insert_first
SkyFan2002 Mar 27, 2024
ec461f7
fix cluster key
SkyFan2002 Mar 27, 2024
e7fd46c
support change tracking
SkyFan2002 Mar 28, 2024
830c1ca
fix filter index
SkyFan2002 Mar 28, 2024
a7e4a58
fix shuffle
SkyFan2002 Mar 28, 2024
b3c92bf
Merge remote-tracking branch 'upstream/main' into insert_first
SkyFan2002 Mar 28, 2024
6b1544f
Merge branch 'insert_first' of github.com:SkyFan2002/databend into in…
SkyFan2002 Mar 28, 2024
d27100f
fix compile err
SkyFan2002 Mar 28, 2024
fb38281
support same table in multi branch
SkyFan2002 Mar 28, 2024
bdd9270
fix insert first filter
SkyFan2002 Mar 28, 2024
71d4ac9
Merge branch 'main' into insert_first
SkyFan2002 Mar 29, 2024
e9f225a
Update src/query/pipeline/core/src/pipeline.rs
SkyFan2002 Mar 29, 2024
291cfc2
fix wrong test file name
SkyFan2002 Mar 29, 2024
fda7ee7
FIRST no need be a reserved ident
SkyFan2002 Mar 29, 2024
90368c1
fix typos
SkyFan2002 Mar 29, 2024
dd08659
rename symbol
SkyFan2002 Mar 29, 2024
9a6aef1
remove not universal shuffle strategy
SkyFan2002 Mar 29, 2024
8e0ff11
fix force finish together
SkyFan2002 Mar 29, 2024
bea77f6
peek first token first
SkyFan2002 Mar 29, 2024
39c44e4
Revert "remove not universal shuffle strategy"
SkyFan2002 Mar 29, 2024
a61a161
shuffle return Result
SkyFan2002 Mar 29, 2024
a14be8f
add comments
SkyFan2002 Mar 29, 2024
73c4b2b
make lint
SkyFan2002 Mar 29, 2024
16ab744
Merge branch 'main' into insert_first
SkyFan2002 Mar 29, 2024
b121799
fix compile error
SkyFan2002 Mar 29, 2024
b5bf709
split test file
SkyFan2002 Mar 29, 2024
d38f9a3
Merge branch 'main' into insert_first
BohuTANG Mar 29, 2024
5ce88c0
Merge branch 'main' into insert_first
BohuTANG Mar 29, 2024
14e277b
Merge branch 'main' into insert_first
dantengsky Mar 30, 2024
7dedc0f
Merge branch 'main' into insert_first
dantengsky Mar 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions src/query/ast/src/ast/statements/insert_multi_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;

use derive_visitor::Drive;
use derive_visitor::DriveMut;

use crate::ast::write_comma_separated_list;
use crate::ast::Expr;
use crate::ast::Identifier;
use crate::ast::Query;
/// One `INTO <table>` target of a multi-table insert.
///
/// `target_columns` holds the optional parenthesized column list that follows
/// the table name and `source_columns` holds the optional `VALUES (...)` list.
/// Both vectors are empty when the corresponding part is absent.
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct IntoClause {
/// Optional catalog qualifier of the target table.
pub catalog: Option<Identifier>,
/// Optional database qualifier of the target table.
pub database: Option<Identifier>,
/// Name of the target table.
pub table: Identifier,
/// Columns listed in parentheses after the table name; empty if omitted.
pub target_columns: Vec<Identifier>,
/// Identifiers listed after `VALUES`; empty if omitted.
pub source_columns: Vec<Identifier>,
}

impl Display for IntoClause {
    /// Renders the clause as
    /// `INTO [catalog.][database.]table [(target, ...)] [VALUES (source, ...)]`.
    ///
    /// Optional parts are omitted entirely when they are absent/empty, and no
    /// trailing whitespace is written.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "INTO ")?;
        // Qualifiers are printed only when present, each followed by a dot.
        if let Some(catalog) = &self.catalog {
            write!(f, "{}.", catalog)?;
        }
        if let Some(database) = &self.database {
            write!(f, "{}.", database)?;
        }
        write!(f, "{}", self.table)?;
        if !self.target_columns.is_empty() {
            write!(f, " (")?;
            write_comma_separated_list(f, &self.target_columns)?;
            write!(f, ")")?;
        }
        if !self.source_columns.is_empty() {
            // Exactly one space between `VALUES` and the opening parenthesis
            // (previously two separate writes produced `VALUES  (`).
            write!(f, " VALUES (")?;
            write_comma_separated_list(f, &self.source_columns)?;
            write!(f, ")")?;
        }
        Ok(())
    }
}

/// A `WHEN <condition> THEN <into clause>...` branch of a conditional
/// multi-table insert.
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct WhenClause {
/// Expression written between `WHEN` and `THEN`.
pub condition: Expr,
/// The `INTO` targets listed after `THEN`.
pub into_clauses: Vec<IntoClause>,
}

impl Display for WhenClause {
    /// Renders the branch as `WHEN <condition> THEN <into> <into> ...`.
    ///
    /// Each INTO clause is preceded by a single space, so the output never
    /// ends with trailing whitespace; callers that append their own separator
    /// therefore get exactly one space between clauses.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "WHEN ")?;
        self.condition.fmt(f)?;
        write!(f, " THEN")?;
        for into_clause in &self.into_clauses {
            write!(f, " {}", into_clause)?;
        }
        Ok(())
    }
}

/// The optional `ELSE <into clause>...` branch of a conditional multi-table
/// insert.
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct ElseClause {
/// The `INTO` targets listed after `ELSE`.
pub into_clauses: Vec<IntoClause>,
}

impl Display for ElseClause {
    /// Renders the branch as `ELSE <into> <into> ...`.
    ///
    /// Each INTO clause is preceded by a single space, so the output never
    /// ends with trailing whitespace; callers that append their own separator
    /// therefore get exactly one space between clauses.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "ELSE")?;
        for into_clause in &self.into_clauses {
            write!(f, " {}", into_clause)?;
        }
        Ok(())
    }
}

/// AST node for a multi-table insert:
/// `INSERT [OVERWRITE] {FIRST | ALL} ... <query>`.
///
/// The parser functions for this statement fill either `when_clauses` and
/// `else_clause` (conditional form) or `into_clauses` (unconditional form),
/// leaving the other group empty.
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct InsertMultiTableStmt {
/// True when the `OVERWRITE` keyword is present.
// NOTE(review): `#[drive(skip)]` presumably excludes the field from the
// derived visitor traversal — confirm against derive_visitor docs.
#[drive(skip)]
pub overwrite: bool,
/// True for `INSERT FIRST`, false for `INSERT ALL`.
#[drive(skip)]
pub is_first: bool,
/// `WHEN ... THEN ...` branches, in source order.
pub when_clauses: Vec<WhenClause>,
/// Optional `ELSE ...` branch.
pub else_clause: Option<ElseClause>,
/// Unconditional `INTO` targets.
pub into_clauses: Vec<IntoClause>,
/// The query written after all clauses.
pub source: Query,
}

/// The two flavours of multi-table insert: `FIRST` and `ALL`.
// NOTE(review): nothing in the visible part of this file refers to this enum;
// `InsertMultiTableStmt` records the flavour in its `is_first: bool` field
// instead. Confirm whether the enum is used elsewhere or can be removed.
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub enum InsertMultiTableKind {
First,
All,
}

impl Display for InsertMultiTableStmt {
    /// Writes the statement as
    /// `INSERT [OVERWRITE] {FIRST | ALL} <when>... [<else>] <into>... <query>`,
    /// with every clause followed by a single space before the source query.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_str("INSERT ")?;
        if self.overwrite {
            f.write_str("OVERWRITE ")?;
        }

        // `is_first` selects between the FIRST and ALL keywords.
        let keyword = if self.is_first { "FIRST " } else { "ALL " };
        f.write_str(keyword)?;

        // Clause groups are emitted in the fixed order WHEN, ELSE, INTO.
        self.when_clauses
            .iter()
            .try_for_each(|branch| write!(f, "{} ", branch))?;
        if let Some(fallback) = self.else_clause.as_ref() {
            write!(f, "{} ", fallback)?;
        }
        self.into_clauses
            .iter()
            .try_for_each(|target| write!(f, "{} ", target))?;

        write!(f, "{}", self.source)
    }
}
2 changes: 2 additions & 0 deletions src/query/ast/src/ast/statements/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod explain;
mod hint;
mod index;
mod insert;
mod insert_multi_table;
mod kill;
mod lock;
mod merge_into;
Expand Down Expand Up @@ -60,6 +61,7 @@ pub use explain::*;
pub use hint::*;
pub use index::*;
pub use insert::*;
pub use insert_multi_table::*;
pub use kill::*;
pub use lock::*;
pub use merge_into::*;
Expand Down
2 changes: 2 additions & 0 deletions src/query/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub enum Statement {
},

Insert(InsertStmt),
InsertMultiTable(InsertMultiTableStmt),
Replace(ReplaceStmt),
MergeInto(MergeIntoStmt),
Delete(DeleteStmt),
Expand Down Expand Up @@ -415,6 +416,7 @@ impl Display for Statement {
}
Statement::Query(stmt) => write!(f, "{stmt}")?,
Statement::Insert(stmt) => write!(f, "{stmt}")?,
Statement::InsertMultiTable(insert_multi_table) => write!(f, "{insert_multi_table}")?,
Statement::Replace(stmt) => write!(f, "{stmt}")?,
Statement::MergeInto(stmt) => write!(f, "{stmt}")?,
Statement::Delete(stmt) => write!(f, "{stmt}")?,
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/src/ast/visitors/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,5 +567,6 @@ pub fn walk_statement<'a, V: Visitor<'a>>(visitor: &mut V, statement: &'a Statem
Statement::Begin => {}
Statement::Commit => {}
Statement::Abort => {}
Statement::InsertMultiTable(_) => {}
}
}
1 change: 1 addition & 0 deletions src/query/ast/src/ast/visitors/walk_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,5 +565,6 @@ pub fn walk_statement_mut<V: VisitorMut>(visitor: &mut V, statement: &mut Statem
Statement::AlterNotification(stmt) => visitor.visit_alter_notification(stmt),
Statement::DropNotification(stmt) => visitor.visit_drop_notification(stmt),
Statement::DescribeNotification(stmt) => visitor.visit_describe_notification(stmt),
Statement::InsertMultiTable(_) => {}
}
}
87 changes: 86 additions & 1 deletion src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2063,7 +2063,9 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
| #show_password_policies: "`SHOW PASSWORD POLICIES [<show_options>]`"
),
rule!(
#insert_stmt(false) : "`INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
#conditional_multi_table_insert() : "`INSERT [OVERWRITE] {FIRST|ALL} { WHEN <condition> THEN intoClause [ ... ] } [ ... ] [ ELSE intoClause ] <subquery>`"
| #unconditional_multi_table_insert() : "`INSERT [OVERWRITE] ALL intoClause [ ... ] <subquery>`"
| #insert_stmt(false) : "`INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
| #replace_stmt(false) : "`REPLACE INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
| #merge : "`MERGE INTO <target_table> USING <source> ON <join_expr> { matchedClause | notMatchedClause } [ ... ]`"
| #delete : "`DELETE FROM <table> [WHERE ...]`"
Expand Down Expand Up @@ -2290,6 +2292,89 @@ pub fn insert_stmt(allow_raw: bool) -> impl FnMut(Input) -> IResult<Statement> {
}
}

/// Builds the parser for the conditional form:
/// `INSERT [OVERWRITE] {FIRST | ALL} (WHEN ... THEN ...)+ [ELSE ...] <query>`.
///
/// The resulting statement always has an empty `into_clauses` list.
pub fn conditional_multi_table_insert() -> impl FnMut(Input) -> IResult<Statement> {
    move |input| {
        map(
            rule! {
                INSERT ~ OVERWRITE? ~ (FIRST | ALL) ~ (#when_clause)+ ~ (#else_clause)? ~ #query
            },
            |(_, opt_overwrite, keyword, when_clauses, else_clause, source)| {
                let stmt = InsertMultiTableStmt {
                    overwrite: opt_overwrite.is_some(),
                    // The matched keyword token decides between FIRST and ALL.
                    is_first: matches!(keyword.kind, FIRST),
                    when_clauses,
                    else_clause,
                    into_clauses: Vec::new(),
                    source,
                };
                Statement::InsertMultiTable(stmt)
            },
        )(input)
    }
}

/// Builds the parser for the unconditional form:
/// `INSERT [OVERWRITE] ALL (INTO ...)+ <query>`.
///
/// The resulting statement has no WHEN/ELSE branches and `is_first == false`.
pub fn unconditional_multi_table_insert() -> impl FnMut(Input) -> IResult<Statement> {
    move |input| {
        map(
            rule! {
                INSERT ~ OVERWRITE? ~ ALL ~ (#into_clause)+ ~ #query
            },
            |(_, opt_overwrite, _, into_clauses, source)| {
                let stmt = InsertMultiTableStmt {
                    overwrite: opt_overwrite.is_some(),
                    is_first: false,
                    when_clauses: Vec::new(),
                    else_clause: None,
                    into_clauses,
                    source,
                };
                Statement::InsertMultiTable(stmt)
            },
        )(input)
    }
}

/// Parses one `WHEN <expr> THEN (INTO ...)+` branch.
fn when_clause(i: Input) -> IResult<WhenClause> {
    map(
        rule! {
            WHEN ~ ^#expr ~ THEN ~ (#into_clause)+
        },
        |(_, condition, _, into_clauses)| {
            let clause = WhenClause {
                condition,
                into_clauses,
            };
            clause
        },
    )(i)
}

/// Parses one `INTO <table> [(col, ...)] [VALUES (col, ...)]` target.
///
/// Missing column lists become empty vectors in the returned `IntoClause`.
fn into_clause(i: Input) -> IResult<IntoClause> {
    map(
        rule! {
            INTO
            ~ #dot_separated_idents_1_to_3
            ~ ( "(" ~ #comma_separated_list1(ident) ~ ")" )?
            ~ (VALUES ~ "(" ~ #comma_separated_list1(ident) ~ ")" )?
        },
        |(_, (catalog, database, table), opt_target_columns, opt_source_columns)| {
            // Strip the surrounding punctuation/keyword tokens and keep only
            // the identifier lists.
            let target_columns = match opt_target_columns {
                Some((_, columns, _)) => columns,
                None => Vec::new(),
            };
            let source_columns = match opt_source_columns {
                Some((_, _, columns, _)) => columns,
                None => Vec::new(),
            };
            IntoClause {
                catalog,
                database,
                table,
                target_columns,
                source_columns,
            }
        },
    )(i)
}

/// Parses the `ELSE (INTO ...)+` branch.
fn else_clause(i: Input) -> IResult<ElseClause> {
    map(
        rule! {
            ELSE ~ (#into_clause)+
        },
        |(_, targets)| ElseClause {
            into_clauses: targets,
        },
    )(i)
}

pub fn replace_stmt(allow_raw: bool) -> impl FnMut(Input) -> IResult<Statement> {
move |i| {
let insert_source_parser = if allow_raw {
Expand Down
46 changes: 46 additions & 0 deletions src/query/ast/tests/it/display.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_ast::parser::parse_sql;
use databend_common_ast::parser::tokenize_sql;
use databend_common_ast::parser::Dialect;

/// Checks that the `Display` output of a parsed statement is stable: parsing
/// the rendered SQL and rendering it again must give the same text.
///
/// Panics if either the original or the rendered SQL fails to tokenize/parse.
fn test_stmt_display(sql: &str) {
    // Parse `text` with the PostgreSQL dialect and render the statement.
    fn render(text: &str) -> String {
        let tokens = tokenize_sql(text).unwrap();
        let (statement, _) = parse_sql(&tokens, Dialect::PostgreSQL).unwrap();
        statement.to_string()
    }

    let first_pass = render(sql);
    let second_pass = render(&first_pass);
    assert_eq!(first_pass, second_pass);
}

/// Round-trips every statement in the multi-table insert fixture through
/// parse -> display -> parse -> display.
#[test]
fn test_multi_table_insert_display() {
    const SQL_FILE_PATH: &str = "tests/it/testsql/multi_table_insert.sql";
    let sqls = std::fs::read_to_string(SQL_FILE_PATH).unwrap();
    // Statements are separated by `;`. Trim each fragment before the emptiness
    // check so that whitespace-only leftovers (e.g. a trailing newline after
    // the last `;`) are skipped instead of being parsed as a statement.
    for sql in sqls.split(';').map(str::trim).filter(|s| !s.is_empty()) {
        test_stmt_display(sql);
    }
}

/// Asserts that every statement in the error fixture is rejected by the parser.
#[test]
fn test_multi_table_insert_parse_error() {
    const SQL_FILE_PATH: &str = "tests/it/testsql/multi_table_insert_error.sql";
    let sqls = std::fs::read_to_string(SQL_FILE_PATH).unwrap();
    // Trim before filtering so whitespace-only fragments (e.g. after the final
    // `;`) are skipped rather than counted as a vacuous "parse error".
    for sql in sqls.split(';').map(str::trim).filter(|s| !s.is_empty()) {
        let tokens = tokenize_sql(sql).unwrap();
        assert!(
            parse_sql(&tokens, Dialect::PostgreSQL).is_err(),
            "expected a parse error for: {}",
            sql
        );
    }
}
1 change: 1 addition & 0 deletions src/query/ast/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
#![allow(clippy::uninlined_format_args)]

mod decimal;
mod display;
mod parser;
mod token;
64 changes: 64 additions & 0 deletions src/query/ast/tests/it/testsql/multi_table_insert.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
-- Conditional INSERT FIRST with two WHEN branches and an ELSE branch
INSERT FIRST
WHEN n1 > 100 THEN
INTO t1
WHEN n1 > 10 THEN
INTO t1
INTO t2
ELSE
INTO t2
SELECT n1 from src;

-- Same as above with OVERWRITE
INSERT OVERWRITE FIRST
WHEN n1 > 100 THEN
INTO t1
WHEN n1 > 10 THEN
INTO t1
INTO t2
ELSE
INTO t2
SELECT n1 from src;

-- Conditional INSERT OVERWRITE ALL with an ELSE branch
INSERT OVERWRITE ALL
WHEN n1 > 100 THEN
INTO t1
WHEN n1 > 10 THEN
INTO t1
INTO t2
ELSE
INTO t2
SELECT n1 from src;

-- Unconditional INSERT OVERWRITE ALL into two tables
INSERT OVERWRITE ALL
INTO t1
INTO t2
SELECT n1 from src;

-- Conditional INSERT OVERWRITE FIRST without an ELSE branch
INSERT OVERWRITE FIRST
WHEN n1 > 100 THEN
INTO t1
WHEN n1 > 10 THEN
INTO t1
INTO t2
SELECT n1 from src;

-- Conditional INSERT OVERWRITE ALL without an ELSE branch
INSERT OVERWRITE ALL
WHEN n1 > 100 THEN
INTO t1
WHEN n1 > 10 THEN
INTO t1
INTO t2
SELECT n1 from src;

-- Unconditional INSERT ALL mixing target column lists and VALUES lists
INSERT ALL
INTO t1
INTO t1 (c1, c2, c3) VALUES (n2, n1, n3)
INTO t2 (c1, c2, c3)
INTO t2 VALUES (n3, n2, n1)
SELECT n1, n2, n3 from src;

-- Same as above with OVERWRITE
INSERT OVERWRITE ALL
INTO t1
INTO t1 (c1, c2, c3) VALUES (n2, n1, n3)
INTO t2 (c1, c2, c3)
INTO t2 VALUES (n3, n2, n1)
SELECT n1, n2, n3 from src;
Loading
Loading