Skip to content

Commit

Permalink
Fix butex wait_pthread handle EINTR (apache#2086)
Browse files Browse the repository at this point in the history
  • Loading branch information
jenrryyou authored and Yang Liming committed Oct 31, 2023
1 parent c3252d2 commit 6372b18
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 30 deletions.
59 changes: 29 additions & 30 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,27 +137,41 @@ static void wakeup_pthread(ButexPthreadWaiter* pw) {

bool erase_from_butex(ButexWaiter*, bool, WaiterState);

int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
int wait_pthread(ButexPthreadWaiter& pw, const timespec* abstime) {
timespec* ptimeout = NULL;
timespec timeout;
int64_t timeout_us = 0;
int rc;

while (true) {
const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) {
// If `sig' is changed, wakeup_pthread() must be called and `pw'
// is already removed from the butex.
// Acquire fence makes this thread sees changes before wakeup.
return rc;
if (abstime != NULL) {
timeout_us = butil::timespec_to_microseconds(*abstime) - butil::gettimeofday_us();
timeout = butil::microseconds_to_timespec(timeout_us);
ptimeout = &timeout;
}
if (timeout_us > MIN_SLEEP_US || abstime == NULL) {
rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) {
// If `sig' is changed, wakeup_pthread() must be called and `pw'
// is already removed from the butex.
// Acquire fence makes this thread sees changes before wakeup.
return rc;
}
} else {
errno = ETIMEDOUT;
rc = -1;
}
// Handle ETIMEDOUT when abstime is valid.
// If futex_wait_private return EINTR, just continue the loop.
if (rc != 0 && errno == ETIMEDOUT) {
// Note that we don't handle the EINTR from futex_wait here since
// pthreads waiting on a butex should behave similarly as bthreads
// which are not able to be woken-up by signals.
// EINTR on butex is only producible by TaskGroup::interrupt().

// `pw' is still in the queue, remove it.
// wait futex timeout, `pw' is still in the queue, remove it.
if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) {
// Another thread is erasing `pw' as well, wait for the signal.
// Acquire fence makes this thread sees changes before wakeup.
if (pw.sig.load(butil::memory_order_acquire) == PTHREAD_NOT_SIGNALLED) {
ptimeout = NULL; // already timedout, ptimeout is expired.
// already timedout, abstime and ptimeout are expired.
abstime = NULL;
ptimeout = NULL;
continue;
}
}
Expand Down Expand Up @@ -567,21 +581,6 @@ static void wait_for_butex(void* arg) {

static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
const timespec* abstime) {
// sys futex needs relative timeout.
// Compute diff between abstime and now.
timespec* ptimeout = NULL;
timespec timeout;
if (abstime != NULL) {
const int64_t timeout_us = butil::timespec_to_microseconds(*abstime) -
butil::gettimeofday_us();
if (timeout_us < MIN_SLEEP_US) {
errno = ETIMEDOUT;
return -1;
}
timeout = butil::microseconds_to_timespec(timeout_us);
ptimeout = &timeout;
}

TaskMeta* task = NULL;
ButexPthreadWaiter pw;
pw.tid = 0;
Expand Down Expand Up @@ -612,7 +611,7 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
num_waiters << 1;
#endif
rc = wait_pthread(pw, ptimeout);
rc = wait_pthread(pw, abstime);
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
num_waiters << -1;
#endif
Expand Down
49 changes: 49 additions & 0 deletions test/bthread_butex_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "bthread/task_group.h"
#include "bthread/bthread.h"
#include "bthread/unstable.h"
#include "bthread/interrupt_pthread.h"

namespace bthread {
extern butil::atomic<TaskControl*> g_task_control;
Expand Down Expand Up @@ -394,4 +395,52 @@ TEST(ButexTest, stop_before_sleeping) {
ASSERT_EQ(EINVAL, bthread_stop(th));
}
}

void* trigger_signal(void* arg) {
pthread_t * th = (pthread_t*)arg;
const long t1 = butil::gettimeofday_us();
for (size_t i = 0; i < 50; ++i) {
usleep(100000);
if (bthread::interrupt_pthread(*th) == ESRCH) {
LOG(INFO) << "waiter thread end, trigger count=" << i;
break;
}
}
const long t2 = butil::gettimeofday_us();
LOG(INFO) << "trigger signal thread end, elapsed=" << (t2-t1) << "us";
return NULL;
}

TEST(ButexTest, wait_with_signal_triggered) {
butil::Timer tm;

const int64_t WAIT_MSEC = 500;
WaiterArg waiter_args;
pthread_t waiter_th, tigger_th;
butil::atomic<int>* butex =
bthread::butex_create_checked<butil::atomic<int> >();
ASSERT_TRUE(butex);
*butex = 1;
ASSERT_EQ(0, bthread::butex_wake(butex));

const timespec abstime = butil::milliseconds_from_now(WAIT_MSEC);
waiter_args.expected_value = *butex;
waiter_args.butex = butex;
waiter_args.expected_result = ETIMEDOUT;
waiter_args.ptimeout = &abstime;
tm.start();
pthread_create(&waiter_th, NULL, waiter, &waiter_args);
pthread_create(&tigger_th, NULL, trigger_signal, &waiter_th);

ASSERT_EQ(0, pthread_join(waiter_th, NULL));
tm.stop();
auto wait_elapsed_ms = tm.m_elapsed();;
LOG(INFO) << "waiter thread end, elapsed " << wait_elapsed_ms << " ms";

ASSERT_LT(labs(wait_elapsed_ms - WAIT_MSEC), 250);

ASSERT_EQ(0, pthread_join(tigger_th, NULL));
bthread::butex_destroy(butex);
}

} // namespace

0 comments on commit 6372b18

Please sign in to comment.