diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 099a862376041..e1e409d0a7d9f 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -192,6 +192,7 @@ set(ARROW_SRCS io/stdio.cc io/transform.cc util/async_util.cc + util/atfork_internal.cc util/basic_decimal.cc util/bit_block_counter.cc util/bit_run_reader.cc diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index dfc14277d6e24..37c430892d022 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -573,6 +573,22 @@ bool FileIsClosed(int fd) { #endif } +#if !defined(_WIN32) +void AssertChildExit(int child_pid, int expected_exit_status) { + ASSERT_GT(child_pid, 0); + int child_status; + int got_pid = waitpid(child_pid, &child_status, 0); + ASSERT_EQ(got_pid, child_pid); + if (WIFSIGNALED(child_status)) { + FAIL() << "Child terminated by signal " << WTERMSIG(child_status); + } + if (!WIFEXITED(child_status)) { + FAIL() << "Child didn't terminate normally?? Child status = " << child_status; + } + ASSERT_EQ(WEXITSTATUS(child_status), expected_exit_status); +} +#endif + bool LocaleExists(const char* locale) { try { std::locale loc(locale); diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index 389b809aace60..270805629529b 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -378,6 +378,11 @@ std::vector IteratorToVector(Iterator iterator) { ARROW_TESTING_EXPORT bool LocaleExists(const char* locale); +#ifndef _WIN32 +ARROW_TESTING_EXPORT +void AssertChildExit(int child_pid, int expected_exit_status = 0); +#endif + // A RAII-style object that switches to a new locale, and switches back // to the old locale when going out of scope. Doesn't do anything if the // new locale doesn't exist on the local machine. diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 051138a002bd5..5141e30d0917a 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -41,10 +41,7 @@ endif() add_arrow_test(utility-test SOURCES align_util_test.cc - async_generator_test.cc - async_util_test.cc - bit_block_counter_test.cc - bit_util_test.cc + atfork_test.cc byte_size_test.cc cache_test.cc checked_cast_test.cc @@ -60,7 +57,6 @@ add_arrow_test(utility-test queue_test.cc range_test.cc reflection_test.cc - rle_encoding_test.cc small_vector_test.cc stl_util_test.cc string_test.cc @@ -73,6 +69,18 @@ add_arrow_test(utility-test utf8_util_test.cc value_parsing_test.cc) +add_arrow_test(async-utility-test + SOURCES + async_generator_test.cc + async_util_test.cc + test_common.cc) + +add_arrow_test(bit-utility-test + SOURCES + bit_block_counter_test.cc + bit_util_test.cc + rle_encoding_test.cc) + add_arrow_test(threading-utility-test SOURCES cancel_test.cc diff --git a/cpp/src/arrow/util/atfork_internal.cc b/cpp/src/arrow/util/atfork_internal.cc new file mode 100644 index 0000000000000..99c2cdeb6c332 --- /dev/null +++ b/cpp/src/arrow/util/atfork_internal.cc @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/atfork_internal.h" + +#include +#include +#include +#include + +#ifndef _WIN32 +#include +#endif + +#include "arrow/util/io_util.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace internal { + +namespace { + +struct RunningHandler { + // A temporary owning copy of a handler, to make sure that a handler + // that runs before fork can still run after fork. + std::shared_ptr handler; + // The token returned by the before-fork handler, to pass to after-fork handlers. + std::any token; + + explicit RunningHandler(std::shared_ptr handler) + : handler(std::move(handler)) {} +}; + +std::mutex g_mutex; +std::vector> g_handlers; +std::vector g_handlers_while_forking; + +void MaintainHandlersUnlocked() { + auto it = std::remove_if( + g_handlers.begin(), g_handlers.end(), + [](const std::weak_ptr& ptr) { return ptr.expired(); }); + g_handlers.erase(it, g_handlers.end()); +} + +void BeforeFork() { + // Lock the mutex and keep it locked until the end of AfterForkParent(), + // to avoid multiple concurrent forks and atforks. + g_mutex.lock(); + + DCHECK(g_handlers_while_forking.empty()); // AfterForkParent clears it + + for (const auto& weak_handler : g_handlers) { + if (auto handler = weak_handler.lock()) { + g_handlers_while_forking.emplace_back(std::move(handler)); + } + } + + // XXX can the handler call RegisterAtFork()? + for (auto&& handler : g_handlers_while_forking) { + if (handler.handler->before) { + handler.token = handler.handler->before(); + } + } +} + +void AfterForkParent() { + // The mutex was locked by BeforeFork() + + auto handlers = std::move(g_handlers_while_forking); + g_handlers_while_forking.clear(); + // Execute handlers in reverse order + for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) { + auto&& handler = *it; + if (handler.handler->parent_after) { + handler.handler->parent_after(std::move(handler.token)); + } + } + + g_mutex.unlock(); + // handlers will be destroyed here without the mutex locked, so that + // any action taken by destructors might call RegisterAtFork +} + +void AfterForkChild() { + // Need to reinitialize the mutex as it is probably invalid. Also, the + // old mutex destructor may fail. + // Fortunately, we are a single thread in the child process by now, so no + // additional synchronization is needed. + new (&g_mutex) std::mutex; + + auto handlers = std::move(g_handlers_while_forking); + g_handlers_while_forking.clear(); + // Execute handlers in reverse order + for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) { + auto&& handler = *it; + if (handler.handler->child_after) { + handler.handler->child_after(std::move(handler.token)); + } + } +} + +struct AtForkInitializer { + AtForkInitializer() { +#ifndef _WIN32 + int r = pthread_atfork(&BeforeFork, &AfterForkParent, &AfterForkChild); + if (r != 0) { + IOErrorFromErrno(r, "Error when calling pthread_atfork: ").Abort(); + } +#endif + } +}; + +}; // namespace + +void RegisterAtFork(std::weak_ptr weak_handler) { + static AtForkInitializer initializer; + + std::lock_guard lock(g_mutex); + MaintainHandlersUnlocked(); + g_handlers.push_back(std::move(weak_handler)); +} + +} // namespace internal +} // namespace arrow diff --git a/cpp/src/arrow/util/atfork_internal.h b/cpp/src/arrow/util/atfork_internal.h new file mode 100644 index 0000000000000..2ba1729c03e0b --- /dev/null +++ b/cpp/src/arrow/util/atfork_internal.h @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "arrow/util/visibility.h" + +namespace arrow { +namespace internal { + +struct ARROW_EXPORT AtForkHandler { + using CallbackBefore = std::function; + using CallbackAfter = std::function; + + // The before-fork callback can return an arbitrary token (wrapped in std::any) + // that will passed as-is to after-fork callbacks. This can ensure that any + // resource necessary for after-fork handling is kept alive. + CallbackBefore before; + CallbackAfter parent_after; + CallbackAfter child_after; + + AtForkHandler() = default; + + explicit AtForkHandler(CallbackAfter child_after) + : child_after(std::move(child_after)) {} + + AtForkHandler(CallbackBefore before, CallbackAfter parent_after, + CallbackAfter child_after) + : before(std::move(before)), + parent_after(std::move(parent_after)), + child_after(std::move(child_after)) {} +}; + +// Register the given at-fork handlers. Their intended lifetime should be tracked by +// calling code using an owning shared_ptr. +ARROW_EXPORT +void RegisterAtFork(std::weak_ptr); + +} // namespace internal +} // namespace arrow diff --git a/cpp/src/arrow/util/atfork_test.cc b/cpp/src/arrow/util/atfork_test.cc new file mode 100644 index 0000000000000..004e28e19514a --- /dev/null +++ b/cpp/src/arrow/util/atfork_test.cc @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef _WIN32 +#include +#include +#include +#endif + +#include +#include + +#include "arrow/testing/gtest_util.h" +#include "arrow/util/atfork_internal.h" +#include "arrow/util/io_util.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace internal { + +using testing::ElementsAre; +using testing::IsSubsetOf; +using testing::UnorderedElementsAreArray; + +class TestAtFork : public ::testing::Test { + public: + using CallbackBefore = typename AtForkHandler::CallbackBefore; + using CallbackAfter = typename AtForkHandler::CallbackAfter; + + CallbackBefore PushBefore(int v) { + return [this, v]() { + std::lock_guard lock(mutex_); + before_.push_back(v); + return v; + }; + } + + CallbackAfter PushParentAfter(int w) { + return [this, w](std::any token) { + const int* v = std::any_cast(&token); + ASSERT_NE(v, nullptr); + std::lock_guard lock(mutex_); + parent_after_.emplace_back(*v + w); + }; + } + + CallbackAfter PushChildAfter(int w) { + return [this, w](std::any token) { + const int* v = std::any_cast(&token); + ASSERT_NE(v, nullptr); + // Mutex may be invalid and child is single-thread anyway + child_after_.push_back(*v + w); + }; + } + + void Reset() { + std::lock_guard lock(mutex_); + before_.clear(); + parent_after_.clear(); + child_after_.clear(); + } + +#ifndef _WIN32 + void RunInChild(std::function func) { + auto child_pid = fork(); + if (child_pid == -1) { + ASSERT_OK(IOErrorFromErrno(errno, "Error calling fork(): ")); + } + if (child_pid == 0) { + // Child + ASSERT_NO_FATAL_FAILURE(func()) << "Failure in child process"; + std::exit(0); + } else { + // Parent + AssertChildExit(child_pid); + } + } +#endif + + std::mutex mutex_; + std::vector before_; + std::vector parent_after_; + std::vector child_after_; +}; + +#ifndef _WIN32 + +TEST_F(TestAtFork, EmptyHandlers) { + auto handlers = std::make_shared(); + + RegisterAtFork(handlers); + RegisterAtFork(handlers); + + RunInChild([&]() { + ASSERT_TRUE(before_.empty()); + ASSERT_TRUE(parent_after_.empty()); + ASSERT_TRUE(child_after_.empty()); + }); + + ASSERT_TRUE(before_.empty()); + ASSERT_TRUE(parent_after_.empty()); + ASSERT_TRUE(child_after_.empty()); + + handlers.reset(); + + RunInChild([]() {}); +} + +TEST_F(TestAtFork, SingleThread) { + auto handlers1 = std::make_shared(PushBefore(1), PushParentAfter(11), + PushChildAfter(21)); + auto handlers2 = std::make_shared(PushBefore(2), PushParentAfter(12), + PushChildAfter(22)); + + RegisterAtFork(handlers1); + RegisterAtFork(handlers2); + + RunInChild([&]() { + ASSERT_THAT(before_, ElementsAre(1, 2)); + ASSERT_THAT(parent_after_, ElementsAre()); + ASSERT_THAT(child_after_, ElementsAre(2 + 22, 1 + 21)); + }); + ASSERT_THAT(before_, ElementsAre(1, 2)); + ASSERT_THAT(parent_after_, ElementsAre(2 + 12, 1 + 11)); + ASSERT_THAT(child_after_, ElementsAre()); + Reset(); + + // Destroy one handler + handlers1.reset(); + + RunInChild([&]() { + ASSERT_THAT(before_, ElementsAre(2)); + ASSERT_THAT(parent_after_, ElementsAre()); + ASSERT_THAT(child_after_, ElementsAre(2 + 22)); + }); + ASSERT_THAT(before_, ElementsAre(2)); + ASSERT_THAT(parent_after_, ElementsAre(2 + 12)); + ASSERT_THAT(child_after_, ElementsAre()); + Reset(); + + // Destroy other handler, create new ones + auto handlers3 = std::make_shared(PushBefore(3), PushParentAfter(13), + PushChildAfter(23)); + auto handlers4 = std::make_shared(PushBefore(4), PushParentAfter(14), + PushChildAfter(24)); + + RegisterAtFork(handlers3); + RegisterAtFork(handlers4); + handlers2.reset(); + + RunInChild([&]() { + ASSERT_THAT(before_, ElementsAre(3, 4)); + ASSERT_THAT(parent_after_, ElementsAre()); + ASSERT_THAT(child_after_, ElementsAre(4 + 24, 3 + 23)); + }); + ASSERT_THAT(before_, ElementsAre(3, 4)); + ASSERT_THAT(parent_after_, ElementsAre(4 + 14, 3 + 13)); + ASSERT_THAT(child_after_, ElementsAre()); +} + +#if !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)) + +// The two following tests would seem to leak for various reasons. +// Also, Thread Sanitizer would fail with the same error message as in +// https://github.com/google/sanitizers/issues/950. + +TEST_F(TestAtFork, MultipleThreads) { + const int kNumThreads = 5; + const int kNumIterations = 40; + const int kParentAfterAddend = 10000; + const int kChildAfterAddend = 20000; + std::atomic seed = 12345; + + auto check_values_in_child = [&]() { + std::vector expected_child; + for (const auto v : before_) { + expected_child.push_back(v + v + kChildAfterAddend); + } + // The handlers that were alive on this fork() are a subset of the handlers + // that were called at any point in the parent. + ASSERT_THAT(child_after_, IsSubsetOf(expected_child)); + }; + + auto run_in_thread = [&](int index) { + std::default_random_engine engine(++seed); + std::uniform_int_distribution value_dist(index * 100, (index + 1) * 100 - 1); + std::bernoulli_distribution fork_dist(0.1); + + for (int i = 0; i < kNumIterations; ++i) { + int value = value_dist(engine); + auto handlers = std::make_shared( + PushBefore(value), PushParentAfter(value + kParentAfterAddend), + PushChildAfter(value + kChildAfterAddend)); + RegisterAtFork(handlers); + if (fork_dist(engine)) { + RunInChild(check_values_in_child); + } + } + }; + + std::vector threads; + for (int i = 0; i < kNumThreads; ++i) { + threads.emplace_back(run_in_thread, i); + } + for (auto&& thread : threads) { + thread.join(); + } + + std::vector expected_parent; + for (const auto v : before_) { + expected_parent.push_back(v + v + kParentAfterAddend); + } + // The handlers that were called after fork are the same that were called + // before fork; however, their overall order is undefined as multiple fork() + // calls were made and multiple handlers may have been alive during + // each fork() called. + ASSERT_THAT(parent_after_, UnorderedElementsAreArray(expected_parent)); + ASSERT_TRUE(child_after_.empty()); +} + +TEST_F(TestAtFork, NestedChild) { +#ifdef __APPLE__ + GTEST_SKIP() << "Nested fork is not supported on macOS"; +#endif + + auto handlers1 = std::make_shared(PushBefore(1), PushParentAfter(11), + PushChildAfter(21)); + auto handlers2 = std::make_shared(PushBefore(2), PushParentAfter(12), + PushChildAfter(22)); + + RegisterAtFork(handlers1); + RegisterAtFork(handlers2); + + RunInChild([&]() { + Reset(); + + // Add a new handler, destroy one of the parent handlers + auto handlers3 = std::make_shared(PushBefore(3), PushParentAfter(13), + PushChildAfter(23)); + RegisterAtFork(handlers3); + handlers2.reset(); + + RunInChild([&]() { + ASSERT_THAT(before_, ElementsAre(1, 3)); + ASSERT_THAT(parent_after_, ElementsAre()); + ASSERT_THAT(child_after_, ElementsAre(3 + 23, 1 + 21)); + }); + + ASSERT_THAT(before_, ElementsAre(1, 3)); + ASSERT_THAT(parent_after_, ElementsAre(3 + 13, 1 + 11)); + ASSERT_THAT(child_after_, ElementsAre()); + }); + + ASSERT_THAT(before_, ElementsAre(1, 2)); + ASSERT_THAT(parent_after_, ElementsAre(2 + 12, 1 + 11)); + ASSERT_THAT(child_after_, ElementsAre()); +} + +#endif // !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || + // defined(THREAD_SANITIZER)) + +#endif // !defined(_WIN32) + +#ifdef _WIN32 +TEST_F(TestAtFork, NoOp) { + auto handlers = std::make_shared(PushBefore(1), PushParentAfter(11), + PushChildAfter(21)); + + RegisterAtFork(handlers); + + ASSERT_TRUE(before_.empty()); + ASSERT_TRUE(parent_after_.empty()); + ASSERT_TRUE(child_after_.empty()); +} +#endif + +} // namespace internal +} // namespace arrow diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 550e82b00b059..ca868248de401 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -26,6 +26,7 @@ #include #include +#include "arrow/util/atfork_internal.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" #include "arrow/util/mutex.h" @@ -203,6 +204,24 @@ struct ThreadPool::State { bool quick_shutdown_ = false; std::vector> kept_alive_resources_; + + // At-fork machinery + + void BeforeFork() { mutex_.lock(); } + + void ParentAfterFork() { mutex_.unlock(); } + + void ChildAfterFork() { + int desired_capacity = desired_capacity_; + bool please_shutdown = please_shutdown_; + bool quick_shutdown = quick_shutdown_; + new (this) State; // force-reinitialize, including synchronization primitives + desired_capacity_ = desired_capacity; + please_shutdown_ = please_shutdown; + quick_shutdown_ = quick_shutdown; + } + + std::shared_ptr atfork_handler_; }; // The worker loop is an independent function so that it can keep running @@ -287,8 +306,33 @@ ThreadPool::ThreadPool() : sp_state_(std::make_shared()), state_(sp_state_.get()), shutdown_on_destroy_(true) { -#ifndef _WIN32 - pid_ = getpid(); + // Eternal thread pools would produce false leak reports in the vector of + // atfork handlers. +#if !(defined(_WIN32) || defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)) + state_->atfork_handler_ = std::make_shared( + /*before=*/ + [weak_state = std::weak_ptr(sp_state_)]() { + auto state = weak_state.lock(); + if (state) { + state->BeforeFork(); + } + return state; // passed to after-forkers + }, + /*parent_after=*/ + [](std::any token) { + auto state = std::any_cast>(token); + if (state) { + state->ParentAfterFork(); + } + }, + /*child_after=*/ + [](std::any token) { + auto state = std::any_cast>(token); + if (state) { + state->ChildAfterFork(); + } + }); + RegisterAtFork(state_->atfork_handler_); #endif } @@ -298,38 +342,7 @@ ThreadPool::~ThreadPool() { } } -void ThreadPool::ProtectAgainstFork() { -#ifndef _WIN32 - pid_t current_pid = getpid(); - if (pid_.load() != current_pid) { - // Reinitialize internal state in child process after fork(). - { - // Since after-fork reinitialization is triggered when one of the ThreadPool - // methods is called, it can be very well be called from multiple threads - // at once. Therefore, it needs to be guarded with a lock. - auto lock = util::GlobalForkSafeMutex()->Lock(); - - if (pid_.load() != current_pid) { - int capacity = state_->desired_capacity_; - - auto new_state = std::make_shared(); - new_state->please_shutdown_ = state_->please_shutdown_; - new_state->quick_shutdown_ = state_->quick_shutdown_; - - sp_state_ = new_state; - state_ = sp_state_.get(); - pid_ = current_pid; - - // Launch worker threads anew - ARROW_UNUSED(SetCapacity(capacity)); - } - } - } -#endif -} - Status ThreadPool::SetCapacity(int threads) { - ProtectAgainstFork(); std::unique_lock lock(state_->mutex_); if (state_->please_shutdown_) { return Status::Invalid("operation forbidden during or after shutdown"); @@ -354,25 +367,21 @@ Status ThreadPool::SetCapacity(int threads) { } int ThreadPool::GetCapacity() { - ProtectAgainstFork(); std::unique_lock lock(state_->mutex_); return state_->desired_capacity_; } int ThreadPool::GetNumTasks() { - ProtectAgainstFork(); std::unique_lock lock(state_->mutex_); return state_->tasks_queued_or_running_; } int ThreadPool::GetActualCapacity() { - ProtectAgainstFork(); std::unique_lock lock(state_->mutex_); return static_cast(state_->workers_.size()); } Status ThreadPool::Shutdown(bool wait) { - ProtectAgainstFork(); std::unique_lock lock(state_->mutex_); if (state_->please_shutdown_) { @@ -419,7 +428,6 @@ void ThreadPool::LaunchWorkersUnlocked(int threads) { Status ThreadPool::SpawnReal(TaskHints hints, FnOnce task, StopToken stop_token, StopCallback&& stop_callback) { { - ProtectAgainstFork(); #ifdef ARROW_WITH_OPENTELEMETRY // Wrap the task to propagate a parent tracing span to it // This task-wrapping needs to be done before we grab the mutex because the diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index b2dd706444624..526213ce888d4 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -17,14 +17,6 @@ #pragma once -#ifndef _WIN32 -#include -#endif - -#ifndef _WIN32 -#include -#endif - #include #include #include @@ -475,17 +467,12 @@ class ARROW_EXPORT ThreadPool : public Executor { void LaunchWorkersUnlocked(int threads); // Get the current actual capacity int GetActualCapacity(); - // Reinitialize the thread pool if the pid changed - void ProtectAgainstFork(); static std::shared_ptr MakeCpuThreadPool(); std::shared_ptr sp_state_; State* state_; bool shutdown_on_destroy_; -#ifndef _WIN32 - std::atomic pid_; -#endif }; // Return the process-global thread pool for CPU-bound tasks. diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index 13093f4a1d576..799675c6fa93b 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -16,7 +16,7 @@ // under the License. #ifndef _WIN32 -#include +#include #include #endif @@ -750,22 +750,7 @@ TEST_F(TestThreadPool, SubmitWithStopTokenCancelled) { #if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \ defined(THREAD_SANITIZER)) -class TestThreadPoolForkSafety : public TestThreadPool { - public: - void CheckChildExit(int child_pid) { - ASSERT_GT(child_pid, 0); - int child_status; - int got_pid = waitpid(child_pid, &child_status, 0); - ASSERT_EQ(got_pid, child_pid); - if (WIFSIGNALED(child_status)) { - FAIL() << "Child terminated by signal " << WTERMSIG(child_status); - } - if (!WIFEXITED(child_status)) { - FAIL() << "Child didn't terminate normally?? Child status = " << child_status; - } - ASSERT_EQ(WEXITSTATUS(child_status), 0); - } -}; +class TestThreadPoolForkSafety : public TestThreadPool {}; TEST_F(TestThreadPoolForkSafety, Basics) { { @@ -784,7 +769,7 @@ TEST_F(TestThreadPoolForkSafety, Basics) { std::exit(st.ok() ? 0 : 2); } else { // Parent - CheckChildExit(child_pid); + AssertChildExit(child_pid); ASSERT_OK(pool->Shutdown()); } } @@ -806,7 +791,7 @@ TEST_F(TestThreadPoolForkSafety, Basics) { std::exit(0); } else { // Parent - CheckChildExit(child_pid); + AssertChildExit(child_pid); } } } @@ -851,7 +836,7 @@ TEST_F(TestThreadPoolForkSafety, MultipleChildThreads) { std::exit(0); } else { // Parent - CheckChildExit(child_pid); + AssertChildExit(child_pid); ASSERT_OK(pool->Shutdown()); } } @@ -879,14 +864,14 @@ TEST_F(TestThreadPoolForkSafety, NestedChild) { ASSERT_OK(pool->Shutdown()); } else { // Child - CheckChildExit(grandchild_pid); + AssertChildExit(grandchild_pid); ASSERT_FINISHES_OK_AND_EQ(7, fut); ASSERT_OK(pool->Shutdown()); } std::exit(0); } else { // Parent - CheckChildExit(child_pid); + AssertChildExit(child_pid); ASSERT_OK(pool->Shutdown()); } }