From 1af867780f83237c70c8faa5168e1dc48c824b4e Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Thu, 21 Jul 2022 14:17:09 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #5433 Signed-off-by: ti-chi-bot --- .../DataStreams/ParallelAggregatingBlockInputStream.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index bf41084e247..ae8dd5d1329 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -182,10 +182,19 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish() void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_ptr & exception, size_t thread_num) { parent.exceptions[thread_num] = exception; +<<<<<<< HEAD /// can not cancel parent inputStream or the exception might be lost if (!parent.executed) /// kill the processor so ExchangeReceiver will be closed parent.processor.cancel(true); +======= + Int32 old_value = -1; + parent.first_exception_index.compare_exchange_strong(old_value, static_cast(thread_num), std::memory_order_seq_cst, std::memory_order_relaxed); + + if (!parent.executed) + /// use cancel instead of kill to avoid too many useless error message + parent.cancel(false); +>>>>>>> 7b280755ba (fix a panic issue in parallel agg when exception is thrown (#5433)) } From 0ec35c8e7e732c45751766bc443a3b8176ff7290 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 21 Jul 2022 17:02:01 +0800 Subject: [PATCH 2/2] fix conflict Signed-off-by: gengliqi --- .../ParallelAggregatingBlockInputStream.cpp | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index ae8dd5d1329..758f11c1f0a 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -182,19 +182,9 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish() void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_ptr & exception, size_t thread_num) { parent.exceptions[thread_num] = exception; -<<<<<<< HEAD - /// can not cancel parent inputStream or the exception might be lost if (!parent.executed) /// kill the processor so ExchangeReceiver will be closed - parent.processor.cancel(true); -======= - Int32 old_value = -1; - parent.first_exception_index.compare_exchange_strong(old_value, static_cast(thread_num), std::memory_order_seq_cst, std::memory_order_relaxed); - - if (!parent.executed) - /// use cancel instead of kill to avoid too many useless error message - parent.cancel(false); ->>>>>>> 7b280755ba (fix a panic issue in parallel agg when exception is thrown (#5433)) + parent.cancel(true); }