Skip to content

Commit

Permalink
[fix] Suppress "Python memory allocator called without holding the GI…
Browse files Browse the repository at this point in the history
…L" errors with PYTHONDEVMODE=1
  • Loading branch information
mxmlnkn committed Nov 25, 2023
1 parent 43bae71 commit 5dafe0c
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 5 deletions.
12 changes: 12 additions & 0 deletions src/core/BlockFetcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include <BlockFinderInterface.hpp>
#include <Cache.hpp>
#include <common.hpp>
#ifdef WITH_PYTHON_SUPPORT
#include <filereader/Python.hpp> // For unlocking the GIL in case the block fetch code uses PythonFileReader
#endif
#include <Prefetcher.hpp>
#include <ThreadPool.hpp>

Expand Down Expand Up @@ -237,6 +240,15 @@ class BlockFetcher
{
[[maybe_unused]] const auto tGetStart = now();

#ifdef WITH_PYTHON_SUPPORT
/* The GIL needs to be unlocked for the worker threads to not wait infinitely when calling methods
* on a given Python file object. In theory, it suffices to call this unlock here to avoid deadlocks
* because it is the only method that waits for results from the worker threads. But, it might be
* more efficient to unlock the GIL outside to avoid many unlock/lock cycles and to leave it unlocked
* for longer so as to not hinder the worker threads. */
const ScopedGILUnlock unlockedGIL;
#endif

/* Not using capture bindings here because C++ is too dumb to capture those, yet.
* @see https://stackoverflow.com/a/46115028/2191065 */
auto resultFromCaches = getFromCaches( blockOffset );
Expand Down
5 changes: 5 additions & 0 deletions src/core/BlockFinder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include <BlockFinderInterface.hpp>
#include <filereader/FileReader.hpp>
#include <filereader/Python.hpp>
#include <JoiningThread.hpp>
#include <StreamedResults.hpp>

Expand Down Expand Up @@ -108,6 +109,10 @@ class BlockFinder final :
get( size_t blockNumber,
double timeoutInSeconds ) override
{
#ifdef WITH_PYTHON_SUPPORT
const ScopedGILUnlock unlockedGIL;
#endif

if ( !m_blockOffsets.finalized() ) {
startThreads();
}
Expand Down
4 changes: 4 additions & 0 deletions src/core/ParallelBitStringFinder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ template<uint8_t bitStringSize>
size_t
ParallelBitStringFinder<bitStringSize>::find()
{
#ifdef WITH_PYTHON_SUPPORT
const ScopedGILUnlock unlockedGIL;
#endif

while ( !BaseType::eof() || !m_threadResults.empty() )
{
/* Check whether there are results available and return those. Take care to return results in order! */
Expand Down
11 changes: 11 additions & 0 deletions src/core/ThreadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#include "AffinityHelpers.hpp"
#include "JoiningThread.hpp"
#ifdef WITH_PYTHON_SUPPORT
#include "filereader/Python.hpp"
#endif


/**
Expand Down Expand Up @@ -113,6 +116,14 @@ class ThreadPool
m_threadPoolRunning = false;
m_pingWorkers.notify_all();
}

#ifdef WITH_PYTHON_SUPPORT
/* The GIL needs to be unlocked for the worker threads to not wait infinitely when calling methods
* on a given Python file object. In this waiting time, they also wouldn't check m_threadPoolRunning,
* nor PyErr_CheckSignals and therefore would deadlock. */
const ScopedGILUnlock unlockedGIL;
#endif

m_threads.clear();
}

Expand Down
135 changes: 134 additions & 1 deletion src/core/filereader/Python.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,136 @@ class PythonExceptionThrownBySignal :
};


class ScopedGIL
{
public:
struct ReferenceCounter
{
bool isLocked;
size_t counter{ 0 };
};

private:
class GILMutex
{
public:
GILMutex() :
m_isLocked( PyGILState_Check() == 1 ),
m_isPythonThread( m_isLocked )
{}

void
lock( bool doLock = true )
{
if ( m_isLocked == doLock ) {
return;
}

if ( doLock ) {
if ( m_isPythonThread ) {
PyEval_RestoreThread( m_unlockState );
m_unlockState = nullptr;
} else {
m_lockState = PyGILState_Ensure();
}
} else {
if ( m_isPythonThread ) {
m_unlockState = PyEval_SaveThread();
} else {
PyGILState_Release( m_lockState );
m_lockState = {};
}
}

m_isLocked = doLock;
}

[[nodiscard]] constexpr bool
isLocked() const
{
return m_isLocked;
}

private:
bool m_isLocked;
const bool m_isPythonThread;

/** Used for locking non-Python threads. */
PyGILState_STATE m_lockState{};
/** Used for unlocking and relocking the Python main thread. */
PyThreadState* m_unlockState{ nullptr };
};

public:
ScopedGIL( bool lock )
{
if ( m_referenceCounters.empty() && ( m_mutex.isLocked() == lock ) ) {
return;
}

if ( !m_referenceCounters.empty() && ( m_referenceCounters.back().isLocked == lock ) ) {
++m_referenceCounters.back().counter;
} else {
m_referenceCounters.emplace_back( ReferenceCounter{ lock, 1 } );
m_mutex.lock( lock );
}
}

~ScopedGIL()
{
if ( m_referenceCounters.empty() ) {
/* This happens when, e.g., trying to look the Python main thread when it already held the GIL. */
return;
}

if ( m_referenceCounters.back().counter == 0 ) {
std::cerr << "Something went wrong. The counter shouldn't be zero at this point!\n";
return;
}

--m_referenceCounters.back().counter;
if ( m_referenceCounters.back().counter == 0 ) {
m_mutex.lock( !m_referenceCounters.back().isLocked );
m_referenceCounters.pop_back();
}
}

ScopedGIL( const ScopedGIL& ) = delete;
ScopedGIL( ScopedGIL&& ) = delete;
ScopedGIL& operator=( const ScopedGIL& ) = delete;
ScopedGIL& operator=( ScopedGIL&& ) = delete;

private:
inline static thread_local GILMutex m_mutex;
inline static thread_local std::vector<ReferenceCounter> m_referenceCounters;
};


class ScopedGILLock :
public ScopedGIL
{
public:
ScopedGILLock() :
ScopedGIL( true )
{}
};


class ScopedGILUnlock :
public ScopedGIL
{
public:
ScopedGILUnlock() :
ScopedGIL( false )
{}
};


void
checkPythonSignalHandlers()
{
const ScopedGILLock gilLock;

/**
* @see https://docs.python.org/3/c-api/exceptions.html#signal-handling
* > The function attempts to handle all pending signals, and then returns 0.
Expand Down Expand Up @@ -116,6 +243,8 @@ callPyObject( PyObject* pythonObject,
{
constexpr auto nArgs = sizeof...( Args );

const ScopedGILLock gilLock;

if constexpr ( std::is_same_v<Result, void> ) {
PyObject_Call( pythonObject, PyTuple_Pack( nArgs, toPyObject( args )... ), nullptr );
} else {
Expand Down Expand Up @@ -188,6 +317,7 @@ class PythonFileReader :
seek( m_initialPosition );
}

const ScopedGILLock gilLock;
if ( Py_REFCNT( m_pythonObject ) == 1 ) {
callPyObject<void>( mpo_close );
}
Expand Down Expand Up @@ -237,6 +367,8 @@ class PythonFileReader :
return 0;
}

const ScopedGILLock gilLock;

/** @todo better to use readinto because read might return less than requested even before the EOF! */
auto* const bytes = callPyObject<PyObject*>( mpo_read, nMaxBytesToRead );
if ( !PyBytes_Check( bytes ) ) {
Expand Down Expand Up @@ -286,6 +418,8 @@ class PythonFileReader :
return 0;
}

const ScopedGILLock gilLock;

auto* const bytes = PyBytes_FromStringAndSize( buffer, nBytesToWrite );
const auto nBytesWritten = callPyObject<long long int>( mpo_write, bytes );

Expand Down Expand Up @@ -327,7 +461,6 @@ class PythonFileReader :
}

m_currentPosition = callPyObject<size_t>( mpo_seek, offset, pythonWhence );
//m_currentPosition = fromPyObject<size_t>( PyObject_Call( mpo_seek, PyTuple_Pack( 2, PyLong_FromLongLong( offset ), PyLong_FromLongLong( (long long)pythonWhence ) ), nullptr ) );

return m_currentPosition;
}
Expand Down
1 change: 1 addition & 0 deletions src/indexed_bzip2/ParallelBZ2Reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ class ParallelBZ2Reader final :

#ifdef WITH_PYTHON_SUPPORT
checkPythonSignalHandlers();
const ScopedGILUnlock unlockedGIL;
#endif

auto blockInfo = m_blockMap->findDataOffset( m_currentPosition );
Expand Down
9 changes: 5 additions & 4 deletions src/rapidgzip/ParallelGzipReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,11 @@ class ParallelGzipReader final :

size_t nBytesDecoded = 0;
while ( ( nBytesDecoded < nBytesToRead ) && !eof() ) {
#ifdef WITH_PYTHON_SUPPORT
checkPythonSignalHandlers();
const ScopedGILUnlock unlockedGIL;
#endif

const auto blockResult = chunkFetcher().get( m_currentPosition );
if ( !blockResult ) {
m_atEndOfFile = true;
Expand Down Expand Up @@ -461,10 +466,6 @@ class ParallelGzipReader final :
throw std::logic_error( std::move( message ).str() );
}

#ifdef WITH_PYTHON_SUPPORT
checkPythonSignalHandlers();
#endif

const auto nBytesToDecode = std::min( blockSize - offsetInBlock, nBytesToRead - nBytesDecoded );

[[maybe_unused]] const auto tCRC32Start = now();
Expand Down

0 comments on commit 5dafe0c

Please sign in to comment.