Skip to content

Commit

Permalink
ARROW-17975: [C++] Create at-fork facility (apache#14594)
Browse files Browse the repository at this point in the history
Also migrate the `ThreadPool` class to use the new facility.

The `util::GlobalForkSafeMutex` facility is now unused; we may want to remove it in a later PR.

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
pitrou authored and vibhatha committed Nov 8, 2022
1 parent 039f1df commit 5ef742a
Show file tree
Hide file tree
Showing 10 changed files with 586 additions and 78 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ set(ARROW_SRCS
io/stdio.cc
io/transform.cc
util/async_util.cc
util/atfork_internal.cc
util/basic_decimal.cc
util/bit_block_counter.cc
util/bit_run_reader.cc
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/testing/gtest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,22 @@ bool FileIsClosed(int fd) {
#endif
}

#if !defined(_WIN32)
// Wait for the child process `child_pid` to terminate and assert that it
// exited normally with `expected_exit_status`.
//
// A fatal test failure is raised if `child_pid` is not a positive pid, if
// waitpid() does not report that same pid, if the child was killed by a
// signal, or if it did not terminate through a normal exit.
void AssertChildExit(int child_pid, int expected_exit_status) {
ASSERT_GT(child_pid, 0);
int child_status;
// With options == 0, waitpid() blocks until the child has terminated.
int got_pid = waitpid(child_pid, &child_status, 0);
ASSERT_EQ(got_pid, child_pid);
// Report death-by-signal separately so that the signal number is visible.
if (WIFSIGNALED(child_status)) {
FAIL() << "Child terminated by signal " << WTERMSIG(child_status);
}
// Neither signaled nor exited: unexpected, so dump the raw status word.
if (!WIFEXITED(child_status)) {
FAIL() << "Child didn't terminate normally?? Child status = " << child_status;
}
ASSERT_EQ(WEXITSTATUS(child_status), expected_exit_status);
}
#endif

bool LocaleExists(const char* locale) {
try {
std::locale loc(locale);
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/testing/gtest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,11 @@ std::vector<T> IteratorToVector(Iterator<T> iterator) {
ARROW_TESTING_EXPORT
bool LocaleExists(const char* locale);

#ifndef _WIN32
// Wait for the child process `child_pid` and assert that it exited normally
// with the given exit status (0 by default). Fails the current test if the
// child was terminated by a signal or did not exit normally.
ARROW_TESTING_EXPORT
void AssertChildExit(int child_pid, int expected_exit_status = 0);
#endif

// A RAII-style object that switches to a new locale, and switches back
// to the old locale when going out of scope. Doesn't do anything if the
// new locale doesn't exist on the local machine.
Expand Down
18 changes: 13 additions & 5 deletions cpp/src/arrow/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ endif()
add_arrow_test(utility-test
SOURCES
align_util_test.cc
async_generator_test.cc
async_util_test.cc
bit_block_counter_test.cc
bit_util_test.cc
atfork_test.cc
byte_size_test.cc
cache_test.cc
checked_cast_test.cc
Expand All @@ -60,7 +57,6 @@ add_arrow_test(utility-test
queue_test.cc
range_test.cc
reflection_test.cc
rle_encoding_test.cc
small_vector_test.cc
stl_util_test.cc
string_test.cc
Expand All @@ -73,6 +69,18 @@ add_arrow_test(utility-test
utf8_util_test.cc
value_parsing_test.cc)

# Async-related utility tests, grouped in their own test target.
add_arrow_test(async-utility-test
SOURCES
async_generator_test.cc
async_util_test.cc
test_common.cc)

# Bit-level utility tests (bit block counter, bit utilities, RLE encoding),
# grouped in their own test target.
add_arrow_test(bit-utility-test
SOURCES
bit_block_counter_test.cc
bit_util_test.cc
rle_encoding_test.cc)

add_arrow_test(threading-utility-test
SOURCES
cancel_test.cc
Expand Down
138 changes: 138 additions & 0 deletions cpp/src/arrow/util/atfork_internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/util/atfork_internal.h"

#include <algorithm>
#include <atomic>
#include <mutex>
#include <vector>

#ifndef _WIN32
#include <pthread.h>
#endif

#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"

namespace arrow {
namespace internal {

namespace {

struct RunningHandler {
  // Strong reference held for the duration of a fork: a handler whose
  // before-fork callback has run must stay alive until its after-fork
  // callback has run as well.
  std::shared_ptr<AtForkHandler> handler;
  // Value produced by the before-fork callback; it is handed over to the
  // after-fork callback.
  std::any token;

  explicit RunningHandler(std::shared_ptr<AtForkHandler> owned)
      : handler(std::move(owned)) {}
};

// Guards the two handler lists below. BeforeFork() locks it and
// AfterForkParent() unlocks it, so it stays held across the whole fork;
// AfterForkChild() re-creates it in the child process.
std::mutex g_mutex;
// All registered handlers, referenced weakly. Expired entries are pruned by
// MaintainHandlersUnlocked() whenever a new handler is registered.
std::vector<std::weak_ptr<AtForkHandler>> g_handlers;
// Handlers taking part in the fork currently in progress: filled by
// BeforeFork(), emptied again by AfterForkParent() / AfterForkChild().
std::vector<RunningHandler> g_handlers_while_forking;

void MaintainHandlersUnlocked() {
auto it = std::remove_if(
g_handlers.begin(), g_handlers.end(),
[](const std::weak_ptr<AtForkHandler>& ptr) { return ptr.expired(); });
g_handlers.erase(it, g_handlers.end());
}

void BeforeFork() {
// Lock the mutex and keep it locked until the end of AfterForkParent(),
// to avoid multiple concurrent forks and atforks.
g_mutex.lock();

DCHECK(g_handlers_while_forking.empty()); // AfterForkParent clears it

for (const auto& weak_handler : g_handlers) {
if (auto handler = weak_handler.lock()) {
g_handlers_while_forking.emplace_back(std::move(handler));
}
}

// XXX can the handler call RegisterAtFork()?
for (auto&& handler : g_handlers_while_forking) {
if (handler.handler->before) {
handler.token = handler.handler->before();
}
}
}

void AfterForkParent() {
// The mutex was locked by BeforeFork()

auto handlers = std::move(g_handlers_while_forking);
g_handlers_while_forking.clear();
// Execute handlers in reverse order
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
auto&& handler = *it;
if (handler.handler->parent_after) {
handler.handler->parent_after(std::move(handler.token));
}
}

g_mutex.unlock();
// handlers will be destroyed here without the mutex locked, so that
// any action taken by destructors might call RegisterAtFork
}

void AfterForkChild() {
// Need to reinitialize the mutex as it is probably invalid. Also, the
// old mutex destructor may fail.
// Fortunately, we are a single thread in the child process by now, so no
// additional synchronization is needed.
new (&g_mutex) std::mutex;

auto handlers = std::move(g_handlers_while_forking);
g_handlers_while_forking.clear();
// Execute handlers in reverse order
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
auto&& handler = *it;
if (handler.handler->child_after) {
handler.handler->child_after(std::move(handler.token));
}
}
}

struct AtForkInitializer {
AtForkInitializer() {
#ifndef _WIN32
int r = pthread_atfork(&BeforeFork, &AfterForkParent, &AfterForkChild);
if (r != 0) {
IOErrorFromErrno(r, "Error when calling pthread_atfork: ").Abort();
}
#endif
}
};

}; // namespace

// Add `weak_handler` to the global list of at-fork handlers.
//
// The fork hooks are installed on the first call. Entries whose owner has
// already been destroyed are pruned on every call, before the new handler is
// appended.
void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
  // Function-local static: constructed exactly once, on first use.
  static AtForkInitializer initializer;

  const std::lock_guard<std::mutex> guard(g_mutex);
  MaintainHandlersUnlocked();
  g_handlers.emplace_back(std::move(weak_handler));
}

} // namespace internal
} // namespace arrow
59 changes: 59 additions & 0 deletions cpp/src/arrow/util/atfork_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <any>
#include <functional>
#include <memory>
#include <utility>

#include "arrow/util/visibility.h"

namespace arrow {
namespace internal {

// A set of callbacks to run around a fork() call.
//
// Any of the callbacks may be left empty, in which case it is simply skipped.
struct ARROW_EXPORT AtForkHandler {
  using CallbackBefore = std::function<std::any()>;
  using CallbackAfter = std::function<void(std::any)>;

  // Called before the fork. It may return an arbitrary token (wrapped in
  // std::any) that will be passed as-is to the after-fork callbacks; this can
  // be used to keep alive any resource needed for after-fork handling.
  CallbackBefore before;
  // Called after the fork, in the parent process.
  CallbackAfter parent_after;
  // Called after the fork, in the child process.
  CallbackAfter child_after;

  AtForkHandler() = default;

  // Handler that only acts in the child process.
  explicit AtForkHandler(CallbackAfter on_child) : child_after(std::move(on_child)) {}

  // Handler with all three callbacks.
  AtForkHandler(CallbackBefore on_before, CallbackAfter on_parent,
                CallbackAfter on_child)
      : before(std::move(on_before)),
        parent_after(std::move(on_parent)),
        child_after(std::move(on_child)) {}
};

// Register the given at-fork handler.
//
// Only a weak reference is taken: the handler's intended lifetime should be
// tracked by calling code using an owning shared_ptr.
ARROW_EXPORT
void RegisterAtFork(std::weak_ptr<AtForkHandler>);

} // namespace internal
} // namespace arrow
Loading

0 comments on commit 5ef742a

Please sign in to comment.