Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-17975: [C++] Create at-fork facility #14594

Merged
merged 5 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ set(ARROW_SRCS
io/stdio.cc
io/transform.cc
util/async_util.cc
util/atfork_internal.cc
util/basic_decimal.cc
util/bit_block_counter.cc
util/bit_run_reader.cc
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/testing/gtest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,22 @@ bool FileIsClosed(int fd) {
#endif
}

#if !defined(_WIN32)
void AssertChildExit(int child_pid, int expected_exit_status) {
ASSERT_GT(child_pid, 0);
int child_status;
int got_pid = waitpid(child_pid, &child_status, 0);
ASSERT_EQ(got_pid, child_pid);
if (WIFSIGNALED(child_status)) {
FAIL() << "Child terminated by signal " << WTERMSIG(child_status);
}
if (!WIFEXITED(child_status)) {
FAIL() << "Child didn't terminate normally?? Child status = " << child_status;
}
ASSERT_EQ(WEXITSTATUS(child_status), expected_exit_status);
}
#endif

bool LocaleExists(const char* locale) {
try {
std::locale loc(locale);
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/testing/gtest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,11 @@ std::vector<T> IteratorToVector(Iterator<T> iterator) {
ARROW_TESTING_EXPORT
bool LocaleExists(const char* locale);

#ifndef _WIN32
ARROW_TESTING_EXPORT
void AssertChildExit(int child_pid, int expected_exit_status = 0);
#endif

// A RAII-style object that switches to a new locale, and switches back
// to the old locale when going out of scope. Doesn't do anything if the
// new locale doesn't exist on the local machine.
Expand Down
18 changes: 13 additions & 5 deletions cpp/src/arrow/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ endif()
add_arrow_test(utility-test
SOURCES
align_util_test.cc
async_generator_test.cc
async_util_test.cc
bit_block_counter_test.cc
bit_util_test.cc
atfork_test.cc
byte_size_test.cc
cache_test.cc
checked_cast_test.cc
Expand All @@ -60,7 +57,6 @@ add_arrow_test(utility-test
queue_test.cc
range_test.cc
reflection_test.cc
rle_encoding_test.cc
small_vector_test.cc
stl_util_test.cc
string_test.cc
Expand All @@ -73,6 +69,18 @@ add_arrow_test(utility-test
utf8_util_test.cc
value_parsing_test.cc)

add_arrow_test(async-utility-test
SOURCES
async_generator_test.cc
async_util_test.cc
test_common.cc)

add_arrow_test(bit-utility-test
SOURCES
bit_block_counter_test.cc
bit_util_test.cc
rle_encoding_test.cc)

add_arrow_test(threading-utility-test
SOURCES
cancel_test.cc
Expand Down
138 changes: 138 additions & 0 deletions cpp/src/arrow/util/atfork_internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/util/atfork_internal.h"

#include <algorithm>
#include <atomic>
#include <mutex>
#include <vector>

#ifndef _WIN32
#include <pthread.h>
#endif

#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"

namespace arrow {
namespace internal {

namespace {

struct RunningHandler {
// A temporary owning copy of a handler, to make sure that a handler
// that runs before fork can still run after fork.
std::shared_ptr<AtForkHandler> handler;
// The token returned by the before-fork handler, to pass to after-fork handlers.
std::any token;

explicit RunningHandler(std::shared_ptr<AtForkHandler> handler)
: handler(std::move(handler)) {}
};

std::mutex g_mutex;
std::vector<std::weak_ptr<AtForkHandler>> g_handlers;
std::vector<RunningHandler> g_handlers_while_forking;

void MaintainHandlersUnlocked() {
auto it = std::remove_if(
g_handlers.begin(), g_handlers.end(),
[](const std::weak_ptr<AtForkHandler>& ptr) { return ptr.expired(); });
g_handlers.erase(it, g_handlers.end());
}

void BeforeFork() {
// Lock the mutex and keep it locked until the end of AfterForkParent(),
// to avoid multiple concurrent forks and atforks.
g_mutex.lock();

DCHECK(g_handlers_while_forking.empty()); // AfterForkParent clears it

for (const auto& weak_handler : g_handlers) {
if (auto handler = weak_handler.lock()) {
g_handlers_while_forking.emplace_back(std::move(handler));
}
}

// XXX can the handler call RegisterAtFork()?
for (auto&& handler : g_handlers_while_forking) {
if (handler.handler->before) {
handler.token = handler.handler->before();
}
}
}

void AfterForkParent() {
// The mutex was locked by BeforeFork()

auto handlers = std::move(g_handlers_while_forking);
g_handlers_while_forking.clear();
// Execute handlers in reverse order
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
auto&& handler = *it;
if (handler.handler->parent_after) {
handler.handler->parent_after(std::move(handler.token));
}
}

g_mutex.unlock();
// handlers will be destroyed here without the mutex locked, so that
// any action taken by destructors might call RegisterAtFork
}

void AfterForkChild() {
// Need to reinitialize the mutex as it is probably invalid. Also, the
// old mutex destructor may fail.
// Fortunately, we are a single thread in the child process by now, so no
// additional synchronization is needed.
new (&g_mutex) std::mutex;

auto handlers = std::move(g_handlers_while_forking);
g_handlers_while_forking.clear();
// Execute handlers in reverse order
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
auto&& handler = *it;
if (handler.handler->child_after) {
handler.handler->child_after(std::move(handler.token));
}
}
}

struct AtForkInitializer {
AtForkInitializer() {
#ifndef _WIN32
int r = pthread_atfork(&BeforeFork, &AfterForkParent, &AfterForkChild);
if (r != 0) {
IOErrorFromErrno(r, "Error when calling pthread_atfork: ").Abort();
}
#endif
}
};

}; // namespace

void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
static AtForkInitializer initializer;

std::lock_guard<std::mutex> lock(g_mutex);
MaintainHandlersUnlocked();
g_handlers.push_back(std::move(weak_handler));
}

} // namespace internal
} // namespace arrow
59 changes: 59 additions & 0 deletions cpp/src/arrow/util/atfork_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <any>
#include <functional>
#include <memory>
#include <utility>

#include "arrow/util/visibility.h"

namespace arrow {
namespace internal {

struct ARROW_EXPORT AtForkHandler {
using CallbackBefore = std::function<std::any()>;
using CallbackAfter = std::function<void(std::any)>;

// The before-fork callback can return an arbitrary token (wrapped in std::any)
// that will passed as-is to after-fork callbacks. This can ensure that any
// resource necessary for after-fork handling is kept alive.
CallbackBefore before;
CallbackAfter parent_after;
CallbackAfter child_after;

AtForkHandler() = default;

explicit AtForkHandler(CallbackAfter child_after)
: child_after(std::move(child_after)) {}

AtForkHandler(CallbackBefore before, CallbackAfter parent_after,
CallbackAfter child_after)
: before(std::move(before)),
parent_after(std::move(parent_after)),
child_after(std::move(child_after)) {}
};

// Register the given at-fork handlers. Their intended lifetime should be tracked by
// calling code using an owning shared_ptr.
ARROW_EXPORT
void RegisterAtFork(std::weak_ptr<AtForkHandler>);

} // namespace internal
} // namespace arrow
Loading