From 05cf7ddd68db29fdb2d7ae67154fb4a5da45bd46 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 3 Nov 2022 18:13:58 +0100 Subject: [PATCH 1/5] ARROW-17975: [C++] Create at-fork facility --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/testing/gtest_util.cc | 16 ++ cpp/src/arrow/testing/gtest_util.h | 5 + cpp/src/arrow/util/CMakeLists.txt | 1 + cpp/src/arrow/util/atfork_internal.cc | 134 +++++++++++ cpp/src/arrow/util/atfork_internal.h | 50 +++++ cpp/src/arrow/util/atfork_test.cc | 293 +++++++++++++++++++++++++ cpp/src/arrow/util/thread_pool_test.cc | 29 +-- 8 files changed, 507 insertions(+), 22 deletions(-) create mode 100644 cpp/src/arrow/util/atfork_internal.cc create mode 100644 cpp/src/arrow/util/atfork_internal.h create mode 100644 cpp/src/arrow/util/atfork_test.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 099a862376041..e1e409d0a7d9f 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -192,6 +192,7 @@ set(ARROW_SRCS io/stdio.cc io/transform.cc util/async_util.cc + util/atfork_internal.cc util/basic_decimal.cc util/bit_block_counter.cc util/bit_run_reader.cc diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index dfc14277d6e24..37c430892d022 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -573,6 +573,22 @@ bool FileIsClosed(int fd) { #endif } +#if !defined(_WIN32) +void AssertChildExit(int child_pid, int expected_exit_status) { + ASSERT_GT(child_pid, 0); + int child_status; + int got_pid = waitpid(child_pid, &child_status, 0); + ASSERT_EQ(got_pid, child_pid); + if (WIFSIGNALED(child_status)) { + FAIL() << "Child terminated by signal " << WTERMSIG(child_status); + } + if (!WIFEXITED(child_status)) { + FAIL() << "Child didn't terminate normally?? Child status = " << child_status; + } + ASSERT_EQ(WEXITSTATUS(child_status), expected_exit_status); +} +#endif + bool LocaleExists(const char* locale) { try { std::locale loc(locale); diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index 389b809aace60..270805629529b 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -378,6 +378,11 @@ std::vector IteratorToVector(Iterator iterator) { ARROW_TESTING_EXPORT bool LocaleExists(const char* locale); +#ifndef _WIN32 +ARROW_TESTING_EXPORT +void AssertChildExit(int child_pid, int expected_exit_status = 0); +#endif + // A RAII-style object that switches to a new locale, and switches back // to the old locale when going out of scope. Doesn't do anything if the // new locale doesn't exist on the local machine. diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 051138a002bd5..f90daf550ca80 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -43,6 +43,7 @@ add_arrow_test(utility-test align_util_test.cc async_generator_test.cc async_util_test.cc + atfork_test.cc bit_block_counter_test.cc bit_util_test.cc byte_size_test.cc diff --git a/cpp/src/arrow/util/atfork_internal.cc b/cpp/src/arrow/util/atfork_internal.cc new file mode 100644 index 0000000000000..3befcadf5f02e --- /dev/null +++ b/cpp/src/arrow/util/atfork_internal.cc @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/atfork_internal.h" + +#include +#include +#include +#include + +#ifndef _WIN32 +#include +#endif + +#include "arrow/util/io_util.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace internal { + +namespace { + +std::mutex g_mutex; + +std::vector> g_handlers; +// A temporary owning copy of handlers, to make sure that any handlers +// that run before fork can still run after fork. +std::vector> g_handlers_while_forking; + +void MaintainHandlersUnlocked() { + auto it = std::remove_if( + g_handlers.begin(), g_handlers.end(), + [](const std::weak_ptr& ptr) { return ptr.expired(); }); + g_handlers.erase(it, g_handlers.end()); +} + +void BeforeFork() { + ARROW_LOG(INFO) << "BeforeFork"; + // Lock the mutex and keep it locked during the actual fork() + g_mutex.lock(); + + for (const auto& weak_handler : g_handlers) { + if (auto handler = weak_handler.lock()) { + g_handlers_while_forking.push_back(std::move(handler)); + } + } + // XXX can the handler call RegisterAtFork()? + for (auto&& handler : g_handlers_while_forking) { + if (handler->before) { + ARROW_LOG(INFO) << "BeforeFork: calling handler"; + handler->before(); + } + } +} + +void AfterForkParent() { + // The mutex was already locked by BeforeFork() + ARROW_LOG(INFO) << "AfterForkParent"; + + auto handlers = std::move(g_handlers_while_forking); + // Execute handlers in reverse order + for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) { + auto&& handler = *it; + if (handler->parent_after) { + handler->parent_after(); + } + } + + g_mutex.unlock(); + // handlers will be destroyed here without the mutex locked, so that + // any action taken by destructors might call RegisterAtFork +} + +void AfterForkChild() { + ARROW_LOG(INFO) << "AfterForkChild"; + // Need to reinitialize the mutex as it is probably invalid. Also, the + // old mutex destructor may fail. + // Fortunately, we are a single thread in the child process by now, so no + // additional synchronization is needed. + new (&g_mutex) std::mutex; + std::unique_lock lock(g_mutex); + + auto handlers = std::move(g_handlers_while_forking); + // Execute handlers in reverse order + for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) { + auto&& handler = *it; + if (handler->child_after) { + handler->child_after(); + } + } + + lock.unlock(); + // handlers will be destroyed here without the mutex locked, so that + // any action taken by destructors might call RegisterAtFork +} + +struct AtForkInitializer { + AtForkInitializer() { +#ifndef _WIN32 + int r = pthread_atfork(&BeforeFork, &AfterForkParent, &AfterForkChild); + if (r != 0) { + IOErrorFromErrno(r, "Error when calling pthread_atfork: ").Abort(); + } + ARROW_LOG(INFO) << "pthread_atfork ok"; +#endif + } +}; + +}; // namespace + +void RegisterAtFork(std::weak_ptr weak_handler) { + static AtForkInitializer initializer; + + std::lock_guard lock(g_mutex); + MaintainHandlersUnlocked(); + g_handlers.push_back(std::move(weak_handler)); +} + +} // namespace internal +} // namespace arrow diff --git a/cpp/src/arrow/util/atfork_internal.h b/cpp/src/arrow/util/atfork_internal.h new file mode 100644 index 0000000000000..554232e8a9ddf --- /dev/null +++ b/cpp/src/arrow/util/atfork_internal.h @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/util/visibility.h" + +namespace arrow { +namespace internal { + +struct ARROW_EXPORT AtForkHandler { + using Callback = std::function; + + Callback before; + Callback parent_after; + Callback child_after; + + AtForkHandler() = default; + + explicit AtForkHandler(Callback child_after) : child_after(std::move(child_after)) {} + + AtForkHandler(Callback before, Callback parent_after, Callback child_after) + : before(std::move(before)), + parent_after(std::move(parent_after)), + child_after(std::move(child_after)) {} +}; + +ARROW_EXPORT +void RegisterAtFork(std::weak_ptr); + +} // namespace internal +} // namespace arrow diff --git a/cpp/src/arrow/util/atfork_test.cc b/cpp/src/arrow/util/atfork_test.cc new file mode 100644 index 0000000000000..802874cb5b40d --- /dev/null +++ b/cpp/src/arrow/util/atfork_test.cc @@ -0,0 +1,293 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include + +#ifndef _WIN32 +#include +#include +#include +#endif + +#include +#include + +#include "arrow/testing/gtest_util.h" +#include "arrow/util/atfork_internal.h" +#include "arrow/util/io_util.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace internal { + +using testing::ElementsAre; +using testing::IsSubsetOf; +using testing::UnorderedElementsAreArray; + +class TestAtFork : public ::testing::Test { + public: + using Callback = typename AtForkHandler::Callback; + + Callback PushBefore(int v) { + return [this, v]() { + std::lock_guard lock(mutex_); + before_.push_back(v); + }; + } + + Callback PushParentAfter(int v) { + return [this, v]() { + std::lock_guard lock(mutex_); + parent_after_.push_back(v); + }; + } + + Callback PushChildAfter(int v) { + return [this, v]() { + // Mutex may be invalid and child is single-thread anyway + child_after_.push_back(v); + }; + } + + void Reset() { + std::lock_guard lock(mutex_); + before_.clear(); + parent_after_.clear(); + child_after_.clear(); + } + +#ifndef _WIN32 + void RunInChild(std::function func) { + auto child_pid = fork(); + if (child_pid == -1) { + ASSERT_OK(IOErrorFromErrno(errno, "Error calling fork(): ")); + } + if (child_pid == 0) { + // Child + ASSERT_NO_FATAL_FAILURE(func()) << "Failure in child process"; + std::exit(0); + } else { + // Parent + AssertChildExit(child_pid); + } + } +#endif + + std::mutex mutex_; + std::vector before_; + std::vector parent_after_; + std::vector child_after_; +}; + +#ifndef _WIN32 + +TEST_F(TestAtFork, EmptyHandlers) { + auto handlers = std::make_shared(); + + RegisterAtFork(handlers); + RegisterAtFork(handlers); + + RunInChild([&]() { + ASSERT_TRUE(before_.empty()); + ASSERT_TRUE(parent_after_.empty()); + ASSERT_TRUE(child_after_.empty()); + }); + + ASSERT_TRUE(before_.empty()); + ASSERT_TRUE(parent_after_.empty()); + ASSERT_TRUE(child_after_.empty()); + + handlers.reset(); + + RunInChild([]() {}); +} + +TEST_F(TestAtFork, SingleThread) { + auto handlers1 = std::make_shared(PushBefore(1), PushParentAfter(11), + PushChildAfter(21)); + auto handlers2 = std::make_shared(PushBefore(2), PushParentAfter(12), + PushChildAfter(22)); + + RegisterAtFork(handlers1); + RegisterAtFork(handlers2); + + RunInChild([&]() { + ASSERT_THAT(before_, ElementsAre(1, 2)); + ASSERT_THAT(parent_after_, ElementsAre()); + ASSERT_THAT(child_after_, ElementsAre(22, 21)); + }); + ASSERT_THAT(before_, ElementsAre(1, 2)); + ASSERT_THAT(parent_after_, ElementsAre(12, 11)); + ASSERT_THAT(child_after_, ElementsAre()); + Reset(); + + // Destroy one handler + handlers1.reset(); + + RunInChild([&]() { + ASSERT_THAT(before_, ElementsAre(2)); + ASSERT_THAT(parent_after_, ElementsAre()); + ASSERT_THAT(child_after_, ElementsAre(22)); + }); + ASSERT_THAT(before_, ElementsAre(2)); + ASSERT_THAT(parent_after_, ElementsAre(12)); + ASSERT_THAT(child_after_, ElementsAre()); + Reset(); + + // Destroy other handler, create new ones + auto handlers3 = std::make_shared(PushBefore(3), PushParentAfter(13), + PushChildAfter(23)); + auto handlers4 = std::make_shared(PushBefore(4), PushParentAfter(14), + PushChildAfter(24)); + + RegisterAtFork(handlers3); + RegisterAtFork(handlers4); + handlers2.reset(); + + RunInChild([&]() { + ASSERT_THAT(before_, ElementsAre(3, 4)); + ASSERT_THAT(parent_after_, ElementsAre()); + ASSERT_THAT(child_after_, ElementsAre(24, 23)); + }); + ASSERT_THAT(before_, ElementsAre(3, 4)); + ASSERT_THAT(parent_after_, ElementsAre(14, 13)); + ASSERT_THAT(child_after_, ElementsAre()); +} + +#if !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)) + +// The two following tests would seem to leak for various reasons. +// Also, Thread Sanitizer would fail with the same error message as in +// https://github.com/google/sanitizers/issues/950. + +TEST_F(TestAtFork, MultipleThreads) { + const int kNumThreads = 5; + const int kNumIterations = 40; + const int kParentAfterAddend = 10000; + const int kChildAfterAddend = 20000; + std::atomic seed = 12345; + + auto check_values_in_child = [&]() { + std::vector expected_child; + for (const auto v : before_) { + expected_child.push_back(v + kChildAfterAddend); + } + // The handlers that were alive on this fork() are a subset of the handlers + // that were called at any point in the parent. + ASSERT_THAT(child_after_, IsSubsetOf(expected_child)); + }; + + auto run_in_thread = [&](int index) { + std::default_random_engine engine(++seed); + std::uniform_int_distribution value_dist(index * 100, (index + 1) * 100 - 1); + std::bernoulli_distribution fork_dist(0.1); + + for (int i = 0; i < kNumIterations; ++i) { + int value = value_dist(engine); + auto handlers = std::make_shared( + PushBefore(value), PushParentAfter(value + kParentAfterAddend), + PushChildAfter(value + kChildAfterAddend)); + RegisterAtFork(handlers); + if (fork_dist(engine)) { + RunInChild(check_values_in_child); + } + } + }; + + std::vector threads; + for (int i = 0; i < kNumThreads; ++i) { + threads.emplace_back(run_in_thread, i); + } + for (auto&& thread : threads) { + thread.join(); + } + + std::vector expected_parent; + for (const auto v : before_) { + expected_parent.push_back(v + kParentAfterAddend); + } + // The handlers that were called after fork are the same that were called + // before fork; however, their overall order is undefined as multiple fork() + // calls were made and multiple handlers may have been alive during + // each fork() called. + ASSERT_THAT(parent_after_, UnorderedElementsAreArray(expected_parent)); + ASSERT_TRUE(child_after_.empty()); +} + +TEST_F(TestAtFork, NestedChild) { +#ifdef __APPLE__ + GTEST_SKIP() << "Nested fork is not supported on macOS"; +#endif + + auto handlers1 = std::make_shared(PushBefore(1), PushParentAfter(11), + PushChildAfter(21)); + auto handlers2 = std::make_shared(PushBefore(2), PushParentAfter(12), + PushChildAfter(22)); + + RegisterAtFork(handlers1); + RegisterAtFork(handlers2); + + RunInChild([&]() { + Reset(); + + // Add a new handler, destroy one of the parent handlers + auto handlers3 = std::make_shared(PushBefore(3), PushParentAfter(13), + PushChildAfter(23)); + RegisterAtFork(handlers3); + handlers2.reset(); + + RunInChild([&]() { + ASSERT_THAT(before_, ElementsAre(1, 3)); + ASSERT_THAT(parent_after_, ElementsAre()); + ASSERT_THAT(child_after_, ElementsAre(23, 21)); + }); + + ASSERT_THAT(before_, ElementsAre(1, 3)); + ASSERT_THAT(parent_after_, ElementsAre(13, 11)); + ASSERT_THAT(child_after_, ElementsAre()); + }); + + ASSERT_THAT(before_, ElementsAre(1, 2)); + ASSERT_THAT(parent_after_, ElementsAre(12, 11)); + ASSERT_THAT(child_after_, ElementsAre()); +} + +#endif // !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || + // defined(THREAD_SANITIZER)) +#endif // !defined(_WIN32) + +#ifdef _WIN32 +TEST_F(TestAtFork, NoOp) { + auto handlers = std::make_shared(PushBefore(1), PushParentAfter(11), + PushChildAfter(21)); + + RegisterAtFork(handlers); + + ASSERT_TRUE(before_.empty()); + ASSERT_TRUE(parent_after_.empty()); + ASSERT_TRUE(child_after_.empty()); +} +#endif + +} // namespace internal +} // namespace arrow diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index 13093f4a1d576..799675c6fa93b 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -16,7 +16,7 @@ // under the License. #ifndef _WIN32 -#include +#include #include #endif @@ -750,22 +750,7 @@ TEST_F(TestThreadPool, SubmitWithStopTokenCancelled) { #if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \ defined(THREAD_SANITIZER)) -class TestThreadPoolForkSafety : public TestThreadPool { - public: - void CheckChildExit(int child_pid) { - ASSERT_GT(child_pid, 0); - int child_status; - int got_pid = waitpid(child_pid, &child_status, 0); - ASSERT_EQ(got_pid, child_pid); - if (WIFSIGNALED(child_status)) { - FAIL() << "Child terminated by signal " << WTERMSIG(child_status); - } - if (!WIFEXITED(child_status)) { - FAIL() << "Child didn't terminate normally?? Child status = " << child_status; - } - ASSERT_EQ(WEXITSTATUS(child_status), 0); - } -}; +class TestThreadPoolForkSafety : public TestThreadPool {}; TEST_F(TestThreadPoolForkSafety, Basics) { { @@ -784,7 +769,7 @@ TEST_F(TestThreadPoolForkSafety, Basics) { std::exit(st.ok() ? 0 : 2); } else { // Parent - CheckChildExit(child_pid); + AssertChildExit(child_pid); ASSERT_OK(pool->Shutdown()); } } @@ -806,7 +791,7 @@ TEST_F(TestThreadPoolForkSafety, Basics) { std::exit(0); } else { // Parent - CheckChildExit(child_pid); + AssertChildExit(child_pid); } } } @@ -851,7 +836,7 @@ TEST_F(TestThreadPoolForkSafety, MultipleChildThreads) { std::exit(0); } else { // Parent - CheckChildExit(child_pid); + AssertChildExit(child_pid); ASSERT_OK(pool->Shutdown()); } } @@ -879,14 +864,14 @@ TEST_F(TestThreadPoolForkSafety, NestedChild) { ASSERT_OK(pool->Shutdown()); } else { // Child - CheckChildExit(grandchild_pid); + AssertChildExit(grandchild_pid); ASSERT_FINISHES_OK_AND_EQ(7, fut); ASSERT_OK(pool->Shutdown()); } std::exit(0); } else { // Parent - CheckChildExit(child_pid); + AssertChildExit(child_pid); ASSERT_OK(pool->Shutdown()); } } From 8919e8ec4be6781626e9247cf8eb3b9933ca8137 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 5 Nov 2022 16:22:44 +0100 Subject: [PATCH 2/5] Split arrow-utility-test --- cpp/src/arrow/util/CMakeLists.txt | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index f90daf550ca80..5141e30d0917a 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -41,11 +41,7 @@ endif() add_arrow_test(utility-test SOURCES align_util_test.cc - async_generator_test.cc - async_util_test.cc atfork_test.cc - bit_block_counter_test.cc - bit_util_test.cc byte_size_test.cc cache_test.cc checked_cast_test.cc @@ -61,7 +57,6 @@ add_arrow_test(utility-test queue_test.cc range_test.cc reflection_test.cc - rle_encoding_test.cc small_vector_test.cc stl_util_test.cc string_test.cc @@ -74,6 +69,18 @@ add_arrow_test(utility-test utf8_util_test.cc value_parsing_test.cc) +add_arrow_test(async-utility-test + SOURCES + async_generator_test.cc + async_util_test.cc + test_common.cc) + +add_arrow_test(bit-utility-test + SOURCES + bit_block_counter_test.cc + bit_util_test.cc + rle_encoding_test.cc) + add_arrow_test(threading-utility-test SOURCES cancel_test.cc From e6e91c49606a20db58e6cc707599df25ca00a5a7 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 5 Nov 2022 16:55:54 +0100 Subject: [PATCH 3/5] Pass an arbitrary token accross fork() call --- cpp/src/arrow/util/atfork_internal.cc | 42 ++++++++++++++++-------- cpp/src/arrow/util/atfork_internal.h | 16 ++++++---- cpp/src/arrow/util/atfork_test.cc | 46 ++++++++++++++++----------- 3 files changed, 65 insertions(+), 39 deletions(-) diff --git a/cpp/src/arrow/util/atfork_internal.cc b/cpp/src/arrow/util/atfork_internal.cc index 3befcadf5f02e..2f0513bebdc72 100644 --- a/cpp/src/arrow/util/atfork_internal.cc +++ b/cpp/src/arrow/util/atfork_internal.cc @@ -34,12 +34,20 @@ namespace internal { namespace { -std::mutex g_mutex; +struct RunningHandler { + // A temporary owning copy of a handler, to make sure that a handler + // that runs before fork can still run after fork. + std::shared_ptr handler; + // The token returned by the before-fork handler, to pass to after-fork handlers. + std::any token; + + explicit RunningHandler(std::shared_ptr handler) + : handler(std::move(handler)) {} +}; +std::mutex g_mutex; std::vector> g_handlers; -// A temporary owning copy of handlers, to make sure that any handlers -// that run before fork can still run after fork. -std::vector> g_handlers_while_forking; +std::vector g_handlers_while_forking; void MaintainHandlersUnlocked() { auto it = std::remove_if( @@ -50,19 +58,23 @@ void MaintainHandlersUnlocked() { void BeforeFork() { ARROW_LOG(INFO) << "BeforeFork"; - // Lock the mutex and keep it locked during the actual fork() + // Lock the mutex and keep it locked until the end of AfterForkParent(), + // to avoid multiple concurrent forks and atforks. g_mutex.lock(); + DCHECK(g_handlers_while_forking.empty()); // AfterForkParent clears it + for (const auto& weak_handler : g_handlers) { if (auto handler = weak_handler.lock()) { - g_handlers_while_forking.push_back(std::move(handler)); + g_handlers_while_forking.emplace_back(std::move(handler)); } } + // XXX can the handler call RegisterAtFork()? for (auto&& handler : g_handlers_while_forking) { - if (handler->before) { + if (handler.handler->before) { ARROW_LOG(INFO) << "BeforeFork: calling handler"; - handler->before(); + handler.token = handler.handler->before(); } } } @@ -72,11 +84,12 @@ void AfterForkParent() { ARROW_LOG(INFO) << "AfterForkParent"; auto handlers = std::move(g_handlers_while_forking); + g_handlers_while_forking.clear(); // Execute handlers in reverse order for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) { auto&& handler = *it; - if (handler->parent_after) { - handler->parent_after(); + if (handler.handler->parent_after) { + handler.handler->parent_after(std::move(handler.token)); } } @@ -92,18 +105,19 @@ void AfterForkChild() { // Fortunately, we are a single thread in the child process by now, so no // additional synchronization is needed. new (&g_mutex) std::mutex; - std::unique_lock lock(g_mutex); + // std::unique_lock lock(g_mutex); auto handlers = std::move(g_handlers_while_forking); + g_handlers_while_forking.clear(); // Execute handlers in reverse order for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) { auto&& handler = *it; - if (handler->child_after) { - handler->child_after(); + if (handler.handler->child_after) { + handler.handler->child_after(std::move(handler.token)); } } - lock.unlock(); + // lock.unlock(); // handlers will be destroyed here without the mutex locked, so that // any action taken by destructors might call RegisterAtFork } diff --git a/cpp/src/arrow/util/atfork_internal.h b/cpp/src/arrow/util/atfork_internal.h index 554232e8a9ddf..b1f1119ede843 100644 --- a/cpp/src/arrow/util/atfork_internal.h +++ b/cpp/src/arrow/util/atfork_internal.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include @@ -27,17 +28,20 @@ namespace arrow { namespace internal { struct ARROW_EXPORT AtForkHandler { - using Callback = std::function; + using CallbackBefore = std::function; + using CallbackAfter = std::function; - Callback before; - Callback parent_after; - Callback child_after; + CallbackBefore before; + CallbackAfter parent_after; + CallbackAfter child_after; AtForkHandler() = default; - explicit AtForkHandler(Callback child_after) : child_after(std::move(child_after)) {} + explicit AtForkHandler(CallbackAfter child_after) + : child_after(std::move(child_after)) {} - AtForkHandler(Callback before, Callback parent_after, Callback child_after) + AtForkHandler(CallbackBefore before, CallbackAfter parent_after, + CallbackAfter child_after) : before(std::move(before)), parent_after(std::move(parent_after)), child_after(std::move(child_after)) {} diff --git a/cpp/src/arrow/util/atfork_test.cc b/cpp/src/arrow/util/atfork_test.cc index 802874cb5b40d..004e28e19514a 100644 --- a/cpp/src/arrow/util/atfork_test.cc +++ b/cpp/src/arrow/util/atfork_test.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #ifndef _WIN32 @@ -46,26 +47,32 @@ using testing::UnorderedElementsAreArray; class TestAtFork : public ::testing::Test { public: - using Callback = typename AtForkHandler::Callback; + using CallbackBefore = typename AtForkHandler::CallbackBefore; + using CallbackAfter = typename AtForkHandler::CallbackAfter; - Callback PushBefore(int v) { + CallbackBefore PushBefore(int v) { return [this, v]() { std::lock_guard lock(mutex_); before_.push_back(v); + return v; }; } - Callback PushParentAfter(int v) { - return [this, v]() { + CallbackAfter PushParentAfter(int w) { + return [this, w](std::any token) { + const int* v = std::any_cast(&token); + ASSERT_NE(v, nullptr); std::lock_guard lock(mutex_); - parent_after_.push_back(v); + parent_after_.emplace_back(*v + w); }; } - Callback PushChildAfter(int v) { - return [this, v]() { + CallbackAfter PushChildAfter(int w) { + return [this, w](std::any token) { + const int* v = std::any_cast(&token); + ASSERT_NE(v, nullptr); // Mutex may be invalid and child is single-thread anyway - child_after_.push_back(v); + child_after_.push_back(*v + w); }; } @@ -134,10 +141,10 @@ TEST_F(TestAtFork, SingleThread) { RunInChild([&]() { ASSERT_THAT(before_, ElementsAre(1, 2)); ASSERT_THAT(parent_after_, ElementsAre()); - ASSERT_THAT(child_after_, ElementsAre(22, 21)); + ASSERT_THAT(child_after_, ElementsAre(2 + 22, 1 + 21)); }); ASSERT_THAT(before_, ElementsAre(1, 2)); - ASSERT_THAT(parent_after_, ElementsAre(12, 11)); + ASSERT_THAT(parent_after_, ElementsAre(2 + 12, 1 + 11)); ASSERT_THAT(child_after_, ElementsAre()); Reset(); @@ -147,10 +154,10 @@ TEST_F(TestAtFork, SingleThread) { RunInChild([&]() { ASSERT_THAT(before_, ElementsAre(2)); ASSERT_THAT(parent_after_, ElementsAre()); - ASSERT_THAT(child_after_, ElementsAre(22)); + ASSERT_THAT(child_after_, ElementsAre(2 + 22)); }); ASSERT_THAT(before_, ElementsAre(2)); - ASSERT_THAT(parent_after_, ElementsAre(12)); + ASSERT_THAT(parent_after_, ElementsAre(2 + 12)); ASSERT_THAT(child_after_, ElementsAre()); Reset(); @@ -167,10 +174,10 @@ TEST_F(TestAtFork, SingleThread) { RunInChild([&]() { ASSERT_THAT(before_, ElementsAre(3, 4)); ASSERT_THAT(parent_after_, ElementsAre()); - ASSERT_THAT(child_after_, ElementsAre(24, 23)); + ASSERT_THAT(child_after_, ElementsAre(4 + 24, 3 + 23)); }); ASSERT_THAT(before_, ElementsAre(3, 4)); - ASSERT_THAT(parent_after_, ElementsAre(14, 13)); + ASSERT_THAT(parent_after_, ElementsAre(4 + 14, 3 + 13)); ASSERT_THAT(child_after_, ElementsAre()); } @@ -190,7 +197,7 @@ TEST_F(TestAtFork, MultipleThreads) { auto check_values_in_child = [&]() { std::vector expected_child; for (const auto v : before_) { - expected_child.push_back(v + kChildAfterAddend); + expected_child.push_back(v + v + kChildAfterAddend); } // The handlers that were alive on this fork() are a subset of the handlers // that were called at any point in the parent. @@ -224,7 +231,7 @@ TEST_F(TestAtFork, MultipleThreads) { std::vector expected_parent; for (const auto v : before_) { - expected_parent.push_back(v + kParentAfterAddend); + expected_parent.push_back(v + v + kParentAfterAddend); } // The handlers that were called after fork are the same that were called // before fork; however, their overall order is undefined as multiple fork() @@ -259,21 +266,22 @@ TEST_F(TestAtFork, NestedChild) { RunInChild([&]() { ASSERT_THAT(before_, ElementsAre(1, 3)); ASSERT_THAT(parent_after_, ElementsAre()); - ASSERT_THAT(child_after_, ElementsAre(23, 21)); + ASSERT_THAT(child_after_, ElementsAre(3 + 23, 1 + 21)); }); ASSERT_THAT(before_, ElementsAre(1, 3)); - ASSERT_THAT(parent_after_, ElementsAre(13, 11)); + ASSERT_THAT(parent_after_, ElementsAre(3 + 13, 1 + 11)); ASSERT_THAT(child_after_, ElementsAre()); }); ASSERT_THAT(before_, ElementsAre(1, 2)); - ASSERT_THAT(parent_after_, ElementsAre(12, 11)); + ASSERT_THAT(parent_after_, ElementsAre(2 + 12, 1 + 11)); ASSERT_THAT(child_after_, ElementsAre()); } #endif // !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || // defined(THREAD_SANITIZER)) + #endif // !defined(_WIN32) #ifdef _WIN32 From 961a457c8ad428a96df5a71ace3269d4846e5b76 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 5 Nov 2022 18:45:16 +0100 Subject: [PATCH 4/5] Migrate ThreadPool to use an at-fork handler --- cpp/src/arrow/util/atfork_internal.cc | 12 +--- cpp/src/arrow/util/atfork_internal.h | 5 ++ cpp/src/arrow/util/thread_pool.cc | 85 +++++++++++++++------------ cpp/src/arrow/util/thread_pool.h | 13 ---- 4 files changed, 53 insertions(+), 62 deletions(-) diff --git a/cpp/src/arrow/util/atfork_internal.cc b/cpp/src/arrow/util/atfork_internal.cc index 2f0513bebdc72..99c2cdeb6c332 100644 --- a/cpp/src/arrow/util/atfork_internal.cc +++ b/cpp/src/arrow/util/atfork_internal.cc @@ -57,7 +57,6 @@ void MaintainHandlersUnlocked() { } void BeforeFork() { - ARROW_LOG(INFO) << "BeforeFork"; // Lock the mutex and keep it locked until the end of AfterForkParent(), // to avoid multiple concurrent forks and atforks. g_mutex.lock(); @@ -73,15 +72,13 @@ void BeforeFork() { // XXX can the handler call RegisterAtFork()? for (auto&& handler : g_handlers_while_forking) { if (handler.handler->before) { - ARROW_LOG(INFO) << "BeforeFork: calling handler"; handler.token = handler.handler->before(); } } } void AfterForkParent() { - // The mutex was already locked by BeforeFork() - ARROW_LOG(INFO) << "AfterForkParent"; + // The mutex was locked by BeforeFork() auto handlers = std::move(g_handlers_while_forking); g_handlers_while_forking.clear(); @@ -99,13 +96,11 @@ void AfterForkParent() { } void AfterForkChild() { - ARROW_LOG(INFO) << "AfterForkChild"; // Need to reinitialize the mutex as it is probably invalid. Also, the // old mutex destructor may fail. // Fortunately, we are a single thread in the child process by now, so no // additional synchronization is needed. new (&g_mutex) std::mutex; - // std::unique_lock lock(g_mutex); auto handlers = std::move(g_handlers_while_forking); g_handlers_while_forking.clear(); @@ -116,10 +111,6 @@ void AfterForkChild() { handler.handler->child_after(std::move(handler.token)); } } - - // lock.unlock(); - // handlers will be destroyed here without the mutex locked, so that - // any action taken by destructors might call RegisterAtFork } struct AtForkInitializer { @@ -129,7 +120,6 @@ struct AtForkInitializer { if (r != 0) { IOErrorFromErrno(r, "Error when calling pthread_atfork: ").Abort(); } - ARROW_LOG(INFO) << "pthread_atfork ok"; #endif } }; diff --git a/cpp/src/arrow/util/atfork_internal.h b/cpp/src/arrow/util/atfork_internal.h index b1f1119ede843..2ba1729c03e0b 100644 --- a/cpp/src/arrow/util/atfork_internal.h +++ b/cpp/src/arrow/util/atfork_internal.h @@ -31,6 +31,9 @@ struct ARROW_EXPORT AtForkHandler { using CallbackBefore = std::function; using CallbackAfter = std::function; + // The before-fork callback can return an arbitrary token (wrapped in std::any) + // that will passed as-is to after-fork callbacks. This can ensure that any + // resource necessary for after-fork handling is kept alive. CallbackBefore before; CallbackAfter parent_after; CallbackAfter child_after; @@ -47,6 +50,8 @@ struct ARROW_EXPORT AtForkHandler { child_after(std::move(child_after)) {} }; +// Register the given at-fork handlers. Their intended lifetime should be tracked by +// calling code using an owning shared_ptr. ARROW_EXPORT void RegisterAtFork(std::weak_ptr); diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 550e82b00b059..5ca1713047a84 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -26,6 +26,7 @@ #include #include +#include "arrow/util/atfork_internal.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" #include "arrow/util/mutex.h" @@ -203,6 +204,28 @@ struct ThreadPool::State { bool quick_shutdown_ = false; std::vector> kept_alive_resources_; + + // At-fork machinery + + void BeforeFork() { + mutex_.lock(); + } + + void ParentAfterFork() { + mutex_.unlock(); + } + + void ChildAfterFork() { + int desired_capacity = desired_capacity_; + bool please_shutdown = please_shutdown_; + bool quick_shutdown = quick_shutdown_; + new (this) State; // force-reinitialize, including synchronization primitives + desired_capacity_ = desired_capacity; + please_shutdown_ = please_shutdown; + quick_shutdown_ = quick_shutdown; + } + + std::shared_ptr atfork_handler_; }; // The worker loop is an independent function so that it can keep running @@ -287,8 +310,30 @@ ThreadPool::ThreadPool() : sp_state_(std::make_shared()), state_(sp_state_.get()), shutdown_on_destroy_(true) { -#ifndef _WIN32 - pid_ = getpid(); + // Eternal thread pools would produce false leak reports in the vector of + // atfork handlers. +#if !(defined(_WIN32) || defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)) + state_->atfork_handler_ = std::make_shared( + /*before=*/ [weak_state = std::weak_ptr(sp_state_)]() { + auto state = weak_state.lock(); + if (state) { + state->BeforeFork(); + } + return state; // passed to after-forkers + }, + /*parent_after=*/ [](std::any token) { + auto state = std::any_cast>(token); + if (state) { + state->ParentAfterFork(); + } + }, + /*child_after=*/ [](std::any token) { + auto state = std::any_cast>(token); + if (state) { + state->ChildAfterFork(); + } + }); + RegisterAtFork(state_->atfork_handler_); #endif } @@ -298,38 +343,7 @@ ThreadPool::~ThreadPool() { } } -void ThreadPool::ProtectAgainstFork() { -#ifndef _WIN32 - pid_t current_pid = getpid(); - if (pid_.load() != current_pid) { - // Reinitialize internal state in child process after fork(). - { - // Since after-fork reinitialization is triggered when one of the ThreadPool - // methods is called, it can be very well be called from multiple threads - // at once. Therefore, it needs to be guarded with a lock. - auto lock = util::GlobalForkSafeMutex()->Lock(); - - if (pid_.load() != current_pid) { - int capacity = state_->desired_capacity_; - - auto new_state = std::make_shared(); - new_state->please_shutdown_ = state_->please_shutdown_; - new_state->quick_shutdown_ = state_->quick_shutdown_; - - sp_state_ = new_state; - state_ = sp_state_.get(); - pid_ = current_pid; - - // Launch worker threads anew - ARROW_UNUSED(SetCapacity(capacity)); - } - } - } -#endif -} - Status ThreadPool::SetCapacity(int threads) { - ProtectAgainstFork(); std::unique_lock lock(state_->mutex_); if (state_->please_shutdown_) { return Status::Invalid("operation forbidden during or after shutdown"); @@ -354,25 +368,21 @@ Status ThreadPool::SetCapacity(int threads) { } int ThreadPool::GetCapacity() { - ProtectAgainstFork(); std::unique_lock lock(state_->mutex_); return state_->desired_capacity_; } int ThreadPool::GetNumTasks() { - ProtectAgainstFork(); std::unique_lock lock(state_->mutex_); return state_->tasks_queued_or_running_; } int ThreadPool::GetActualCapacity() { - ProtectAgainstFork(); std::unique_lock lock(state_->mutex_); return static_cast(state_->workers_.size()); } Status ThreadPool::Shutdown(bool wait) { - ProtectAgainstFork(); std::unique_lock lock(state_->mutex_); if (state_->please_shutdown_) { @@ -419,7 +429,6 @@ void ThreadPool::LaunchWorkersUnlocked(int threads) { Status ThreadPool::SpawnReal(TaskHints hints, FnOnce task, StopToken stop_token, StopCallback&& stop_callback) { { - ProtectAgainstFork(); #ifdef ARROW_WITH_OPENTELEMETRY // Wrap the task to propagate a parent tracing span to it // This task-wrapping needs to be done before we grab the mutex because the diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index b2dd706444624..526213ce888d4 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -17,14 +17,6 @@ #pragma once -#ifndef _WIN32 -#include -#endif - -#ifndef _WIN32 -#include -#endif - #include #include #include @@ -475,17 +467,12 @@ class ARROW_EXPORT ThreadPool : public Executor { void LaunchWorkersUnlocked(int threads); // Get the current actual capacity int GetActualCapacity(); - // Reinitialize the thread pool if the pid changed - void ProtectAgainstFork(); static std::shared_ptr MakeCpuThreadPool(); std::shared_ptr sp_state_; State* state_; bool shutdown_on_destroy_; -#ifndef _WIN32 - std::atomic pid_; -#endif }; // Return the process-global thread pool for CPU-bound tasks. From 7cd322ea56e8ceeddc1e61eabbf0734c21f090ff Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 5 Nov 2022 20:57:30 +0100 Subject: [PATCH 5/5] Lint --- cpp/src/arrow/util/thread_pool.cc | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 5ca1713047a84..ca868248de401 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -207,13 +207,9 @@ struct ThreadPool::State { // At-fork machinery - void BeforeFork() { - mutex_.lock(); - } + void BeforeFork() { mutex_.lock(); } - void ParentAfterFork() { - mutex_.unlock(); - } + void ParentAfterFork() { mutex_.unlock(); } void ChildAfterFork() { int desired_capacity = desired_capacity_; @@ -314,20 +310,23 @@ ThreadPool::ThreadPool() // atfork handlers. #if !(defined(_WIN32) || defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)) state_->atfork_handler_ = std::make_shared( - /*before=*/ [weak_state = std::weak_ptr(sp_state_)]() { + /*before=*/ + [weak_state = std::weak_ptr(sp_state_)]() { auto state = weak_state.lock(); if (state) { state->BeforeFork(); } return state; // passed to after-forkers }, - /*parent_after=*/ [](std::any token) { + /*parent_after=*/ + [](std::any token) { auto state = std::any_cast>(token); if (state) { state->ParentAfterFork(); } }, - /*child_after=*/ [](std::any token) { + /*child_after=*/ + [](std::any token) { auto state = std::any_cast>(token); if (state) { state->ChildAfterFork();