From c2e59a4f9ec8512bb7823ba22d723d165c5607e6 Mon Sep 17 00:00:00 2001 From: johnyzhou Date: Fri, 29 Apr 2022 10:39:30 +0800 Subject: [PATCH] fix: clickhouse worker hang when interpreter fail to execute --- .../interpreters/interpreter_use_database.rs | 2 +- .../clickhouse/interactive_worker_base.rs | 39 ++++++++++++------- .../07_use/07_0000_use_database.result | 1 + .../07_use/07_0000_use_database.sql | 1 + 4 files changed, 27 insertions(+), 16 deletions(-) diff --git a/query/src/interpreters/interpreter_use_database.rs b/query/src/interpreters/interpreter_use_database.rs index e67d0f31e056..8e992a93ad49 100644 --- a/query/src/interpreters/interpreter_use_database.rs +++ b/query/src/interpreters/interpreter_use_database.rs @@ -46,7 +46,7 @@ impl Interpreter for UseDatabaseInterpreter { &self, _input_stream: Option, ) -> Result { - if self.plan.db.is_empty() { + if self.plan.db.trim().is_empty() { return Err(ErrorCode::UnknownDatabase("No database selected")); } self.ctx.set_current_database(self.plan.db.clone()).await?; diff --git a/query/src/servers/clickhouse/interactive_worker_base.rs b/query/src/servers/clickhouse/interactive_worker_base.rs index ae736240760e..2363f379e1b5 100644 --- a/query/src/servers/clickhouse/interactive_worker_base.rs +++ b/query/src/servers/clickhouse/interactive_worker_base.rs @@ -27,6 +27,7 @@ use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; +use common_exception::ToErrorCode; use common_planners::InsertPlan; use common_planners::PlanNode; use common_tracing::tracing; @@ -208,7 +209,7 @@ impl InteractiveWorkerBase { } }); - ctx.try_spawn(async move { + let query_result = ctx.try_spawn(async move { // Query log start. let _ = interpreter .start() @@ -216,22 +217,30 @@ impl InteractiveWorkerBase { .map_err(|e| tracing::error!("interpreter.start.error: {:?}", e)); // Execute and read stream data. - let async_data_stream = interpreter.execute(None); - let mut data_stream = async_data_stream.await?; - while let Some(block) = data_stream.next().await { - data_tx.send(BlockItem::Block(block)).await.ok(); + match interpreter.execute(None).await { + Err(e) => { + cancel_clone.store(true, Ordering::Relaxed); + Err(e) + } + Ok(mut data_stream) => { + while let Some(block) = data_stream.next().await { + data_tx.send(BlockItem::Block(block)).await.ok(); + } + let _ = interpreter + .finish() + .await + .map_err(|e| tracing::error!("interpreter.finish.error: {:?}", e)); + cancel_clone.store(true, Ordering::Relaxed); + Ok::<(), ErrorCode>(()) + } } - cancel_clone.store(true, Ordering::Relaxed); - - // Query log finish. - let _ = interpreter - .finish() - .await - .map_err(|e| tracing::error!("interpreter.finish.error: {:?}", e)); - Ok::<(), ErrorCode>(()) })?; - - Ok(rx) + let query_result = query_result + .await + .map_err_to_code(ErrorCode::TokioError, || { + "Cannot join handle from context's runtime" + })?; + query_result.map(|_| rx) } } diff --git a/tests/suites/0_stateless/07_use/07_0000_use_database.result b/tests/suites/0_stateless/07_use/07_0000_use_database.result index 64c3645f2aac..07574c2eff50 100644 --- a/tests/suites/0_stateless/07_use/07_0000_use_database.result +++ b/tests/suites/0_stateless/07_use/07_0000_use_database.result @@ -1,3 +1,4 @@ ERROR 1105 (HY000) at line 1: Code: 1003, displayText = Cannot USE 'not_exists_db', because the 'not_exists_db' doesn't exist. ERROR 1105 (HY000) at line 2: Code: 1003, displayText = No database selected. +ERROR 1105 (HY000) at line 3: Code: 1003, displayText = No database selected. system diff --git a/tests/suites/0_stateless/07_use/07_0000_use_database.sql b/tests/suites/0_stateless/07_use/07_0000_use_database.sql index c60551a11cc7..84596a17c52f 100644 --- a/tests/suites/0_stateless/07_use/07_0000_use_database.sql +++ b/tests/suites/0_stateless/07_use/07_0000_use_database.sql @@ -1,6 +1,7 @@ -- {ErrorCode 1003, but it not work, because it's trimed in opensrv-mysql} USE not_exists_db; USE ``; +USE ` `; USE default; USE system; select database();