Skip to content

Commit

Permalink
ARROW-18383: [C++] Avoid global variables for thread pools and at-for…
Browse files Browse the repository at this point in the history
…k handlers

Initialization order of module globals is undefined.

In one observed case, the IO thread pool was instantiated first, at library load, and registered an at-fork handler.
Only afterwards was the global list of at-fork handlers initialized, which discarded the handler registered just before.
  • Loading branch information
pitrou committed Nov 22, 2022
1 parent 21309ea commit be52dbc
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 78 deletions.
10 changes: 6 additions & 4 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ using internal::ThreadPool;

namespace io {

static IOContext g_default_io_context{};

// Construct an IOContext from a memory pool and a stop token.
// Delegates to the three-argument constructor, passing the result of
// internal::GetIOThreadPool() as the second argument.
IOContext::IOContext(MemoryPool* pool, StopToken stop_token)
: IOContext(pool, internal::GetIOThreadPool(), std::move(stop_token)) {}

const IOContext& default_io_context() { return g_default_io_context; }
const IOContext& default_io_context() {
  // A function-local static is constructed on first call, which avoids the
  // initialization order issues a global variable would have (ARROW-18383).
  static IOContext default_context{};
  return default_context;
}

// Return the capacity reported by the thread pool from internal::GetIOThreadPool().
int GetIOThreadPoolCapacity() { return internal::GetIOThreadPool()->GetCapacity(); }

Expand Down Expand Up @@ -103,7 +105,7 @@ class InputStreamBlockIterator {

} // namespace

const IOContext& Readable::io_context() const { return g_default_io_context; }
// Return the shared context provided by default_io_context().
const IOContext& Readable::io_context() const { return default_io_context(); }

// Advance the stream by calling Read(nbytes) and returning only its status;
// whatever Read() produced besides the status is dropped.
Status InputStream::Advance(int64_t nbytes) { return Read(nbytes).status(); }

Expand Down
164 changes: 90 additions & 74 deletions cpp/src/arrow/util/atfork_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,104 +34,120 @@ namespace internal {

namespace {

// Pairs one at-fork handler with the token produced by its before-fork
// callback, for use by the after-fork callbacks.
struct RunningHandler {
// A temporary owning copy of a handler, to make sure that a handler
// that runs before fork can still run after fork.
std::shared_ptr<AtForkHandler> handler;
// The token returned by the before-fork handler, to pass to after-fork handlers.
std::any token;

// Takes ownership of the given shared_ptr; token is left default-constructed.
explicit RunningHandler(std::shared_ptr<AtForkHandler> handler)
: handler(std::move(handler)) {}
};
// Singleton state for at-fork management.
// We do not use global variables because of initialization order issues (ARROW-18383).
// Instead, a function-local static ensures the state is initialized
// opportunistically (see GetAtForkState()).
struct AtForkState {
struct RunningHandler {
  // Owning copy of the handler, held temporarily so that a handler which ran
  // before the fork is still alive to run after it.
  std::shared_ptr<AtForkHandler> handler;
  // Token produced by the before-fork handler; handed to the after-fork handlers.
  std::any token;

  // Takes ownership of the given shared_ptr; token is left default-constructed.
  explicit RunningHandler(std::shared_ptr<AtForkHandler> owned_handler)
      : handler(std::move(owned_handler)) {}
};

void MaintainHandlersUnlocked() {
auto it = std::remove_if(
handlers_.begin(), handlers_.end(),
[](const std::weak_ptr<AtForkHandler>& ptr) { return ptr.expired(); });
handlers_.erase(it, handlers_.end());
}

std::mutex g_mutex;
std::vector<std::weak_ptr<AtForkHandler>> g_handlers;
std::vector<RunningHandler> g_handlers_while_forking;
void BeforeFork() {
// Lock the mutex and keep it locked until the end of AfterForkParent(),
// to avoid multiple concurrent forks and atforks.
mutex_.lock();

// Erase every expired weak_ptr from g_handlers.
// This function does not lock g_mutex itself.
void MaintainHandlersUnlocked() {
auto it = std::remove_if(
g_handlers.begin(), g_handlers.end(),
[](const std::weak_ptr<AtForkHandler>& ptr) { return ptr.expired(); });
// remove_if only moves the kept entries to the front; erase trims the tail.
g_handlers.erase(it, g_handlers.end());
}
DCHECK(handlers_while_forking_.empty()); // AfterForkParent clears it

void BeforeFork() {
// Lock the mutex and keep it locked until the end of AfterForkParent(),
// to avoid multiple concurrent forks and atforks.
g_mutex.lock();

DCHECK(g_handlers_while_forking.empty()); // AfterForkParent clears it
for (const auto& weak_handler : handlers_) {
if (auto handler = weak_handler.lock()) {
handlers_while_forking_.emplace_back(std::move(handler));
}
}

for (const auto& weak_handler : g_handlers) {
if (auto handler = weak_handler.lock()) {
g_handlers_while_forking.emplace_back(std::move(handler));
// XXX can the handler call RegisterAtFork()?
for (auto&& handler : handlers_while_forking_) {
if (handler.handler->before) {
handler.token = handler.handler->before();
}
}
}

// XXX can the handler call RegisterAtFork()?
for (auto&& handler : g_handlers_while_forking) {
if (handler.handler->before) {
handler.token = handler.handler->before();
void AfterForkParent() {
// The mutex was locked by BeforeFork()
auto handlers = std::move(handlers_while_forking_);
handlers_while_forking_.clear();

// Execute handlers in reverse order
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
auto&& handler = *it;
if (handler.handler->parent_after) {
handler.handler->parent_after(std::move(handler.token));
}
}
}
}

void AfterForkParent() {
// The mutex was locked by BeforeFork()
mutex_.unlock();
// handlers will be destroyed here without the mutex locked, so that
// any action taken by destructors might call RegisterAtFork
}

auto handlers = std::move(g_handlers_while_forking);
g_handlers_while_forking.clear();
// Execute handlers in reverse order
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
auto&& handler = *it;
if (handler.handler->parent_after) {
handler.handler->parent_after(std::move(handler.token));
void AfterForkChild() {
// Need to reinitialize the mutex as it is probably invalid. Also, the
// old mutex destructor may fail.
// Fortunately, we are a single thread in the child process by now, so no
// additional synchronization is needed.
new (&mutex_) std::mutex;

auto handlers = std::move(handlers_while_forking_);
handlers_while_forking_.clear();

// Execute handlers in reverse order
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
auto&& handler = *it;
if (handler.handler->child_after) {
handler.handler->child_after(std::move(handler.token));
}
}
}

g_mutex.unlock();
// handlers will be destroyed here without the mutex locked, so that
// any action taken by destructors might call RegisterAtFork
}

void AfterForkChild() {
// Need to reinitialize the mutex as it is probably invalid. Also, the
// old mutex destructor may fail.
// Fortunately, we are a single thread in the child process by now, so no
// additional synchronization is needed.
new (&g_mutex) std::mutex;

auto handlers = std::move(g_handlers_while_forking);
g_handlers_while_forking.clear();
// Execute handlers in reverse order
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
auto&& handler = *it;
if (handler.handler->child_after) {
handler.handler->child_after(std::move(handler.token));
}
// Append a handler to handlers_ while holding mutex_, pruning expired entries first.
void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
  const std::lock_guard<std::mutex> guard(mutex_);
  // Pruning makes each at-fork registration O(n). We assume that n remains
  // typically low and calls to this function are not performance-critical.
  MaintainHandlersUnlocked();
  handlers_.emplace_back(std::move(weak_handler));
}
}

struct AtForkInitializer {
AtForkInitializer() {
std::mutex mutex_;
std::vector<std::weak_ptr<AtForkHandler>> handlers_;
std::vector<RunningHandler> handlers_while_forking_;
};

AtForkState* GetAtForkState() {
static std::unique_ptr<AtForkState> state = []() {
auto state = std::make_unique<AtForkState>();
#ifndef _WIN32
int r = pthread_atfork(&BeforeFork, &AfterForkParent, &AfterForkChild);
int r = pthread_atfork(/*prepare=*/[] { GetAtForkState()->BeforeFork(); },
/*parent=*/[] { GetAtForkState()->AfterForkParent(); },
/*child=*/[] { GetAtForkState()->AfterForkChild(); });
if (r != 0) {
IOErrorFromErrno(r, "Error when calling pthread_atfork: ").Abort();
}
#endif
}
};
return state;
}();
return state.get();
}

}; // namespace

void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
static AtForkInitializer initializer;

std::lock_guard<std::mutex> lock(g_mutex);
MaintainHandlersUnlocked();
g_handlers.push_back(std::move(weak_handler));
GetAtForkState()->RegisterAtFork(std::move(weak_handler));
}

} // namespace internal
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ std::shared_ptr<ThreadPool> ThreadPool::MakeCpuThreadPool() {
}

ThreadPool* GetCpuThreadPool() {
  // Function-local static rather than a global variable, because of
  // initialization order issues (ARROW-18383).
  static std::shared_ptr<ThreadPool> cpu_pool = ThreadPool::MakeCpuThreadPool();
  return cpu_pool.get();
}
Expand Down

0 comments on commit be52dbc

Please sign in to comment.