Active Spinning and queue old bthread at the head for bthread mutex #2749

Merged: 1 commit, Sep 6, 2024
29 changes: 22 additions & 7 deletions src/bthread/butex.cpp
@@ -537,8 +537,14 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
     return erased;
 }
 
+struct WaitForButexArgs {
+    ButexBthreadWaiter* bw;
+    bool prepend;
+};
+
 static void wait_for_butex(void* arg) {
-    ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
+    auto args = static_cast<WaitForButexArgs*>(arg);
+    ButexBthreadWaiter* const bw = args->bw;
     Butex* const b = bw->initial_butex;
     // 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
     //    before they're queued, otherwise the waiter is already timedout
@@ -560,7 +566,11 @@ static void wait_for_butex(void* arg) {
             bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
         } else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
                    !bw->task_meta->interrupted) {
-            b->waiters.Append(bw);
+            if (args->prepend) {
+                b->waiters.Prepend(bw);
+            } else {
+                b->waiters.Append(bw);
+            }
             bw->container.store(b, butil::memory_order_relaxed);
             if (bw->abstime != NULL) {
                 bw->sleep_id = get_global_timer_thread()->schedule(
@@ -593,7 +603,7 @@ static void wait_for_butex(void* arg) {
 }
 
 static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
-                                   const timespec* abstime) {
+                                   const timespec* abstime, bool prepend) {
     TaskMeta* task = NULL;
     ButexPthreadWaiter pw;
     pw.tid = 0;
@@ -616,7 +626,11 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
         errno = EINTR;
         rc = -1;
     } else {
-        b->waiters.Append(&pw);
+        if (prepend) {
+            b->waiters.Prepend(&pw);
+        } else {
+            b->waiters.Append(&pw);
+        }
         pw.container.store(b, butil::memory_order_relaxed);
         b->waiter_lock.unlock();
 
@@ -646,7 +660,7 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
     return rc;
 }
 
-int butex_wait(void* arg, int expected_value, const timespec* abstime) {
+int butex_wait(void* arg, int expected_value, const timespec* abstime, bool prepend) {
     Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
     if (b->value.load(butil::memory_order_relaxed) != expected_value) {
         errno = EWOULDBLOCK;
@@ -657,7 +671,7 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
     }
     TaskGroup* g = tls_task_group;
     if (NULL == g || g->is_current_pthread_task()) {
-        return butex_wait_from_pthread(g, b, expected_value, abstime);
+        return butex_wait_from_pthread(g, b, expected_value, abstime, prepend);
     }
     ButexBthreadWaiter bbw;
     // tid is 0 iff the thread is non-bthread
@@ -690,7 +704,8 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
     // release fence matches with acquire fence in interrupt_and_consume_waiters
     // in task_group.cpp to guarantee visibility of `interrupted'.
     bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
-    g->set_remained(wait_for_butex, &bbw);
+    WaitForButexArgs args{ &bbw, prepend};
+    g->set_remained(wait_for_butex, &args);
     TaskGroup::sched(&g);
 
     // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
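The queueing change is easiest to see in isolation: butex_wake() always removes waiters from the head of b->waiters, so Append() gives FIFO wakeups while Prepend() makes the waiter the next one woken. Below is a minimal stand-alone model of that behavior; ToyWaitQueue and Waiter are invented names for illustration, not brpc types.

```cpp
#include <cassert>
#include <deque>

struct Waiter { int id; };

// Models the Butex waiter list: wakeups always take from the head,
// so the `prepend' flag decides who is woken next.
struct ToyWaitQueue {
    std::deque<Waiter*> waiters;

    void enqueue(Waiter* w, bool prepend) {
        if (prepend) {
            waiters.push_front(w);   // like b->waiters.Prepend(bw)
        } else {
            waiters.push_back(w);    // like b->waiters.Append(bw)
        }
    }

    Waiter* wake_one() {             // like butex_wake() taking the head
        if (waiters.empty()) return nullptr;
        Waiter* w = waiters.front();
        waiters.pop_front();
        return w;
    }
};

int main() {
    Waiter a{1}, b{2}, c{3};
    ToyWaitQueue q;
    q.enqueue(&a, false);
    q.enqueue(&b, false);
    q.enqueue(&c, true);             // re-queued waiter jumps the line
    assert(q.wake_one() == &c);      // woken first despite arriving last
    assert(q.wake_one() == &a);      // FIFO order resumes for the rest
}
```

Worth noting: `WaitForButexArgs args` is a stack object of the waiting bthread, just as `&bbw` was before this change. That is safe because set_remained() runs wait_for_butex right after the switch to the next bthread, before the waiter can be woken and unwind its frame.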
7 changes: 6 additions & 1 deletion src/bthread/butex.h
@@ -67,8 +67,13 @@ int butex_requeue(void* butex1, void* butex2);
 // abstime is not NULL.
 // About |abstime|:
 //   Different from FUTEX_WAIT, butex_wait uses absolute time.
+// About |prepend|:
+//   If |prepend| is true, queue the bthread at the head of the queue,
+//   otherwise at the tail.
 // Returns 0 on success, -1 otherwise and errno is set.
-int butex_wait(void* butex, int expected_value, const timespec* abstime);
+int butex_wait(void* butex, int expected_value,
+               const timespec* abstime,
+               bool prepend = false);
 
 }  // namespace bthread
 
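For callers, the extra parameter is opt-in and defaults to the old FIFO behavior. A hedged sketch of the two call shapes, with the same errno filtering the mutex path uses (the wrapper name is invented):

```cpp
#include <cerrno>
#include "bthread/butex.h"   // butex_wait(), as declared above

// Illustrative wrapper only: waits until *butex != expected. A fresh waiter
// queues at the tail; a waiter that already lost a wakeup race re-queues at
// the head by passing prepend=true.
inline int wait_once(void* butex, int expected, bool lost_race_before) {
    const int rc = bthread::butex_wait(butex, expected, /*abstime=*/NULL,
                                       /*prepend=*/lost_race_before);
    if (rc < 0 && errno != EWOULDBLOCK && errno != EINTR) {
        return errno;  // real error; the two ignored cases are normal wakeups
    }
    return 0;
}
```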
63 changes: 40 additions & 23 deletions src/bthread/mutex.cpp
@@ -39,17 +39,22 @@
 #include "butil/logging.h"
 #include "butil/object_pool.h"
 #include "butil/debug/stack_trace.h"
+#include "butil/thread_local.h"
 #include "bthread/butex.h"                       // butex_*
 #include "bthread/mutex.h"                       // bthread_mutex_t
 #include "bthread/sys_futex.h"
 #include "bthread/log.h"
-#include "butil/debug/stack_trace.h"
+#include "bthread/processor.h"
+#include "bthread/task_group.h"
 
 extern "C" {
 extern void* BAIDU_WEAK _dl_sym(void* handle, const char* symbol, void* caller);
 }
 
 namespace bthread {
+
+EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
+
 // Warm up backtrace before main().
 const butil::debug::StackTrace ALLOW_UNUSED dummy_bt;
 
@@ -772,29 +777,41 @@ const MutexInternal MUTEX_LOCKED_RAW = {{1},{0},0};
 BAIDU_CASSERT(sizeof(unsigned) == sizeof(MutexInternal),
               sizeof_mutex_internal_must_equal_unsigned);
 
-inline int mutex_lock_contended(bthread_mutex_t* m) {
-    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
-    while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
-        if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
-            errno != EWOULDBLOCK && errno != EINTR/*note*/) {
-            // a mutex lock should ignore interruptions in general since
-            // user code is unlikely to check the return value.
-            return errno;
+const int MAX_SPIN_ITER = 4;
+
+inline int mutex_lock_contended_impl(
+    bthread_mutex_t* m, const struct timespec* __restrict abstime) {
+    // When a bthread first contends for a lock, active spinning makes sense.
+    // Spin only few times and only if local `rq' is empty.
+    TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
+    if (BAIDU_UNLIKELY(NULL == g || g->rq_size() == 0)) {
+        for (int i = 0; i < MAX_SPIN_ITER; ++i) {
+            cpu_relax();
         }
     }
-    return 0;
-}
 
-inline int mutex_timedlock_contended(
-    bthread_mutex_t* m, const struct timespec* __restrict abstime) {
-    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
+    bool queue_lifo = false;
+    bool first_wait = true;
+    auto whole = (butil::atomic<unsigned>*)m->butex;
     while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
-        if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime) < 0 &&
+        if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime, queue_lifo) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR/*note*/) {
-            // a mutex lock should ignore interrruptions in general since
+            // A mutex lock should ignore interruptions in general since
             // user code is unlikely to check the return value.
             return errno;
         }
+        // Ignore EWOULDBLOCK and EINTR.
+        if (first_wait && 0 == errno) {
+            first_wait = false;
+        }
+        if (!first_wait) {
+            // Normally, bthreads are queued in FIFO order. But competing with new
+            // arriving bthreads over the ownership of mutex, a woken up bthread
+            // has good chances of losing. Because new arriving bthreads are already
+            // running on CPU and there can be lots of them. In such case, for fairness,
+            // to avoid starvation, it is queued at the head of the waiter queue.
+            queue_lifo = true;
+        }
     }
     return 0;
 }
@@ -880,7 +897,7 @@ int bthread_mutex_trylock(bthread_mutex_t* m) {
 }
 
 int bthread_mutex_lock_contended(bthread_mutex_t* m) {
-    return bthread::mutex_lock_contended(m);
+    return bthread::mutex_lock_contended_impl(m, NULL);
 }
 
 int bthread_mutex_lock(bthread_mutex_t* m) {
@@ -890,18 +907,18 @@ int bthread_mutex_lock(bthread_mutex_t* m) {
     }
     // Don't sample when contention profiler is off.
     if (!bthread::g_cp) {
-        return bthread::mutex_lock_contended(m);
+        return bthread::mutex_lock_contended_impl(m, NULL);
     }
     // Ask Collector if this (contended) locking should be sampled.
     const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
     if (!sampling_range) { // Don't sample
-        return bthread::mutex_lock_contended(m);
+        return bthread::mutex_lock_contended_impl(m, NULL);
     }
     // Start sampling.
     const int64_t start_ns = butil::cpuwide_time_ns();
     // NOTE: Don't modify m->csite outside lock since multiple threads are
     // still contending with each other.
-    const int rc = bthread::mutex_lock_contended(m);
+    const int rc = bthread::mutex_lock_contended_impl(m, NULL);
     if (!rc) { // Inside lock
         m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
         m->csite.sampling_range = sampling_range;
@@ -917,18 +934,18 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
     }
     // Don't sample when contention profiler is off.
     if (!bthread::g_cp) {
-        return bthread::mutex_timedlock_contended(m, abstime);
+        return bthread::mutex_lock_contended_impl(m, abstime);
     }
     // Ask Collector if this (contended) locking should be sampled.
     const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
     if (!sampling_range) { // Don't sample
-        return bthread::mutex_timedlock_contended(m, abstime);
+        return bthread::mutex_lock_contended_impl(m, abstime);
     }
     // Start sampling.
     const int64_t start_ns = butil::cpuwide_time_ns();
     // NOTE: Don't modify m->csite outside lock since multiple threads are
     // still contending with each other.
-    const int rc = bthread::mutex_timedlock_contended(m, abstime);
+    const int rc = bthread::mutex_lock_contended_impl(m, abstime);
     if (!rc) { // Inside lock
         m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
         m->csite.sampling_range = sampling_range;
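The shape of mutex_lock_contended_impl is a classic spin-then-block lock: try briefly to grab the lock while it might be released any cycle, then publish contention and park. A self-contained C++20 sketch of the same structure follows; ToyMutex, kMaxSpinIter, and the use of std::atomic::wait in place of butex_wait() are all stand-ins for illustration, not brpc code.

```cpp
#include <atomic>
#include <thread>

constexpr int kMaxSpinIter = 4;          // same small bound as MAX_SPIN_ITER

class ToyMutex {
public:
    void lock() {
        // 1) Bounded active spinning: pays off when the holder exits quickly.
        //    (brpc additionally skips this when the worker has runnable work.)
        for (int i = 0; i < kMaxSpinIter; ++i) {
            unsigned expected = kFree;
            if (state_.compare_exchange_weak(expected, kLocked,
                                             std::memory_order_acquire)) {
                return;                  // acquired while spinning
            }
            cpu_relax();
        }
        // 2) Publish contention and park, mirroring the butex_wait() loop:
        //    exchanging to CONTENDED tells unlock() that a wakeup is needed.
        while (state_.exchange(kContended, std::memory_order_acquire) != kFree) {
            state_.wait(kContended);     // C++20 stand-in for butex_wait()
        }
    }

    void unlock() {
        if (state_.exchange(kFree, std::memory_order_release) == kContended) {
            state_.notify_one();         // wake a parked waiter only if any
        }
    }

private:
    static void cpu_relax() {
#if defined(__x86_64__) || defined(__i386__)
        __builtin_ia32_pause();          // in the spirit of bthread's cpu_relax()
#else
        std::this_thread::yield();
#endif
    }

    static constexpr unsigned kFree = 0, kLocked = 1, kContended = 2;
    std::atomic<unsigned> state_{kFree};
};
```

The queue_lifo half of the diff is independent of the spinning: after the first wait that did not end in acquisition, the bthread asks butex_wait to put it at the head of the waiter queue, so it is woken next instead of racing again from the tail behind a stream of freshly arriving bthreads.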
5 changes: 5 additions & 0 deletions src/bthread/task_group.h
@@ -182,6 +182,11 @@ class TaskGroup {
     // process make go on indefinitely.
     void push_rq(bthread_t tid);
 
+    // Returns size of local run queue.
+    size_t rq_size() const {
+        return _rq.volatile_size();
+    }
+
     bthread_tag_t tag() const { return _tag; }
 
 private:
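rq_size() is only a heuristic: volatile_size() reads the queue without locking, so the answer may be stale by the time the caller acts on it, which is acceptable for a spin-or-block decision. A sketch of the kind of racy-but-cheap reader this implies (ToyRunQueue and its two counters are invented; the real _rq is brpc's work-stealing queue, whose internals are assumed here):

```cpp
#include <atomic>
#include <cstddef>

// A work-stealing deque typically keeps two indices: the owner pushes and
// pops at the bottom, thieves steal from the top.
class ToyRunQueue {
public:
    size_t volatile_size() const {
        // Relaxed loads: the result may be stale, but callers such as
        // mutex_lock_contended_impl() only use it as a hint.
        const size_t b = bottom_.load(std::memory_order_relaxed);
        const size_t t = top_.load(std::memory_order_relaxed);
        return b <= t ? 0 : b - t;
    }

private:
    std::atomic<size_t> bottom_{0};  // owner end
    std::atomic<size_t> top_{0};     // thief end
};
```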
5 changes: 5 additions & 0 deletions src/butil/containers/linked_list.h
@@ -171,6 +171,11 @@ class LinkedList {
     e->InsertBefore(&root_);
   }
 
+  // Prepend |e| to the head of the linked list.
+  void Prepend(LinkNode<T>* e) {
+    e->InsertAfter(&root_);
+  }
+
   LinkNode<T>* head() const {
     return root_.next();
   }
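Because the list is circular and rooted at root_, inserting after the root is exactly the head position, mirroring how Append() inserts before the root to reach the tail. A usage sketch with the API above (Waiter is an invented node type):

```cpp
#include "butil/containers/linked_list.h"

struct Waiter : public butil::LinkNode<Waiter> {
    explicit Waiter(int id) : id(id) {}
    int id;
};

void demo() {
    butil::LinkedList<Waiter> q;
    Waiter a(1), b(2), c(3);
    q.Append(&a);    // q: a
    q.Append(&b);    // q: a -> b
    q.Prepend(&c);   // q: c -> a -> b
    // head() is what butex_wake() consumes first:
    // q.head()->value()->id == 3
}
```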
18 changes: 15 additions & 3 deletions src/butil/thread_local.h
@@ -32,31 +32,43 @@
 
 #define BAIDU_VOLATILE_THREAD_LOCAL(type, var_name, default_value) \
     BAIDU_THREAD_LOCAL type var_name = default_value; \
-    static __attribute__((noinline, unused)) type get_##var_name(void) { \
+    __attribute__((noinline, unused)) type get_##var_name(void) { \
         asm volatile(""); \
         return var_name; \
     } \
-    static __attribute__((noinline, unused)) type *get_ptr_##var_name(void) { \
+    __attribute__((noinline, unused)) type *get_ptr_##var_name(void) { \
         type *ptr = &var_name; \
         asm volatile("" : "+rm"(ptr)); \
         return ptr; \
     } \
-    static __attribute__((noinline, unused)) void set_##var_name(type v) { \
+    __attribute__((noinline, unused)) void set_##var_name(type v) { \
         asm volatile(""); \
         var_name = v; \
     }
 
+#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name) \
+    type get_##var_name(void); \
+    type *get_ptr_##var_name(void); \
+    void set_##var_name(type v)
+
 #if (defined (__aarch64__) && defined (__GNUC__)) || defined(__clang__)
 // GNU compiler under aarch and Clang compiler is incorrectly caching the
 // address of thread_local variables across a suspend-point. The following
 // macros used to disable the volatile thread local access optimization.
 #define BAIDU_GET_VOLATILE_THREAD_LOCAL(var_name) get_##var_name()
 #define BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(var_name) get_ptr_##var_name()
 #define BAIDU_SET_VOLATILE_THREAD_LOCAL(var_name, value) set_##var_name(value)
+
+#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name) \
+    type get_##var_name(void); \
+    type *get_ptr_##var_name(void); \
+    void set_##var_name(type v)
 #else
 #define BAIDU_GET_VOLATILE_THREAD_LOCAL(var_name) var_name
 #define BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(var_name) &##var_name
 #define BAIDU_SET_VOLATILE_THREAD_LOCAL(var_name, value) var_name = value
+#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name) \
+    extern BAIDU_THREAD_LOCAL type var_name
 #endif
 
 namespace butil {
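Dropping `static` gives the per-variable accessors external linkage, and the new EXTERN_ counterpart lets other translation units reach a variable defined with BAIDU_VOLATILE_THREAD_LOCAL; that is how mutex.cpp above now reads tls_task_group, which is defined in task_group.cpp. A sketch of the intended pairing (FooState and g_foo are invented names):

```cpp
#include "butil/thread_local.h"

struct FooState { int n; };

// One .cpp defines the TLS variable plus its noinline accessors:
BAIDU_VOLATILE_THREAD_LOCAL(FooState*, g_foo, NULL);

// Any other .cpp would instead declare them and go through the macros, so
// the compiler cannot cache the address across a bthread context switch:
// EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(FooState*, g_foo);

FooState* current_foo() {
    return BAIDU_GET_VOLATILE_THREAD_LOCAL(g_foo);
}

void set_current_foo(FooState* s) {
    BAIDU_SET_VOLATILE_THREAD_LOCAL(g_foo, s);
}
```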