-
Notifications
You must be signed in to change notification settings - Fork 759
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ISSUE-2456: Support insert select query #2765
Conversation
Thanks for the contribution! Please review the labels and make any necessary changes. |
Codecov Report
@@ Coverage Diff @@
## main #2765 +/- ##
======================================
Coverage 67% 67%
======================================
Files 574 585 +11
Lines 31149 31342 +193
======================================
+ Hits 21041 21232 +191
- Misses 10108 10110 +2
Continue to review full report at Codecov.
|
I think we should avoid having stream inside the plan, after #2786 . |
159df1f
to
ade6f26
Compare
fn cast_function(output_type: &DataType) -> Result<Box<dyn Function>> { | ||
let function_factory = FunctionFactory::instance(); | ||
match output_type { | ||
DataType::Null => function_factory.get("toNull"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use CastFunction::create
?
let iter = output_schema.fields().iter().zip(select_schema.fields()); | ||
let mut colunm_vec = vec![]; | ||
for (i, (output_field, input_field)) in iter.enumerate() { | ||
let func = cast_function(output_field.data_type())?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can keep funcs
out of the stream iter, because trait Function is Send + Sync.
} | ||
|
||
let select_input_stream = select_executor.execute(None).await?; | ||
let cast_input_stream = select_input_stream.map(move |data_block| match data_block { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to have a struct named CastStream
in common/streams folder.
@ygf11 hello, is this pr review for review? |
Wait a moment, I will push it soon! |
730bffb
to
d459570
Compare
@@ -52,7 +52,25 @@ impl InterpreterFactory { | |||
PlanNode::TruncateTable(v) => TruncateTableInterpreter::try_create(ctx, v), | |||
PlanNode::UseDatabase(v) => UseDatabaseInterpreter::try_create(ctx, v), | |||
PlanNode::SetVariable(v) => SettingInterpreter::try_create(ctx, v), | |||
PlanNode::InsertInto(v) => InsertIntoInterpreter::try_create(ctx, v), | |||
PlanNode::InsertInto(v) => { | |||
let select = match v.select_plan.clone().take() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job 👍
It would be nice to check and build select plan in the InsertIntoInterpreter, it looks more cleaner
common-datablocks = {path = "../datablocks"} | ||
common-datavalues = {path = "../datavalues"} | ||
common-exception = {path = "../exception"} | ||
common-functions = {path = "../functions"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's Clion's auto-reorder? I use vscode fail to find this format command.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's in vscode too, the command name is Format Cell:)
I found the order in package section, is always name->version->description..., should I recover this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it's better to have alpha orders.
use crate::sql::*; | ||
|
||
#[tokio::test] | ||
async fn test_insert_into_select_interpreter() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great!
} | ||
|
||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need a new line.
DROP TABLE t1; | ||
DROP TABLE t2; | ||
DROP TABLE t3; | ||
DROP DATABASE db1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost LGTM.
BTW, if we want parallel insert from the multiple select stream, it's better to add some options to ignore last merge_processor in pipeline.rs. It can be addressed in another pr.
pub async fn execute(&mut self, parallel_insert: bool) -> Result<SendableDataBlockStream> {
if self.last_pipe()?.nums() > 1 && !parallel_insert {
self.merge_processor()?;
}
self.last_pipe()?.first().execute().await
}
Wait for another reviewer approval |
/lgtm Awesome work, thank you @ygf11 |
CI Passed |
I will do this in another pr. |
I hereby agree to the terms of the CLA available at: https://databend.rs/policies/cla/
Summary
support insert select query.
Changelog
Related Issues
Fixes #2456
Test Plan
Unit Tests
Stateless Tests