Skip to content

Commit

Permalink
Add fork safety tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Oct 11, 2022
1 parent 4b5f4fe commit 7913919
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 25 deletions.
16 changes: 16 additions & 0 deletions cpp/src/arrow/testing/gtest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,22 @@ bool FileIsClosed(int fd) {
#endif
}

#if !defined(_WIN32)
void AssertChildExit(int child_pid, int expected_exit_status) {
ASSERT_GT(child_pid, 0);
int child_status;
int got_pid = waitpid(child_pid, &child_status, 0);
ASSERT_EQ(got_pid, child_pid);
if (WIFSIGNALED(child_status)) {
FAIL() << "Child terminated by signal " << WTERMSIG(child_status);
}
if (!WIFEXITED(child_status)) {
FAIL() << "Child didn't terminate normally?? Child status = " << child_status;
}
ASSERT_EQ(WEXITSTATUS(child_status), expected_exit_status);
}
#endif

bool LocaleExists(const char* locale) {
try {
std::locale loc(locale);
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/testing/gtest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,11 @@ std::vector<T> IteratorToVector(Iterator<T> iterator) {
ARROW_TESTING_EXPORT
bool LocaleExists(const char* locale);

#ifndef _WIN32
ARROW_TESTING_EXPORT
void AssertChildExit(int child_pid, int expected_exit_status = 0);
#endif

// A RAII-style object that switches to a new locale, and switches back
// to the old locale when going out of scope. Doesn't do anything if the
// new locale doesn't exist on the local machine.
Expand Down
15 changes: 12 additions & 3 deletions cpp/src/arrow/util/cancel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,15 @@ struct SignalStopState {
stop_source_ = NullSource();
}

static const std::shared_ptr<SignalStopState>& instance() {
static SignalStopState* instance(bool signal_safe = false) {
static auto instance = std::make_shared<SignalStopState>();
#ifndef _WIN32
if (instance->InForkedChild()) {
// In child process
if (signal_safe) {
return nullptr;
}
// Not called from a signal => can reinitialize
auto lock = arrow::util::GlobalForkSafeMutex()->Lock();
if (instance->pid_.load() != getpid()) {
bool thread_was_launched{instance->signal_receiving_thread_};
Expand All @@ -208,7 +212,7 @@ struct SignalStopState {
}
}
#endif
return instance;
return instance.get();
}

private:
Expand All @@ -219,7 +223,12 @@ struct SignalStopState {
signal_receiving_thread_ = std::make_unique<std::thread>(ReceiveSignals, self_pipe_);
}

static void HandleSignal(int signum) { instance()->DoHandleSignal(signum); }
static void HandleSignal(int signum) {
auto self = instance(/*signal_safe=*/true);
if (self) {
self->DoHandleSignal(signum);
}
}

void DoHandleSignal(int signum) {
// async-signal-safe code only
Expand Down
24 changes: 24 additions & 0 deletions cpp/src/arrow/util/cancel_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include <signal.h>
#ifndef _WIN32
#include <sys/time.h> // for setitimer()
#include <sys/types.h>
#include <unistd.h>
#endif

#include "arrow/testing/gtest_util.h"
Expand Down Expand Up @@ -238,6 +240,28 @@ TEST_F(SignalCancelTest, RegisterUnregister) {
AssertStopRequested();
}

#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER))
TEST_F(SignalCancelTest, ForkSafety) {
RegisterHandler();

auto child_pid = fork();
if (child_pid == 0) {
// Child: trigger signal
TriggerSignal();
// Stop source destruction should neither crash not affect parent
std::exit(0);
} else {
// Parent: shouldn't notice signal raised in child
AssertChildExit(child_pid);
AssertStopNotRequested();

// Stop source still usable in parent
TriggerSignal();
AssertStopRequested();
}
}
#endif

TEST_F(CancelTest, ThreadedPollSuccess) {
constexpr int kNumThreads = 10;

Expand Down
28 changes: 28 additions & 0 deletions cpp/src/arrow/util/io_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#ifndef _WIN32
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#endif

Expand Down Expand Up @@ -446,6 +447,33 @@ TEST_F(TestSelfPipe, SendFromSignalAndWait) {
ASSERT_OK(ReadStatus());
}

#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER))
TEST_F(TestSelfPipe, ForkSafety) {
// Self-pipe isn't usable from child, but should neither crash at exit
// nor disrupt parent
self_pipe_->Send(123456789123456789ULL);

auto child_pid = fork();
if (child_pid == 0) {
// Child: pipe is unusable
self_pipe_->Send(41ULL);
self_pipe_->Send(42ULL);
ASSERT_RAISES(Invalid, self_pipe_->Wait());
self_pipe_.reset();
std::exit(0);
} else {
// Parent: pipe is still usable, data is read correctly
AssertChildExit(child_pid);
StartReading();
SleepABit();
self_pipe_->Send(987654321987654321ULL);

AssertPayloadsEventually({123456789123456789ULL, 987654321987654321ULL});
ASSERT_OK(ReadStatus());
}
}
#endif

TEST(PlatformFilename, RoundtripAscii) {
PlatformFilename fn;
ASSERT_OK_AND_ASSIGN(fn, PlatformFilename::FromString("a/b"));
Expand Down
29 changes: 7 additions & 22 deletions cpp/src/arrow/util/thread_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

#ifndef _WIN32
#include <sys/wait.h>
#include <sys/types.h>
#include <unistd.h>
#endif

Expand Down Expand Up @@ -749,22 +749,7 @@ TEST_F(TestThreadPool, SubmitWithStopTokenCancelled) {
#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \
defined(THREAD_SANITIZER))

class TestThreadPoolForkSafety : public TestThreadPool {
public:
void CheckChildExit(int child_pid) {
ASSERT_GT(child_pid, 0);
int child_status;
int got_pid = waitpid(child_pid, &child_status, 0);
ASSERT_EQ(got_pid, child_pid);
if (WIFSIGNALED(child_status)) {
FAIL() << "Child terminated by signal " << WTERMSIG(child_status);
}
if (!WIFEXITED(child_status)) {
FAIL() << "Child didn't terminate normally?? Child status = " << child_status;
}
ASSERT_EQ(WEXITSTATUS(child_status), 0);
}
};
class TestThreadPoolForkSafety : public TestThreadPool {};

TEST_F(TestThreadPoolForkSafety, Basics) {
{
Expand All @@ -783,7 +768,7 @@ TEST_F(TestThreadPoolForkSafety, Basics) {
std::exit(st.ok() ? 0 : 2);
} else {
// Parent
CheckChildExit(child_pid);
AssertChildExit(child_pid);
ASSERT_OK(pool->Shutdown());
}
}
Expand All @@ -805,7 +790,7 @@ TEST_F(TestThreadPoolForkSafety, Basics) {
std::exit(0);
} else {
// Parent
CheckChildExit(child_pid);
AssertChildExit(child_pid);
}
}
}
Expand Down Expand Up @@ -850,7 +835,7 @@ TEST_F(TestThreadPoolForkSafety, MultipleChildThreads) {
std::exit(0);
} else {
// Parent
CheckChildExit(child_pid);
AssertChildExit(child_pid);
ASSERT_OK(pool->Shutdown());
}
}
Expand Down Expand Up @@ -878,14 +863,14 @@ TEST_F(TestThreadPoolForkSafety, NestedChild) {
ASSERT_OK(pool->Shutdown());
} else {
// Child
CheckChildExit(grandchild_pid);
AssertChildExit(grandchild_pid);
ASSERT_FINISHES_OK_AND_EQ(7, fut);
ASSERT_OK(pool->Shutdown());
}
std::exit(0);
} else {
// Parent
CheckChildExit(child_pid);
AssertChildExit(child_pid);
ASSERT_OK(pool->Shutdown());
}
}
Expand Down

0 comments on commit 7913919

Please sign in to comment.