Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy command to load local file into table [patch-1] #2682

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions common/planners/src/plan_insert_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchemaRef;
use common_exception::Result;
use common_infallible::Mutex;
use common_meta_types::MetaId;

type BlockStream =
std::pin::Pin<Box<dyn futures::stream::Stream<Item = DataBlock> + Sync + Send + 'static>>;
type BlockStream = std::pin::Pin<
Box<dyn futures::stream::Stream<Item = Result<DataBlock>> + Sync + Send + 'static>,
>;

#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct InsertIntoPlan {
Expand Down
2 changes: 2 additions & 0 deletions query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ sha2 = "0.9.8"
structopt = "0.3"
structopt-toml = "0.5.0"
threadpool = "1.8.1"

tokio = { version = "1.13.0", features = ["fs"] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to use common/base/runtime?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was a mistake on my part; it will be removed.

tokio-rustls = "0.22.0"
tokio-stream = { version = "0.1", features = ["net"] }
toml = "0.5.8"
Expand Down
2 changes: 1 addition & 1 deletion query/src/configs/config_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl fmt::Debug for AzureStorageBlobConfig {
Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, StructOpt, StructOptToml,
)]
pub struct StorageConfig {
#[structopt(long, env = STORAGE_TYPE, default_value = "", help = "Current storage type: disk|s3")]
#[structopt(long, env = STORAGE_TYPE, default_value = "disk", help = "Current storage type: disk|s3")]
#[serde(default)]
pub storage_type: String,

Expand Down
4 changes: 2 additions & 2 deletions query/src/datasources/table/fuse/index/min_max_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ async fn test_min_max_index() -> Result<()> {
let blocks = (0..num)
.into_iter()
.map(|idx| {
DataBlock::create_by_array(test_schema.clone(), vec![
Ok(DataBlock::create_by_array(test_schema.clone(), vec![
Series::new(vec![idx + 1, idx + 2, idx + 3]),
Series::new(vec![idx * num + 1, idx * num + 2, idx * num + 3]),
])
]))
})
.collect::<Vec<_>>();

Expand Down
6 changes: 4 additions & 2 deletions query/src/datasources/table/fuse/io/block_appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ use crate::datasources::table::fuse::util;
use crate::datasources::table::fuse::SegmentInfo;
use crate::datasources::table::fuse::Stats;

pub type BlockStream =
std::pin::Pin<Box<dyn futures::stream::Stream<Item = DataBlock> + Sync + Send + 'static>>;
pub type BlockStream = std::pin::Pin<
Box<dyn futures::stream::Stream<Item = Result<DataBlock>> + Sync + Send + 'static>,
>;

/// dummy struct, namespace placeholder
pub struct BlockAppender;
Expand All @@ -48,6 +49,7 @@ impl BlockAppender {

// accumulate the stats and save the blocks
while let Some(block) = stream.next().await {
let block = block?;
stats_acc.acc(&block)?;
let schema = block.schema().to_arrow();
let location = util::gen_unique_block_location();
Expand Down
2 changes: 1 addition & 1 deletion query/src/datasources/table/fuse/io/block_appender_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn test_fuse_table_block_appender() {
let local_fs = common_dal::Local::with_path(tmp_dir.path().to_owned());
let schema = DataSchemaRefExt::create(vec![DataField::new("a", DataType::Int32, false)]);
let block = DataBlock::create_by_array(schema.clone(), vec![Series::new(vec![1, 2, 3])]);
let block_stream = futures::stream::iter(vec![block]);
let block_stream = futures::stream::iter(vec![Ok(block)]);
let r =
BlockAppender::append_blocks(Arc::new(local_fs), Box::pin(block_stream), schema.as_ref())
.await;
Expand Down
7 changes: 5 additions & 2 deletions query/src/datasources/table/fuse/table_test_fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_datavalues::DataField;
use common_datavalues::DataSchemaRef;
use common_datavalues::DataSchemaRefExt;
use common_datavalues::DataType;
use common_exception::Result;
use common_infallible::Mutex;
use common_meta_types::TableMeta;
use common_planners::CreateDatabasePlan;
Expand Down Expand Up @@ -109,13 +110,15 @@ impl TestFixture {
}
}

pub fn gen_block_stream(num: u32) -> Vec<DataBlock> {
pub fn gen_block_stream(num: u32) -> Vec<Result<DataBlock>> {
(0..num)
.into_iter()
.map(|_v| {
let schema =
DataSchemaRefExt::create(vec![DataField::new("a", DataType::Int32, false)]);
DataBlock::create_by_array(schema, vec![Series::new(vec![1, 2, 3])])
Ok(DataBlock::create_by_array(schema, vec![Series::new(vec![
1, 2, 3,
])]))
})
.collect()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> {
let schema = DataSchemaRefExt::create(vec![DataField::new("a", DataType::Int32, false)]);
let col_stats = blocks
.iter()
.map(statistic_helper::block_stats)
.map(|block| statistic_helper::block_stats(&block.clone().unwrap()))
.collect::<common_exception::Result<Vec<_>>>()?;
let r = statistic_helper::column_stats_reduce_with_schema(&col_stats, &schema);
assert!(r.is_ok());
Expand All @@ -61,7 +61,7 @@ fn test_ft_stats_accumulator() -> common_exception::Result<()> {
let mut stats_acc = statistic_helper::StatisticsAccumulator::new();
let mut meta_acc = statistic_helper::BlockMetaAccumulator::new();
blocks.iter().try_for_each(|item| {
stats_acc.acc(item)?;
stats_acc.acc(&item.clone().unwrap())?;
meta_acc.acc(1, "".to_owned(), &mut stats_acc);
Ok::<_, ErrorCode>(())
})?;
Expand Down
2 changes: 1 addition & 1 deletion query/src/datasources/table/memory/memory_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl Table for MemoryTable {

while let Some(block) = s.next().await {
let mut blocks = self.blocks.write();
blocks.push(block);
blocks.push(block?);
}
Ok(())
}
Expand Down
12 changes: 6 additions & 6 deletions query/src/datasources/table/memory/memory_table_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ async fn test_memorytable() -> Result<()> {

// append data.
{
let block = DataBlock::create_by_array(schema.clone(), vec![
let block = Ok(DataBlock::create_by_array(schema.clone(), vec![
Series::new(vec![1u64, 2]),
Series::new(vec![11u64, 22]),
]);
let block2 = DataBlock::create_by_array(schema.clone(), vec![
]));
let block2 = Ok(DataBlock::create_by_array(schema.clone(), vec![
Series::new(vec![4u64, 3]),
Series::new(vec![33u64, 33]),
]);
let blocks = vec![block, block2];
]));
let blocks: Vec<Result<DataBlock>> = vec![block, block2];

let input_stream = futures::stream::iter::<Vec<DataBlock>>(blocks.clone());
let input_stream = futures::stream::iter::<Vec<Result<DataBlock>>>(blocks.clone());
let insert_plan = InsertIntoPlan {
db_name: "default".to_string(),
tbl_name: "a".to_string(),
Expand Down
1 change: 1 addition & 0 deletions query/src/datasources/table/null/null_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl Table for NullTable {
.ok_or_else(|| ErrorCode::EmptyData("input stream consumed"))?;

while let Some(block) = s.next().await {
let block = block?;
info!("Ignore one block rows: {}", block.num_rows())
}
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions query/src/datasources/table/null/null_table_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ async fn test_null_table() -> Result<()> {
Series::new(vec![1u64, 2]),
Series::new(vec![11u64, 22]),
]);
let blocks = vec![block];
let blocks = vec![Ok(block)];

let input_stream = futures::stream::iter::<Vec<DataBlock>>(blocks.clone());
let input_stream = futures::stream::iter::<Vec<Result<DataBlock>>>(blocks.clone());
let insert_plan = InsertIntoPlan {
db_name: "default".to_string(),
tbl_name: "a".to_string(),
Expand Down
13 changes: 2 additions & 11 deletions query/src/servers/clickhouse/interactive_worker_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ pub struct FromClickHouseBlockStream {
}

impl futures::stream::Stream for FromClickHouseBlockStream {
type Item = DataBlock;
type Item = Result<DataBlock>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand All @@ -161,16 +161,7 @@ impl futures::stream::Stream for FromClickHouseBlockStream {
self.input.poll_next_unpin(cx).map(|x| match x {
Some(v) => {
let block = from_clickhouse_block(self.schema.clone(), v);
match block {
Ok(block) => Some(block),
Err(e) => {
log::error!(
"failed to convert ClickHouseBlock to block , breaking out, {:?}",
e
);
None
}
}
Some(block)
}
_ => None,
})
Expand Down
36 changes: 32 additions & 4 deletions query/src/sql/plan_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ use common_planners::ShowCreateTablePlan;
use common_planners::TruncateTablePlan;
use common_planners::UseDatabasePlan;
use common_planners::VarValue;
use common_streams::CsvSource;
use common_streams::Source;
use common_streams::SourceStream;
use common_streams::ValueSource;
use common_tracing::tracing;
use nom::FindSubstring;
Expand All @@ -69,6 +71,7 @@ use crate::catalogs::Catalog;
use crate::catalogs::ToReadDataSourcePlan;
use crate::functions::ContextFunction;
use crate::sessions::DatabendQueryContextRef;
use crate::sql::sql_statement::DfCopy;
use crate::sql::sql_statement::DfCreateTable;
use crate::sql::sql_statement::DfDropDatabase;
use crate::sql::sql_statement::DfUseDatabase;
Expand Down Expand Up @@ -175,6 +178,7 @@ impl PlanParser {
DfStatement::KillQuery(v) => self.sql_kill_query_to_plan(v),
DfStatement::KillConn(v) => self.sql_kill_connection_to_plan(v),
DfStatement::CreateUser(v) => self.sql_create_user_to_plan(v),
DfStatement::Copy(v) => self.copy_to_plan(v),
DfStatement::ShowUsers(_) => {
self.build_from_sql("SELECT * FROM system.users ORDER BY name")
}
Expand Down Expand Up @@ -440,6 +444,27 @@ impl PlanParser {
Ok(PlanNode::TruncateTable(TruncateTablePlan { db, table }))
}

// we can transform copy plan into insert plan
#[tracing::instrument(level = "info", skip(self, copy_stmt), fields(ctx.id = self.ctx.get_id().as_str()))]
pub fn copy_to_plan(&self, copy_stmt: &DfCopy) -> Result<PlanNode> {
let insert_plan = self.insert_to_plan(&copy_stmt.name, &copy_stmt.columns, &None, "")?;
match insert_plan {
PlanNode::InsertInto(v) => match copy_stmt.format.to_uppercase().as_str() {
"CSV" => {
// TODO: support s3/hdfs/oss/azure
let file = std::fs::File::open(copy_stmt.location.as_str())?;
let block_size = self.ctx.get_settings().get_max_block_size()?;
let source = CsvSource::new(file, v.schema(), block_size as usize);
v.set_input_stream(Box::pin(SourceStream::create(Box::new(source))));

Ok(PlanNode::InsertInto(v))
}
_ => unimplemented!("Only support copy csv file into table for now"),
},
_ => unreachable!(),
}
}

#[tracing::instrument(level = "info", skip(self, table_name, columns, source), fields(ctx.id = self.ctx.get_id().as_str()))]
fn insert_to_plan(
&self,
Expand Down Expand Up @@ -469,7 +494,7 @@ impl PlanParser {
schema = DataSchemaRefExt::create(fields);
}

let mut input_stream = futures::stream::iter::<Vec<DataBlock>>(vec![]);
let mut input_stream = futures::stream::iter::<Vec<Result<DataBlock>>>(vec![]);

if let Some(source) = source {
if let sqlparser::ast::SetExpr::Values(_vs) = &source.body {
Expand All @@ -481,10 +506,13 @@ impl PlanParser {
let mut source = ValueSource::new(values.as_bytes(), schema.clone(), block_size);
let mut blocks = vec![];
loop {
let block = source.read()?;
let block = source.read();
match block {
Some(b) => blocks.push(b),
None => break,
Ok(Some(block)) => {
blocks.push(Ok(block));
}
Err(e) => blocks.push(Err(e)),
Ok(None) => break,
}
}
input_stream = futures::stream::iter(blocks);
Expand Down
26 changes: 25 additions & 1 deletion query/src/sql/sql_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ use sqlparser::ast::Value;
use sqlparser::dialect::keywords::Keyword;
use sqlparser::dialect::Dialect;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::IsOptional;
use sqlparser::parser::Parser;
use sqlparser::parser::ParserError;
use sqlparser::tokenizer::Token;
use sqlparser::tokenizer::Tokenizer;
use sqlparser::tokenizer::Whitespace;

use super::DfCopy;
use crate::sql::DfCreateDatabase;
use crate::sql::DfCreateTable;
use crate::sql::DfCreateUser;
Expand Down Expand Up @@ -217,6 +219,10 @@ impl<'a> DfParser<'a> {
"KILL" => self.parse_kill_query(),
_ => self.expected("Keyword", self.parser.peek_token()),
},
Keyword::COPY => {
self.parser.next_token();
self.parse_copy()
}
_ => {
// use the native parser
Ok(DfStatement::Statement(self.parser.parse_statement()?))
Expand Down Expand Up @@ -572,7 +578,7 @@ impl<'a> DfParser<'a> {
fn parse_table_engine(&mut self) -> Result<String, ParserError> {
// TODO make ENGINE as a keyword
if !self.consume_token("ENGINE") {
return Ok("NULL".to_string());
return Ok("FUSE".to_string());
}

self.parser.expect_token(&Token::Eq)?;
Expand Down Expand Up @@ -608,6 +614,24 @@ impl<'a> DfParser<'a> {
}
}

fn parse_copy(&mut self) -> Result<DfStatement, ParserError> {
let name = self.parser.parse_object_name()?;
let columns = self
.parser
.parse_parenthesized_column_list(IsOptional::Optional)?;
self.parser.expect_keyword(Keyword::FROM)?;
let location = self.parser.parse_literal_string()?;
self.parser.expect_keyword(Keyword::FORMAT)?;
let format = self.parser.next_token().to_string();

Ok(DfStatement::Copy(DfCopy {
name,
columns,
location,
format,
}))
}

fn consume_token(&mut self, expected: &str) -> bool {
if self.parser.peek_token().to_string().to_uppercase() == *expected.to_uppercase() {
self.parser.next_token();
Expand Down
20 changes: 20 additions & 0 deletions query/src/sql/sql_parser_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,25 @@ fn hint_test() -> Result<()> {
Ok(())
}

#[test]
fn copy_test() -> Result<()> {
let ident = Ident::new("test_csv");
let v = vec![ident];
let name = ObjectName(v);

expect_parse_ok(
"copy test_csv from 'tests/data/sample.csv' format csv;",
DfStatement::Copy(DfCopy {
name,
columns: vec![],
location: "tests/data/sample.csv".to_string(),
format: "csv".to_string(),
}),
)?;

Ok(())
}

#[test]
fn show_databases_test() -> Result<()> {
expect_parse_ok(
Expand Down Expand Up @@ -503,5 +522,6 @@ fn create_user_test() -> Result<()> {
"CREATE USER 'test'@'localhost' IDENTIFIED WITH sha256_password BY ''",
String::from("sql parser error: Missing password"),
)?;

Ok(())
}
Loading