Skip to content

Commit

Permalink
fix: clickhouse worker hang when interpreter fail to execute
Browse files Browse the repository at this point in the history
  • Loading branch information
johnyzhou committed Apr 29, 2022
1 parent b86e963 commit c2e59a4
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 16 deletions.
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_use_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Interpreter for UseDatabaseInterpreter {
&self,
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
if self.plan.db.is_empty() {
if self.plan.db.trim().is_empty() {
return Err(ErrorCode::UnknownDatabase("No database selected"));
}
self.ctx.set_current_database(self.plan.db.clone()).await?;
Expand Down
39 changes: 24 additions & 15 deletions query/src/servers/clickhouse/interactive_worker_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use common_datablocks::DataBlock;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
use common_exception::ToErrorCode;
use common_planners::InsertPlan;
use common_planners::PlanNode;
use common_tracing::tracing;
Expand Down Expand Up @@ -208,30 +209,38 @@ impl InteractiveWorkerBase {
}
});

ctx.try_spawn(async move {
let query_result = ctx.try_spawn(async move {
// Query log start.
let _ = interpreter
.start()
.await
.map_err(|e| tracing::error!("interpreter.start.error: {:?}", e));

// Execute and read stream data.
let async_data_stream = interpreter.execute(None);
let mut data_stream = async_data_stream.await?;
while let Some(block) = data_stream.next().await {
data_tx.send(BlockItem::Block(block)).await.ok();
match interpreter.execute(None).await {
Err(e) => {
cancel_clone.store(true, Ordering::Relaxed);
Err(e)
}
Ok(mut data_stream) => {
while let Some(block) = data_stream.next().await {
data_tx.send(BlockItem::Block(block)).await.ok();
}
let _ = interpreter
.finish()
.await
.map_err(|e| tracing::error!("interpreter.finish.error: {:?}", e));
cancel_clone.store(true, Ordering::Relaxed);
Ok::<(), ErrorCode>(())
}
}
cancel_clone.store(true, Ordering::Relaxed);

// Query log finish.
let _ = interpreter
.finish()
.await
.map_err(|e| tracing::error!("interpreter.finish.error: {:?}", e));
Ok::<(), ErrorCode>(())
})?;

Ok(rx)
let query_result = query_result
.await
.map_err_to_code(ErrorCode::TokioError, || {
"Cannot join handle from context's runtime"
})?;
query_result.map(|_| rx)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
ERROR 1105 (HY000) at line 1: Code: 1003, displayText = Cannot USE 'not_exists_db', because the 'not_exists_db' doesn't exist.
ERROR 1105 (HY000) at line 2: Code: 1003, displayText = No database selected.
ERROR 1105 (HY000) at line 3: Code: 1003, displayText = No database selected.
system
1 change: 1 addition & 0 deletions tests/suites/0_stateless/07_use/07_0000_use_database.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-- {ErrorCode 1003, but it not work, because it's trimed in opensrv-mysql}
USE not_exists_db;
USE ``;
USE ` `;
USE default;
USE system;
select database();
Expand Down

0 comments on commit c2e59a4

Please sign in to comment.