From 6372b180f713d2d797d0483ab3fd67078210223f Mon Sep 17 00:00:00 2001 From: Jenrry You Date: Fri, 5 May 2023 14:47:19 +0800 Subject: [PATCH] Fix butex wait_pthread handle EINTR (#2086) --- src/bthread/butex.cpp | 59 ++++++++++++++++----------------- test/bthread_butex_unittest.cpp | 49 +++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 30 deletions(-) diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp index 757b314320..19b03725f3 100644 --- a/src/bthread/butex.cpp +++ b/src/bthread/butex.cpp @@ -137,27 +137,41 @@ static void wakeup_pthread(ButexPthreadWaiter* pw) { bool erase_from_butex(ButexWaiter*, bool, WaiterState); -int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) { +int wait_pthread(ButexPthreadWaiter& pw, const timespec* abstime) { + timespec* ptimeout = NULL; + timespec timeout; + int64_t timeout_us = 0; + int rc; + while (true) { - const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout); - if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) { - // If `sig' is changed, wakeup_pthread() must be called and `pw' - // is already removed from the butex. - // Acquire fence makes this thread sees changes before wakeup. - return rc; + if (abstime != NULL) { + timeout_us = butil::timespec_to_microseconds(*abstime) - butil::gettimeofday_us(); + timeout = butil::microseconds_to_timespec(timeout_us); + ptimeout = &timeout; + } + if (timeout_us > MIN_SLEEP_US || abstime == NULL) { + rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout); + if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) { + // If `sig' is changed, wakeup_pthread() must be called and `pw' + // is already removed from the butex. + // Acquire fence makes this thread sees changes before wakeup. + return rc; + } + } else { + errno = ETIMEDOUT; + rc = -1; } + // Handle ETIMEDOUT when abstime is valid. + // If futex_wait_private return EINTR, just continue the loop. if (rc != 0 && errno == ETIMEDOUT) { - // Note that we don't handle the EINTR from futex_wait here since - // pthreads waiting on a butex should behave similarly as bthreads - // which are not able to be woken-up by signals. - // EINTR on butex is only producible by TaskGroup::interrupt(). - - // `pw' is still in the queue, remove it. + // wait futex timeout, `pw' is still in the queue, remove it. if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) { // Another thread is erasing `pw' as well, wait for the signal. // Acquire fence makes this thread sees changes before wakeup. if (pw.sig.load(butil::memory_order_acquire) == PTHREAD_NOT_SIGNALLED) { - ptimeout = NULL; // already timedout, ptimeout is expired. + // already timedout, abstime and ptimeout are expired. + abstime = NULL; + ptimeout = NULL; continue; } } @@ -567,21 +581,6 @@ static void wait_for_butex(void* arg) { static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value, const timespec* abstime) { - // sys futex needs relative timeout. - // Compute diff between abstime and now. - timespec* ptimeout = NULL; - timespec timeout; - if (abstime != NULL) { - const int64_t timeout_us = butil::timespec_to_microseconds(*abstime) - - butil::gettimeofday_us(); - if (timeout_us < MIN_SLEEP_US) { - errno = ETIMEDOUT; - return -1; - } - timeout = butil::microseconds_to_timespec(timeout_us); - ptimeout = &timeout; - } - TaskMeta* task = NULL; ButexPthreadWaiter pw; pw.tid = 0; @@ -612,7 +611,7 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value, bvar::Adder& num_waiters = butex_waiter_count(); num_waiters << 1; #endif - rc = wait_pthread(pw, ptimeout); + rc = wait_pthread(pw, abstime); #ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS num_waiters << -1; #endif diff --git a/test/bthread_butex_unittest.cpp b/test/bthread_butex_unittest.cpp index eb991fd03f..8f2f4f5f3f 100644 --- a/test/bthread_butex_unittest.cpp +++ b/test/bthread_butex_unittest.cpp @@ -25,6 +25,7 @@ #include "bthread/task_group.h" #include "bthread/bthread.h" #include "bthread/unstable.h" +#include "bthread/interrupt_pthread.h" namespace bthread { extern butil::atomic g_task_control; @@ -394,4 +395,52 @@ TEST(ButexTest, stop_before_sleeping) { ASSERT_EQ(EINVAL, bthread_stop(th)); } } + +void* trigger_signal(void* arg) { + pthread_t * th = (pthread_t*)arg; + const long t1 = butil::gettimeofday_us(); + for (size_t i = 0; i < 50; ++i) { + usleep(100000); + if (bthread::interrupt_pthread(*th) == ESRCH) { + LOG(INFO) << "waiter thread end, trigger count=" << i; + break; + } + } + const long t2 = butil::gettimeofday_us(); + LOG(INFO) << "trigger signal thread end, elapsed=" << (t2-t1) << "us"; + return NULL; +} + +TEST(ButexTest, wait_with_signal_triggered) { + butil::Timer tm; + + const int64_t WAIT_MSEC = 500; + WaiterArg waiter_args; + pthread_t waiter_th, tigger_th; + butil::atomic* butex = + bthread::butex_create_checked >(); + ASSERT_TRUE(butex); + *butex = 1; + ASSERT_EQ(0, bthread::butex_wake(butex)); + + const timespec abstime = butil::milliseconds_from_now(WAIT_MSEC); + waiter_args.expected_value = *butex; + waiter_args.butex = butex; + waiter_args.expected_result = ETIMEDOUT; + waiter_args.ptimeout = &abstime; + tm.start(); + pthread_create(&waiter_th, NULL, waiter, &waiter_args); + pthread_create(&tigger_th, NULL, trigger_signal, &waiter_th); + + ASSERT_EQ(0, pthread_join(waiter_th, NULL)); + tm.stop(); + auto wait_elapsed_ms = tm.m_elapsed();; + LOG(INFO) << "waiter thread end, elapsed " << wait_elapsed_ms << " ms"; + + ASSERT_LT(labs(wait_elapsed_ms - WAIT_MSEC), 250); + + ASSERT_EQ(0, pthread_join(tigger_th, NULL)); + bthread::butex_destroy(butex); +} + } // namespace