From 5dafe0c1997f048ad609e107916b75a801bf3abd Mon Sep 17 00:00:00 2001 From: mxmlnkn Date: Fri, 24 Nov 2023 12:48:26 +0100 Subject: [PATCH] [fix] Suppress "Python memory allocator called without holding the GIL" errors with PYTHONDEVMODE=1 --- src/core/BlockFetcher.hpp | 12 +++ src/core/BlockFinder.hpp | 5 + src/core/ParallelBitStringFinder.hpp | 4 + src/core/ThreadPool.hpp | 11 ++ src/core/filereader/Python.hpp | 135 +++++++++++++++++++++++- src/indexed_bzip2/ParallelBZ2Reader.hpp | 1 + src/rapidgzip/ParallelGzipReader.hpp | 9 +- 7 files changed, 172 insertions(+), 5 deletions(-) diff --git a/src/core/BlockFetcher.hpp b/src/core/BlockFetcher.hpp index c77181ca..127addcf 100644 --- a/src/core/BlockFetcher.hpp +++ b/src/core/BlockFetcher.hpp @@ -20,6 +20,9 @@ #include #include #include +#ifdef WITH_PYTHON_SUPPORT + #include // For unlocking the GIL in case the block fetch code uses PythonFileReader +#endif #include #include @@ -237,6 +240,15 @@ class BlockFetcher { [[maybe_unused]] const auto tGetStart = now(); + #ifdef WITH_PYTHON_SUPPORT + /* The GIL needs to be unlocked for the worker threads to not wait infinitely when calling methods + * on a given Python file object. In theory, it suffices to call this unlock here to avoid deadlocks + * because it is the only method that waits for results from the worker threads. But, it might be + * more efficient to unlock the GIL outside to avoid many unlock/lock cycles and to leave it unlocked + * for longer so as to not hinder the worker threads. */ + const ScopedGILUnlock unlockedGIL; + #endif + /* Not using capture bindings here because C++ is too dumb to capture those, yet. * @see https://stackoverflow.com/a/46115028/2191065 */ auto resultFromCaches = getFromCaches( blockOffset ); diff --git a/src/core/BlockFinder.hpp b/src/core/BlockFinder.hpp index ce00f256..f48c6118 100644 --- a/src/core/BlockFinder.hpp +++ b/src/core/BlockFinder.hpp @@ -15,6 +15,7 @@ #include #include +#include #include #include @@ -108,6 +109,10 @@ class BlockFinder final : get( size_t blockNumber, double timeoutInSeconds ) override { +#ifdef WITH_PYTHON_SUPPORT + const ScopedGILUnlock unlockedGIL; +#endif + if ( !m_blockOffsets.finalized() ) { startThreads(); } diff --git a/src/core/ParallelBitStringFinder.hpp b/src/core/ParallelBitStringFinder.hpp index 504b1ef9..6b5a3168 100644 --- a/src/core/ParallelBitStringFinder.hpp +++ b/src/core/ParallelBitStringFinder.hpp @@ -160,6 +160,10 @@ template size_t ParallelBitStringFinder::find() { +#ifdef WITH_PYTHON_SUPPORT + const ScopedGILUnlock unlockedGIL; +#endif + while ( !BaseType::eof() || !m_threadResults.empty() ) { /* Check whether there are results available and return those. Take care to return results in order! */ diff --git a/src/core/ThreadPool.hpp b/src/core/ThreadPool.hpp index 2bcc37b1..fed5d4da 100644 --- a/src/core/ThreadPool.hpp +++ b/src/core/ThreadPool.hpp @@ -17,6 +17,9 @@ #include "AffinityHelpers.hpp" #include "JoiningThread.hpp" +#ifdef WITH_PYTHON_SUPPORT + #include "filereader/Python.hpp" +#endif /** @@ -113,6 +116,14 @@ class ThreadPool m_threadPoolRunning = false; m_pingWorkers.notify_all(); } + + #ifdef WITH_PYTHON_SUPPORT + /* The GIL needs to be unlocked for the worker threads to not wait infinitely when calling methods + * on a given Python file object. In this waiting time, they also wouldn't check m_threadPoolRunning, + * nor PyErr_CheckSignals and therefore would deadlock. */ + const ScopedGILUnlock unlockedGIL; + #endif + m_threads.clear(); } diff --git a/src/core/filereader/Python.hpp b/src/core/filereader/Python.hpp index 11ebfb3f..7095a830 100644 --- a/src/core/filereader/Python.hpp +++ b/src/core/filereader/Python.hpp @@ -25,9 +25,136 @@ class PythonExceptionThrownBySignal : }; +class ScopedGIL +{ +public: + struct ReferenceCounter + { + bool isLocked; + size_t counter{ 0 }; + }; + +private: + class GILMutex + { + public: + GILMutex() : + m_isLocked( PyGILState_Check() == 1 ), + m_isPythonThread( m_isLocked ) + {} + + void + lock( bool doLock = true ) + { + if ( m_isLocked == doLock ) { + return; + } + + if ( doLock ) { + if ( m_isPythonThread ) { + PyEval_RestoreThread( m_unlockState ); + m_unlockState = nullptr; + } else { + m_lockState = PyGILState_Ensure(); + } + } else { + if ( m_isPythonThread ) { + m_unlockState = PyEval_SaveThread(); + } else { + PyGILState_Release( m_lockState ); + m_lockState = {}; + } + } + + m_isLocked = doLock; + } + + [[nodiscard]] constexpr bool + isLocked() const + { + return m_isLocked; + } + + private: + bool m_isLocked; + const bool m_isPythonThread; + + /** Used for locking non-Python threads. */ + PyGILState_STATE m_lockState{}; + /** Used for unlocking and relocking the Python main thread. */ + PyThreadState* m_unlockState{ nullptr }; + }; + +public: + ScopedGIL( bool lock ) + { + if ( m_referenceCounters.empty() && ( m_mutex.isLocked() == lock ) ) { + return; + } + + if ( !m_referenceCounters.empty() && ( m_referenceCounters.back().isLocked == lock ) ) { + ++m_referenceCounters.back().counter; + } else { + m_referenceCounters.emplace_back( ReferenceCounter{ lock, 1 } ); + m_mutex.lock( lock ); + } + } + + ~ScopedGIL() + { + if ( m_referenceCounters.empty() ) { + /* This happens when, e.g., trying to look the Python main thread when it already held the GIL. */ + return; + } + + if ( m_referenceCounters.back().counter == 0 ) { + std::cerr << "Something went wrong. The counter shouldn't be zero at this point!\n"; + return; + } + + --m_referenceCounters.back().counter; + if ( m_referenceCounters.back().counter == 0 ) { + m_mutex.lock( !m_referenceCounters.back().isLocked ); + m_referenceCounters.pop_back(); + } + } + + ScopedGIL( const ScopedGIL& ) = delete; + ScopedGIL( ScopedGIL&& ) = delete; + ScopedGIL& operator=( const ScopedGIL& ) = delete; + ScopedGIL& operator=( ScopedGIL&& ) = delete; + +private: + inline static thread_local GILMutex m_mutex; + inline static thread_local std::vector m_referenceCounters; +}; + + +class ScopedGILLock : + public ScopedGIL +{ +public: + ScopedGILLock() : + ScopedGIL( true ) + {} +}; + + +class ScopedGILUnlock : + public ScopedGIL +{ +public: + ScopedGILUnlock() : + ScopedGIL( false ) + {} +}; + + void checkPythonSignalHandlers() { + const ScopedGILLock gilLock; + /** * @see https://docs.python.org/3/c-api/exceptions.html#signal-handling * > The function attempts to handle all pending signals, and then returns 0. @@ -116,6 +243,8 @@ callPyObject( PyObject* pythonObject, { constexpr auto nArgs = sizeof...( Args ); + const ScopedGILLock gilLock; + if constexpr ( std::is_same_v ) { PyObject_Call( pythonObject, PyTuple_Pack( nArgs, toPyObject( args )... ), nullptr ); } else { @@ -188,6 +317,7 @@ class PythonFileReader : seek( m_initialPosition ); } + const ScopedGILLock gilLock; if ( Py_REFCNT( m_pythonObject ) == 1 ) { callPyObject( mpo_close ); } @@ -237,6 +367,8 @@ class PythonFileReader : return 0; } + const ScopedGILLock gilLock; + /** @todo better to use readinto because read might return less than requested even before the EOF! */ auto* const bytes = callPyObject( mpo_read, nMaxBytesToRead ); if ( !PyBytes_Check( bytes ) ) { @@ -286,6 +418,8 @@ class PythonFileReader : return 0; } + const ScopedGILLock gilLock; + auto* const bytes = PyBytes_FromStringAndSize( buffer, nBytesToWrite ); const auto nBytesWritten = callPyObject( mpo_write, bytes ); @@ -327,7 +461,6 @@ class PythonFileReader : } m_currentPosition = callPyObject( mpo_seek, offset, pythonWhence ); - //m_currentPosition = fromPyObject( PyObject_Call( mpo_seek, PyTuple_Pack( 2, PyLong_FromLongLong( offset ), PyLong_FromLongLong( (long long)pythonWhence ) ), nullptr ) ); return m_currentPosition; } diff --git a/src/indexed_bzip2/ParallelBZ2Reader.hpp b/src/indexed_bzip2/ParallelBZ2Reader.hpp index a08c6772..f97f53bc 100644 --- a/src/indexed_bzip2/ParallelBZ2Reader.hpp +++ b/src/indexed_bzip2/ParallelBZ2Reader.hpp @@ -187,6 +187,7 @@ class ParallelBZ2Reader final : #ifdef WITH_PYTHON_SUPPORT checkPythonSignalHandlers(); + const ScopedGILUnlock unlockedGIL; #endif auto blockInfo = m_blockMap->findDataOffset( m_currentPosition ); diff --git a/src/rapidgzip/ParallelGzipReader.hpp b/src/rapidgzip/ParallelGzipReader.hpp index f4a9f683..588acedb 100644 --- a/src/rapidgzip/ParallelGzipReader.hpp +++ b/src/rapidgzip/ParallelGzipReader.hpp @@ -431,6 +431,11 @@ class ParallelGzipReader final : size_t nBytesDecoded = 0; while ( ( nBytesDecoded < nBytesToRead ) && !eof() ) { + #ifdef WITH_PYTHON_SUPPORT + checkPythonSignalHandlers(); + const ScopedGILUnlock unlockedGIL; + #endif + const auto blockResult = chunkFetcher().get( m_currentPosition ); if ( !blockResult ) { m_atEndOfFile = true; @@ -461,10 +466,6 @@ class ParallelGzipReader final : throw std::logic_error( std::move( message ).str() ); } - #ifdef WITH_PYTHON_SUPPORT - checkPythonSignalHandlers(); - #endif - const auto nBytesToDecode = std::min( blockSize - offsetInBlock, nBytesToRead - nBytesDecoded ); [[maybe_unused]] const auto tCRC32Start = now();