diff --git a/dbms/src/Common/DynamicThreadPool.cpp b/dbms/src/Common/DynamicThreadPool.cpp index 87f7e8f3e26..ff13103019d 100644 --- a/dbms/src/Common/DynamicThreadPool.cpp +++ b/dbms/src/Common/DynamicThreadPool.cpp @@ -81,10 +81,11 @@ void DynamicThreadPool::scheduledToNewDynamicThread(TaskPtr & task) t.detach(); } -void executeTask(const std::unique_ptr & task) +void DynamicThreadPool::executeTask(TaskPtr & task) { UPDATE_CUR_AND_MAX_METRIC(tiflash_thread_count, type_active_threads_of_thdpool, type_max_active_threads_of_thdpool); task->execute(); + task.reset(); } void DynamicThreadPool::fixedWork(size_t index) @@ -124,7 +125,6 @@ void DynamicThreadPool::dynamicWork(TaskPtr initial_task) if (!node.task) // may be timeout or cancelled break; executeTask(node.task); - node.task.reset(); } alive_dynamic_threads.fetch_sub(1); } diff --git a/dbms/src/Common/DynamicThreadPool.h b/dbms/src/Common/DynamicThreadPool.h index c5f39480f46..f816385d8d2 100644 --- a/dbms/src/Common/DynamicThreadPool.h +++ b/dbms/src/Common/DynamicThreadPool.h @@ -64,6 +64,8 @@ class DynamicThreadPool void fixedWork(size_t index); void dynamicWork(TaskPtr initial_task); + static void executeTask(TaskPtr & task); + const std::chrono::nanoseconds dynamic_auto_shrink_cooldown; std::vector fixed_threads; diff --git a/dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp b/dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp index f097cabfdb3..a850f88bcbb 100644 --- a/dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp +++ b/dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp @@ -3,6 +3,8 @@ namespace DB::tests { +namespace +{ class DynamicThreadPoolTest : public ::testing::Test { }; @@ -148,4 +150,45 @@ try } CATCH +struct X +{ + std::mutex * mu; + std::condition_variable * cv; + bool * destructed; + + X(std::mutex * mu_, std::condition_variable * cv_, bool * destructed_) + : mu(mu_) + , cv(cv_) + , destructed(destructed_) + {} + + ~X() + { + std::unique_lock lock(*mu); + *destructed = true; + cv->notify_all(); + } +}; + +TEST_F(DynamicThreadPoolTest, testTaskDestruct) +try +{ + std::mutex mu; + std::condition_variable cv; + bool destructed = false; + + DynamicThreadPool pool(0, std::chrono::minutes(1)); + auto tmp = std::make_shared(&mu, &cv, &destructed); + pool.schedule(true, [x = tmp] {}); + tmp.reset(); + + { + std::unique_lock lock(mu); + auto ret = cv.wait_for(lock, std::chrono::seconds(1), [&] { return destructed; }); + ASSERT_TRUE(ret); + } +} +CATCH + +} // namespace } // namespace DB::tests