From 881f5689e44e1162d0a098e221c98d7e0ed80c9b Mon Sep 17 00:00:00 2001 From: Fu Zhe Date: Fri, 11 Mar 2022 18:59:50 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #4210 Signed-off-by: ti-chi-bot --- dbms/src/Common/DynamicThreadPool.cpp | 14 ++++++ dbms/src/Common/DynamicThreadPool.h | 2 + .../tests/gtest_dynamic_thread_pool.cpp | 43 +++++++++++++++++++ 3 files changed, 59 insertions(+) diff --git a/dbms/src/Common/DynamicThreadPool.cpp b/dbms/src/Common/DynamicThreadPool.cpp index 867616e168c..474071adf3a 100644 --- a/dbms/src/Common/DynamicThreadPool.cpp +++ b/dbms/src/Common/DynamicThreadPool.cpp @@ -77,6 +77,16 @@ void DynamicThreadPool::scheduledToNewDynamicThread(TaskPtr & task) t.detach(); } +<<<<<<< HEAD +======= +void DynamicThreadPool::executeTask(TaskPtr & task) +{ + UPDATE_CUR_AND_MAX_METRIC(tiflash_thread_count, type_active_threads_of_thdpool, type_max_active_threads_of_thdpool); + task->execute(); + task.reset(); +} + +>>>>>>> 86e21ef59b (*: Fix bug that DynamicThreadPool may not destruct tasks in time. (#4210)) void DynamicThreadPool::fixedWork(size_t index) { Queue * queue = fixed_queues[index].get(); @@ -110,8 +120,12 @@ void DynamicThreadPool::dynamicWork(TaskPtr initial_task) if (!node.task) // may be timeout or cancelled break; +<<<<<<< HEAD node.task->execute(); node.task.reset(); +======= + executeTask(node.task); +>>>>>>> 86e21ef59b (*: Fix bug that DynamicThreadPool may not destruct tasks in time. (#4210)) } alive_dynamic_threads.fetch_sub(1); } diff --git a/dbms/src/Common/DynamicThreadPool.h b/dbms/src/Common/DynamicThreadPool.h index c5f39480f46..f816385d8d2 100644 --- a/dbms/src/Common/DynamicThreadPool.h +++ b/dbms/src/Common/DynamicThreadPool.h @@ -64,6 +64,8 @@ class DynamicThreadPool void fixedWork(size_t index); void dynamicWork(TaskPtr initial_task); + static void executeTask(TaskPtr & task); + const std::chrono::nanoseconds dynamic_auto_shrink_cooldown; std::vector fixed_threads; diff --git a/dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp b/dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp index 441405f68b4..ea2846370cf 100644 --- a/dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp +++ b/dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp @@ -3,6 +3,8 @@ namespace DB::tests { +namespace +{ class DynamicThreadPoolTest : public ::testing::Test { }; @@ -158,4 +160,45 @@ try } CATCH +struct X +{ + std::mutex * mu; + std::condition_variable * cv; + bool * destructed; + + X(std::mutex * mu_, std::condition_variable * cv_, bool * destructed_) + : mu(mu_) + , cv(cv_) + , destructed(destructed_) + {} + + ~X() + { + std::unique_lock lock(*mu); + *destructed = true; + cv->notify_all(); + } +}; + +TEST_F(DynamicThreadPoolTest, testTaskDestruct) +try +{ + std::mutex mu; + std::condition_variable cv; + bool destructed = false; + + DynamicThreadPool pool(0, std::chrono::minutes(1)); + auto tmp = std::make_shared(&mu, &cv, &destructed); + pool.schedule(true, [x = tmp] {}); + tmp.reset(); + + { + std::unique_lock lock(mu); + auto ret = cv.wait_for(lock, std::chrono::seconds(1), [&] { return destructed; }); + ASSERT_TRUE(ret); + } +} +CATCH + +} // namespace } // namespace DB::tests From 3cf4bec20b66ad669035ddf254dd326f949b415a Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 22 Apr 2022 20:39:31 +0800 Subject: [PATCH 2/3] resolve conflict Signed-off-by: xufei --- dbms/src/Common/DynamicThreadPool.cpp | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/dbms/src/Common/DynamicThreadPool.cpp b/dbms/src/Common/DynamicThreadPool.cpp index 474071adf3a..a1dc0390b84 100644 --- a/dbms/src/Common/DynamicThreadPool.cpp +++ b/dbms/src/Common/DynamicThreadPool.cpp @@ -77,16 +77,12 @@ void DynamicThreadPool::scheduledToNewDynamicThread(TaskPtr & task) t.detach(); } -<<<<<<< HEAD -======= void DynamicThreadPool::executeTask(TaskPtr & task) { - UPDATE_CUR_AND_MAX_METRIC(tiflash_thread_count, type_active_threads_of_thdpool, type_max_active_threads_of_thdpool); task->execute(); task.reset(); } ->>>>>>> 86e21ef59b (*: Fix bug that DynamicThreadPool may not destruct tasks in time. (#4210)) void DynamicThreadPool::fixedWork(size_t index) { Queue * queue = fixed_queues[index].get(); @@ -97,13 +93,13 @@ void DynamicThreadPool::fixedWork(size_t index) queue->pop(task); if (!task) break; - task->execute(); + executeTask(task); } } void DynamicThreadPool::dynamicWork(TaskPtr initial_task) { - initial_task->execute(); + executeTask(initial_task); DynamicNode node; while (true) @@ -120,12 +116,7 @@ void DynamicThreadPool::dynamicWork(TaskPtr initial_task) if (!node.task) // may be timeout or cancelled break; -<<<<<<< HEAD - node.task->execute(); - node.task.reset(); -======= executeTask(node.task); ->>>>>>> 86e21ef59b (*: Fix bug that DynamicThreadPool may not destruct tasks in time. (#4210)) } alive_dynamic_threads.fetch_sub(1); } From 631181a0462e2f0c4e8214cdcfce9b077af51ee9 Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 22 Apr 2022 20:40:33 +0800 Subject: [PATCH 3/3] format code Signed-off-by: xufei --- dbms/src/Common/DynamicThreadPool.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Common/DynamicThreadPool.cpp b/dbms/src/Common/DynamicThreadPool.cpp index a1dc0390b84..f27213a9e1a 100644 --- a/dbms/src/Common/DynamicThreadPool.cpp +++ b/dbms/src/Common/DynamicThreadPool.cpp @@ -93,7 +93,7 @@ void DynamicThreadPool::fixedWork(size_t index) queue->pop(task); if (!task) break; - executeTask(task); + executeTask(task); } }