From 1716c964bd43ff6084d301f0085418a10c338d10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C5=BEenan=20Zuki=C4=87?= Date: Wed, 7 Jul 2021 16:55:23 -0400 Subject: [PATCH] ENH: fix Python multi-processing hang on unix --- Modules/Core/Common/include/itkThreadPool.h | 14 ++++++++++++-- Modules/Core/Common/src/itkThreadPool.cxx | 21 ++++++++++++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/Modules/Core/Common/include/itkThreadPool.h b/Modules/Core/Common/include/itkThreadPool.h index 971f59ff63d..2d91c9a20b1 100644 --- a/Modules/Core/Common/include/itkThreadPool.h +++ b/Modules/Core/Common/include/itkThreadPool.h @@ -124,13 +124,23 @@ auto result = pool->AddWork([](int param) { return param; }, 7); SetDoNotWaitForThreads(bool doNotWaitForThreads); protected: - /* We need access to the mutex in AddWork, and the variable is only + /** We need access to the mutex in AddWork, and the variable is only * visible in .cxx file, so this method returns it. */ std::mutex & GetMutex(); ThreadPool(); - ~ThreadPool() override; + + /** Stop the pool and release threads. To be called by the destructor and atfork. */ + void + CleanUp(); + + ~ThreadPool() override { this->CleanUp(); } + + static void + PrepareForFork(); + static void + ResumeFromFork(); private: /** Only used to synchronize the global variable across static libraries.*/ diff --git a/Modules/Core/Common/src/itkThreadPool.cxx b/Modules/Core/Common/src/itkThreadPool.cxx index bc8f6084fa1..c663a910e02 100644 --- a/Modules/Core/Common/src/itkThreadPool.cxx +++ b/Modules/Core/Common/src/itkThreadPool.cxx @@ -77,6 +77,9 @@ ThreadPool::GetInstance() { new ThreadPool(); // constructor sets m_PimplGlobals->m_ThreadPoolInstance } +#if defined(ITK_USE_PTHREADS) + pthread_atfork(ThreadPool::PrepareForFork, ThreadPool::ResumeFromFork, ThreadPool::ResumeFromFork); +#endif } } return m_PimplGlobals->m_ThreadPoolInstance; @@ -132,7 +135,8 @@ ThreadPool::GetNumberOfCurrentlyIdleThreads() const return int(m_Threads.size()) - int(m_WorkQueue.size()); // lousy approximation } -ThreadPool::~ThreadPool() +void +ThreadPool::CleanUp() { { std::unique_lock mutexHolder(m_PimplGlobals->m_Mutex); @@ -154,6 +158,21 @@ ThreadPool::~ThreadPool() } } +void +ThreadPool::PrepareForFork() +{ + m_PimplGlobals->m_ThreadPoolInstance->CleanUp(); +} + +void +ThreadPool::ResumeFromFork() +{ + ThreadPool * instance = m_PimplGlobals->m_ThreadPoolInstance.GetPointer(); + ThreadIdType threadCount = instance->m_Threads.size(); + instance->m_Threads.clear(); + instance->m_Stopping = false; + instance->AddThreads(threadCount); +} void ThreadPool::ThreadExecute()