From 93997947672298c7ad7a08a9c029ef882cc0ab6a Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Sat, 23 Mar 2024 01:43:56 -0700 Subject: [PATCH] fix(query): fix attach_query_str (#15066) * fix(query): fix attach_query_str * fix(query): fix attach_query_str * fix(query): fix attach_query_str * fix(query): fix attach_query_str * fix(query): fix attach_query_str * update * update --- src/query/catalog/src/query_kind.rs | 1 + .../flight_sql/flight_sql_service/query.rs | 11 +++++-- .../src/servers/http/clickhouse_handler.rs | 3 -- src/query/service/src/servers/http/v1/load.rs | 3 +- .../servers/http/v1/query/execute_state.rs | 3 +- .../servers/mysql/mysql_interactive_worker.rs | 4 +-- .../others/suggested_background_tasks.rs | 3 +- src/query/service/src/test_kits/config.rs | 1 + src/query/service/tests/it/api/http/status.rs | 4 +-- .../tests/it/sql/exec/get_table_bind_test.rs | 5 ++-- .../tests/it/sql/planner/builders/binder.rs | 28 ++++++++++++++++++ .../tests/it/sql/planner/builders/mod.rs | 1 + .../it/storages/fuse/operations/commit.rs | 4 +-- .../storages/testdata/configs_table_basic.txt | 2 +- src/query/sql/src/planner/mod.rs | 1 + src/query/sql/src/planner/planner.rs | 29 +++++++++++++++---- .../src/parquet_rs/parquet_table/table.rs | 5 +++- .../options/parquet_missing_field.test | 2 +- 18 files changed, 78 insertions(+), 32 deletions(-) create mode 100644 src/query/service/tests/it/sql/planner/builders/binder.rs diff --git a/src/query/catalog/src/query_kind.rs b/src/query/catalog/src/query_kind.rs index b2e27cd799d5..74e2aca44e84 100644 --- a/src/query/catalog/src/query_kind.rs +++ b/src/query/catalog/src/query_kind.rs @@ -24,6 +24,7 @@ pub enum QueryKind { Query, Explain, CopyIntoTable, + CopyIntoLocation, Update, Insert, Other, diff --git a/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs b/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs index 9035dbb56cbb..c4564feec63e 100644 --- a/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs +++ b/src/query/service/src/servers/flight_sql/flight_sql_service/query.rs @@ -26,6 +26,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_expression::DataSchema; +use databend_common_sql::get_query_kind; use databend_common_sql::plans::Plan; use databend_common_sql::PlanExtras; use databend_common_sql::Planner; @@ -95,7 +96,10 @@ impl FlightSqlServiceImpl { .await .map_err(|e| status!("Could not create_query_context", e))?; - context.attach_query_str(plan.kind(), plan_extras.statement.to_mask_sql()); + context.attach_query_str( + get_query_kind(&plan_extras.statement), + plan_extras.statement.to_mask_sql(), + ); let interpreter = InterpreterFactory::get(context.clone(), plan).await?; let mut blocks = interpreter.execute(context.clone()).await?; @@ -120,7 +124,10 @@ impl FlightSqlServiceImpl { .await .map_err(|e| status!("Could not create_query_context", e))?; - context.attach_query_str(plan.kind(), plan_extras.statement.to_mask_sql()); + context.attach_query_str( + get_query_kind(&plan_extras.statement), + plan_extras.statement.to_mask_sql(), + ); let interpreter = InterpreterFactory::get(context.clone(), plan).await?; let data_schema = plan.schema(); diff --git a/src/query/service/src/servers/http/clickhouse_handler.rs b/src/query/service/src/servers/http/clickhouse_handler.rs index 44c1396dcc84..499143e5a42c 100644 --- a/src/query/service/src/servers/http/clickhouse_handler.rs +++ b/src/query/service/src/servers/http/clickhouse_handler.rs @@ -273,8 +273,6 @@ pub async fn clickhouse_handler_get( .map_err(|err| err.display_with_sql(&sql)) .map_err(BadRequest)?; let format = get_format_with_default(extras.format, default_format)?; - - context.attach_query_str(plan.kind(), extras.statement.to_mask_sql()); let interpreter = InterpreterFactory::get(context.clone(), &plan) .await .map_err(|err| err.display_with_sql(&sql)) @@ -357,7 +355,6 @@ pub async fn clickhouse_handler_post( .map_err(|err| err.display_with_sql(&sql)) .map_err(BadRequest)?; let schema = plan.schema(); - ctx.attach_query_str(plan.kind(), extras.statement.to_mask_sql()); let mut handle = None; if let Plan::Insert(insert) = &mut plan { if let InsertInputSource::StreamingWithFormat(format, start, input_context_ref) = diff --git a/src/query/service/src/servers/http/v1/load.rs b/src/query/service/src/servers/http/v1/load.rs index 6369f2dcd569..153d2825a170 100644 --- a/src/query/service/src/servers/http/v1/load.rs +++ b/src/query/service/src/servers/http/v1/load.rs @@ -135,12 +135,11 @@ pub async fn streaming_load( .map_err(InternalServerError)?; let mut planner = Planner::new(context.clone()); - let (mut plan, extras) = planner + let (mut plan, _) = planner .plan_sql(insert_sql) .await .map_err(|err| err.display_with_sql(insert_sql)) .map_err(InternalServerError)?; - context.attach_query_str(plan.kind(), extras.statement.to_mask_sql()); let schema = plan.schema(); match &mut plan { diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index e3c0292abeca..d647d5cba341 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -310,11 +310,10 @@ impl ExecuteState { let entry = QueryEntry::create(&ctx)?; let queue_guard = QueriesQueueManager::instance().acquire(entry).await?; - let (plan, plan_extras) = ExecuteState::plan_sql(&sql, ctx.clone()) + let (plan, _) = ExecuteState::plan_sql(&sql, ctx.clone()) .await .map_err(|err| err.display_with_sql(&sql))?; - ctx.attach_query_str(plan.kind(), plan_extras.statement.to_mask_sql()); { // set_var may change settings let mut guard = format_settings.write(); diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs index 6809164e853d..e7c18c71c1ff 100644 --- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs +++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs @@ -359,9 +359,7 @@ impl InteractiveWorkerBase { let entry = QueryEntry::create(&context)?; let _guard = QueriesQueueManager::instance().acquire(entry).await?; let mut planner = Planner::new(context.clone()); - let (plan, extras) = planner.plan_sql(query).await?; - - context.attach_query_str(plan.kind(), extras.statement.to_mask_sql()); + let (plan, _) = planner.plan_sql(query).await?; let interpreter = InterpreterFactory::get(context.clone(), &plan).await; let has_result_set = plan.has_result_set(); diff --git a/src/query/service/src/table_functions/others/suggested_background_tasks.rs b/src/query/service/src/table_functions/others/suggested_background_tasks.rs index 64b0864c9660..b68a759b5895 100644 --- a/src/query/service/src/table_functions/others/suggested_background_tasks.rs +++ b/src/query/service/src/table_functions/others/suggested_background_tasks.rs @@ -164,8 +164,7 @@ impl SuggestedBackgroundTasksSource { sql: String, ) -> Result> { let mut planner = Planner::new(ctx.clone()); - let (plan, plan_extras) = planner.plan_sql(sql.as_str()).await?; - ctx.attach_query_str(plan.kind(), plan_extras.statement.to_mask_sql()); + let (plan, _) = planner.plan_sql(sql.as_str()).await?; let data_schema = plan.schema(); let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; let stream = interpreter.execute(ctx.clone()).await?; diff --git a/src/query/service/src/test_kits/config.rs b/src/query/service/src/test_kits/config.rs index 3af1207828fd..a32e3d737aa1 100644 --- a/src/query/service/src/test_kits/config.rs +++ b/src/query/service/src/test_kits/config.rs @@ -44,6 +44,7 @@ impl ConfigBuilder { let tmp_dir = TempDir::new().expect("create tmp dir failed"); let root = tmp_dir.path().to_str().unwrap().to_string(); conf.storage.params = StorageParams::Fs(StorageFsConfig { root }); + conf.storage.allow_insecure = true; ConfigBuilder { conf } } diff --git a/src/query/service/tests/it/api/http/status.rs b/src/query/service/tests/it/api/http/status.rs index 363206fc5c79..09e84660e3f1 100644 --- a/src/query/service/tests/it/api/http/status.rs +++ b/src/query/service/tests/it/api/http/status.rs @@ -26,7 +26,6 @@ use databend_query::interpreters::InterpreterFactory; use databend_query::sessions::QueryContext; use databend_query::sessions::SessionManager; use databend_query::sessions::SessionType; -use databend_query::sessions::TableContext; use databend_query::sql::Planner; use databend_query::test_kits::*; use poem::get; @@ -69,8 +68,7 @@ async fn run_query(query_ctx: &Arc) -> Result .set_authed_user(user, None) .await?; let mut planner = Planner::new(query_ctx.clone()); - let (plan, extras) = planner.plan_sql(sql).await?; - query_ctx.attach_query_str(plan.kind(), extras.statement.to_mask_sql()); + let (plan, _) = planner.plan_sql(sql).await?; InterpreterFactory::get(query_ctx.clone(), &plan).await } diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index b6b99f532b21..141f9aa3eb8a 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -522,9 +522,7 @@ impl TableContext for CtxDelegation { todo!() } - fn attach_query_str(&self, _kind: QueryKind, _query: String) { - todo!() - } + fn attach_query_str(&self, _kind: QueryKind, _query: String) {} fn get_query_str(&self) -> String { todo!() @@ -854,6 +852,7 @@ async fn test_get_same_table_once() -> Result<()> { let mut planner = Planner::new(ctx.clone()); let (_, _) = planner.plan_sql(query.as_str()).await?; + assert_eq!( ctx.table_without_cache .load(std::sync::atomic::Ordering::SeqCst), diff --git a/src/query/service/tests/it/sql/planner/builders/binder.rs b/src/query/service/tests/it/sql/planner/builders/binder.rs new file mode 100644 index 000000000000..a3829436c27a --- /dev/null +++ b/src/query/service/tests/it/sql/planner/builders/binder.rs @@ -0,0 +1,28 @@ +use databend_common_catalog::query_kind::QueryKind; +use databend_common_exception::Result; +use databend_common_sql::Planner; +use databend_common_storages_fuse::TableContext; +use databend_query::sessions::SessionType; +use databend_query::test_kits::TestFixture; + +#[tokio::test(flavor = "multi_thread")] +async fn test_query_kind() -> Result<()> { + let fixture = TestFixture::setup().await?; + fixture.create_default_database().await?; + fixture.create_default_table().await?; + + let http_session = fixture + .new_session_with_type(SessionType::HTTPQuery) + .await?; + let ctx = http_session.create_query_context().await?; + let mut planner = Planner::new(ctx.clone()); + let sql = format!( + "COPY INTO {}.{} from @~/ pattern='.*' FILE_FORMAT = (TYPE = 'csv') PURGE=true FORCE=true max_files=10000;", + fixture.default_db_name(), + fixture.default_table_name() + ); + let (_, _) = planner.plan_sql(&sql).await?; + let kind = ctx.get_query_kind(); + assert_eq!(kind, QueryKind::CopyIntoTable); + Ok(()) +} diff --git a/src/query/service/tests/it/sql/planner/builders/mod.rs b/src/query/service/tests/it/sql/planner/builders/mod.rs index 2a94d6a6251a..f7a62d3c019d 100644 --- a/src/query/service/tests/it/sql/planner/builders/mod.rs +++ b/src/query/service/tests/it/sql/planner/builders/mod.rs @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod binder; mod select_builder; diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 71d7227642c3..342890f4cd69 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -480,9 +480,7 @@ impl TableContext for CtxDelegation { todo!() } - fn attach_query_str(&self, _kind: QueryKind, _query: String) { - todo!() - } + fn attach_query_str(&self, _kind: QueryKind, _query: String) {} fn get_query_str(&self) -> String { todo!() diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index d5ef772e3c76..fe77d59e8f03 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -131,7 +131,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'query' | 'tenant_id' | 'test' | '' | | 'query' | 'udf_server_allow_list' | '' | '' | | 'query' | 'users' | '{"name":"root","auth_type":"no_password","auth_string":null}' | '' | -| 'storage' | 'allow_insecure' | 'false' | '' | +| 'storage' | 'allow_insecure' | 'true' | '' | | 'storage' | 'azblob.account_key' | '' | '' | | 'storage' | 'azblob.account_name' | '' | '' | | 'storage' | 'azblob.container' | '' | '' | diff --git a/src/query/sql/src/planner/mod.rs b/src/query/sql/src/planner/mod.rs index 1e6ff878462f..3380ecc260bb 100644 --- a/src/query/sql/src/planner/mod.rs +++ b/src/query/sql/src/planner/mod.rs @@ -39,6 +39,7 @@ pub use bloom_index::BloomIndexColumns; pub use expression_parser::*; pub use format::format_scalar; pub use metadata::*; +pub use planner::get_query_kind; pub use planner::PlanExtras; pub use planner::Planner; pub use plans::insert::InsertInputSource; diff --git a/src/query/sql/src/planner/planner.rs b/src/query/sql/src/planner/planner.rs index 3510116c57ab..9f294fa18925 100644 --- a/src/query/sql/src/planner/planner.rs +++ b/src/query/sql/src/planner/planner.rs @@ -136,12 +136,6 @@ impl Planner { return Err(ErrorCode::SyntaxException("convert prql to sql failed.")); } - if matches!(stmt, Statement::CopyIntoLocation(_)) { - // Indicate binder there is no need to collect column statistics for the binding table. - self.ctx - .attach_query_str(QueryKind::CopyIntoTable, String::new()); - } - self.replace_stmt(&mut stmt, sql_dialect); // Step 3: Bind AST with catalog, and generate a pure logical SExpr @@ -153,7 +147,14 @@ impl Planner { name_resolution_ctx, metadata.clone(), ); + + // Indicate binder there is no need to collect column statistics for the binding table. + self.ctx + .attach_query_str(get_query_kind(&stmt), stmt.to_mask_sql()); let plan = binder.bind(&stmt).await?; + // attach again to avoid the query kind is overwritten by the subquery + self.ctx + .attach_query_str(get_query_kind(&stmt), stmt.to_mask_sql()); // Step 4: Optimize the SExpr with optimizers, and generate optimized physical SExpr let opt_ctx = OptimizerContext::new(self.ctx.clone(), metadata.clone()) @@ -236,3 +237,19 @@ impl Planner { self.add_max_rows_limit(stmt); } } + +pub fn get_query_kind(stmt: &Statement) -> QueryKind { + match stmt { + Statement::Query { .. } => QueryKind::Query, + Statement::CopyIntoTable(_) => QueryKind::CopyIntoTable, + Statement::CopyIntoLocation(_) => QueryKind::CopyIntoLocation, + Statement::Explain { .. } => QueryKind::Explain, + Statement::Insert(_) => QueryKind::Insert, + Statement::Replace(_) + | Statement::Delete(_) + | Statement::MergeInto(_) + | Statement::OptimizeTable(_) + | Statement::Update(_) => QueryKind::Update, + _ => QueryKind::Other, + } +} diff --git a/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs b/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs index ac5912431bdc..8328fa881969 100644 --- a/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs +++ b/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs @@ -136,7 +136,10 @@ impl ParquetRSTable { // If the query is `COPY`, we don't need to collect column statistics. // It's because the only transform could be contained in `COPY` command is projection. - let need_stats_provider = !matches!(ctx.get_query_kind(), QueryKind::CopyIntoTable); + let need_stats_provider = !matches!( + ctx.get_query_kind(), + QueryKind::CopyIntoTable | QueryKind::CopyIntoLocation + ); let settings = ctx.get_settings(); let max_threads = settings.get_max_threads()? as usize; let max_memory_usage = settings.get_max_memory_usage()?; diff --git a/tests/sqllogictests/suites/stage/formats/parquet/options/parquet_missing_field.test b/tests/sqllogictests/suites/stage/formats/parquet/options/parquet_missing_field.test index bf8078f50ec3..522f9167dc50 100644 --- a/tests/sqllogictests/suites/stage/formats/parquet/options/parquet_missing_field.test +++ b/tests/sqllogictests/suites/stage/formats/parquet/options/parquet_missing_field.test @@ -19,7 +19,7 @@ c5 BIGINT 1 1 c2 BIGINT 1 2 c4 VARCHAR 1 3 -query error get diff schema +query error copy into t1 from @data/parquet/diff_schema/ file_format=(type=parquet) pattern='.*[.]parquet' query