diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index 2b7c78b8c4..b603d89c11 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -537,8 +537,14 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
     return erased;
 }
 
+struct WaitForButexArgs {
+    ButexBthreadWaiter* bw;
+    bool prepend;
+};
+
 static void wait_for_butex(void* arg) {
-    ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
+    auto args = static_cast<WaitForButexArgs*>(arg);
+    ButexBthreadWaiter* const bw = args->bw;
     Butex* const b = bw->initial_butex;
     // 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
     //    before they're queued, otherwise the waiter is already timedout
@@ -560,7 +566,11 @@ static void wait_for_butex(void* arg) {
         bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
     } else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
                !bw->task_meta->interrupted) {
-        b->waiters.Append(bw);
+        if (args->prepend) {
+            b->waiters.Prepend(bw);
+        } else {
+            b->waiters.Append(bw);
+        }
         bw->container.store(b, butil::memory_order_relaxed);
         if (bw->abstime != NULL) {
             bw->sleep_id = get_global_timer_thread()->schedule(
@@ -593,7 +603,7 @@ static void wait_for_butex(void* arg) {
 }
 
 static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
-                                   const timespec* abstime) {
+                                   const timespec* abstime, bool prepend) {
     TaskMeta* task = NULL;
     ButexPthreadWaiter pw;
     pw.tid = 0;
@@ -616,7 +626,11 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
         errno = EINTR;
         rc = -1;
     } else {
-        b->waiters.Append(&pw);
+        if (prepend) {
+            b->waiters.Prepend(&pw);
+        } else {
+            b->waiters.Append(&pw);
+        }
         pw.container.store(b, butil::memory_order_relaxed);
         b->waiter_lock.unlock();
 
@@ -646,7 +660,7 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
     return rc;
 }
 
-int butex_wait(void* arg, int expected_value, const timespec* abstime) {
+int butex_wait(void* arg, int expected_value, const timespec* abstime, bool prepend) {
     Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
     if (b->value.load(butil::memory_order_relaxed) != expected_value) {
         errno = EWOULDBLOCK;
@@ -657,7 +671,7 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
     }
     TaskGroup* g = tls_task_group;
     if (NULL == g || g->is_current_pthread_task()) {
-        return butex_wait_from_pthread(g, b, expected_value, abstime);
+        return butex_wait_from_pthread(g, b, expected_value, abstime, prepend);
     }
     ButexBthreadWaiter bbw;
     // tid is 0 iff the thread is non-bthread
@@ -690,7 +704,8 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
     // release fence matches with acquire fence in interrupt_and_consume_waiters
     // in task_group.cpp to guarantee visibility of `interrupted'.
     bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
-    g->set_remained(wait_for_butex, &bbw);
+    WaitForButexArgs args{ &bbw, prepend};
+    g->set_remained(wait_for_butex, &args);
     TaskGroup::sched(&g);
 
     // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
diff --git a/src/bthread/butex.h b/src/bthread/butex.h
index 93a1f6ece7..b40ec1e04b 100644
--- a/src/bthread/butex.h
+++ b/src/bthread/butex.h
@@ -67,8 +67,13 @@ int butex_requeue(void* butex1, void* butex2);
 //   abstime is not NULL.
 // About |abstime|:
 //   Different from FUTEX_WAIT, butex_wait uses absolute time.
+// About |prepend|:
+//   If |prepend| is true, queue the bthread at the head of the waiter queue,
+//   otherwise at the tail.
 // Returns 0 on success, -1 otherwise and errno is set.
-int butex_wait(void* butex, int expected_value, const timespec* abstime);
+int butex_wait(void* butex, int expected_value,
+               const timespec* abstime,
+               bool prepend = false);
 
 }  // namespace bthread
 
diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp
index f2606bb9e1..403f6bb8a8 100644
--- a/src/bthread/mutex.cpp
+++ b/src/bthread/mutex.cpp
@@ -39,17 +39,22 @@
 #include "butil/logging.h"
 #include "butil/object_pool.h"
 #include "butil/debug/stack_trace.h"
+#include "butil/thread_local.h"
 #include "bthread/butex.h"                       // butex_*
 #include "bthread/mutex.h"                       // bthread_mutex_t
 #include "bthread/sys_futex.h"
 #include "bthread/log.h"
-#include "butil/debug/stack_trace.h"
+#include "bthread/processor.h"
+#include "bthread/task_group.h"
 
 extern "C" {
 extern void* BAIDU_WEAK _dl_sym(void* handle, const char* symbol, void* caller);
 }
 
 namespace bthread {
+
+EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
+
 // Warm up backtrace before main().
 const butil::debug::StackTrace ALLOW_UNUSED dummy_bt;
 
@@ -772,29 +777,41 @@ const MutexInternal MUTEX_LOCKED_RAW = {{1},{0},0};
 BAIDU_CASSERT(sizeof(unsigned) == sizeof(MutexInternal),
               sizeof_mutex_internal_must_equal_unsigned);
 
-inline int mutex_lock_contended(bthread_mutex_t* m) {
-    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
-    while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
-        if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
-            errno != EWOULDBLOCK && errno != EINTR/*note*/) {
-            // a mutex lock should ignore interruptions in general since
-            // user code is unlikely to check the return value.
-            return errno;
+const int MAX_SPIN_ITER = 4;
+
+inline int mutex_lock_contended_impl(
+    bthread_mutex_t* m, const struct timespec* __restrict abstime) {
+    // When a bthread first contends for a lock, active spinning makes sense.
+    // Spin only a few times and only if the local `rq' is empty.
+    TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
+    if (BAIDU_UNLIKELY(NULL == g || g->rq_size() == 0)) {
+        for (int i = 0; i < MAX_SPIN_ITER; ++i) {
+            cpu_relax();
+        }
     }
-    return 0;
-}
-inline int mutex_timedlock_contended(
-    bthread_mutex_t* m, const struct timespec* __restrict abstime) {
-    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
+
+    bool queue_lifo = false;
+    bool first_wait = true;
+    auto whole = (butil::atomic<unsigned>*)m->butex;
     while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
-        if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime) < 0 &&
+        if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime, queue_lifo) < 0 &&
             errno != EWOULDBLOCK && errno != EINTR/*note*/) {
-            // a mutex lock should ignore interrruptions in general since
+            // A mutex lock should ignore interruptions in general since
             // user code is unlikely to check the return value.
             return errno;
         }
+        // Ignore EWOULDBLOCK and EINTR.
+        if (first_wait && 0 == errno) {
+            first_wait = false;
+        }
+        if (!first_wait) {
+            // Normally, bthreads are queued in FIFO order. But a woken-up
+            // bthread that competes with newly arriving bthreads for the
+            // mutex has a good chance of losing, because the new arrivals
+            // are already running on CPU and there can be many of them.
+            // In that case, to be fair and avoid starvation, queue it at
+            // the head of the waiter queue.
+            queue_lifo = true;
+        }
     }
     return 0;
 }
 
@@ -880,7 +897,7 @@ int bthread_mutex_trylock(bthread_mutex_t* m) {
 }
 
 int bthread_mutex_lock_contended(bthread_mutex_t* m) {
-    return bthread::mutex_lock_contended(m);
+    return bthread::mutex_lock_contended_impl(m, NULL);
 }
 
 int bthread_mutex_lock(bthread_mutex_t* m) {
@@ -890,18 +907,18 @@ int bthread_mutex_lock(bthread_mutex_t* m) {
     }
     // Don't sample when contention profiler is off.
     if (!bthread::g_cp) {
-        return bthread::mutex_lock_contended(m);
+        return bthread::mutex_lock_contended_impl(m, NULL);
     }
     // Ask Collector if this (contended) locking should be sampled.
     const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
     if (!sampling_range) { // Don't sample
-        return bthread::mutex_lock_contended(m);
+        return bthread::mutex_lock_contended_impl(m, NULL);
     }
     // Start sampling.
     const int64_t start_ns = butil::cpuwide_time_ns();
     // NOTE: Don't modify m->csite outside lock since multiple threads are
     // still contending with each other.
-    const int rc = bthread::mutex_lock_contended(m);
+    const int rc = bthread::mutex_lock_contended_impl(m, NULL);
     if (!rc) { // Inside lock
         m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
         m->csite.sampling_range = sampling_range;
@@ -917,18 +934,18 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
     }
     // Don't sample when contention profiler is off.
     if (!bthread::g_cp) {
-        return bthread::mutex_timedlock_contended(m, abstime);
+        return bthread::mutex_lock_contended_impl(m, abstime);
     }
     // Ask Collector if this (contended) locking should be sampled.
     const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
     if (!sampling_range) { // Don't sample
-        return bthread::mutex_timedlock_contended(m, abstime);
+        return bthread::mutex_lock_contended_impl(m, abstime);
     }
     // Start sampling.
     const int64_t start_ns = butil::cpuwide_time_ns();
     // NOTE: Don't modify m->csite outside lock since multiple threads are
     // still contending with each other.
-    const int rc = bthread::mutex_timedlock_contended(m, abstime);
+    const int rc = bthread::mutex_lock_contended_impl(m, abstime);
     if (!rc) { // Inside lock
         m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
         m->csite.sampling_range = sampling_range;
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index b71994a0f7..a19bd023f7 100644
--- a/src/bthread/task_group.h
+++ b/src/bthread/task_group.h
@@ -182,6 +182,11 @@ class TaskGroup {
     // process make go on indefinitely.
     void push_rq(bthread_t tid);
 
+    // Returns size of local run queue.
+    size_t rq_size() const {
+        return _rq.volatile_size();
+    }
+
     bthread_tag_t tag() const { return _tag; }
 
 private:
diff --git a/src/butil/containers/linked_list.h b/src/butil/containers/linked_list.h
index 7130c0469c..7874b65aaf 100644
--- a/src/butil/containers/linked_list.h
+++ b/src/butil/containers/linked_list.h
@@ -171,6 +171,11 @@ class LinkedList {
         e->InsertBefore(&root_);
     }
 
+    // Prepend |e| to the head of the linked list.
+    void Prepend(LinkNode<T>* e) {
+        e->InsertAfter(&root_);
+    }
+
     LinkNode<T>* head() const {
         return root_.next();
     }
diff --git a/src/butil/thread_local.h b/src/butil/thread_local.h
index a3cb1ff087..a67327c633 100644
--- a/src/butil/thread_local.h
+++ b/src/butil/thread_local.h
@@ -32,20 +32,25 @@
 #define BAIDU_VOLATILE_THREAD_LOCAL(type, var_name, default_value) \
     BAIDU_THREAD_LOCAL type var_name = default_value; \
-    static __attribute__((noinline, unused)) type get_##var_name(void) { \
+    __attribute__((noinline, unused)) type get_##var_name(void) { \
         asm volatile(""); \
         return var_name; \
     } \
-    static __attribute__((noinline, unused)) type *get_ptr_##var_name(void) { \
+    __attribute__((noinline, unused)) type *get_ptr_##var_name(void) { \
         type *ptr = &var_name; \
         asm volatile("" : "+rm"(ptr)); \
         return ptr; \
     } \
-    static __attribute__((noinline, unused)) void set_##var_name(type v) { \
+    __attribute__((noinline, unused)) void set_##var_name(type v) { \
         asm volatile(""); \
         var_name = v; \
     }
 
+#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name) \
+    type get_##var_name(void); \
+    type *get_ptr_##var_name(void); \
+    void set_##var_name(type v)
+
 #if (defined (__aarch64__) && defined (__GNUC__)) || defined(__clang__)
 // GNU compiler under aarch and Clang compiler is incorrectly caching the
 // address of thread_local variables across a suspend-point. The following
@@ -53,10 +58,17 @@
 #define BAIDU_GET_VOLATILE_THREAD_LOCAL(var_name) get_##var_name()
 #define BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(var_name) get_ptr_##var_name()
 #define BAIDU_SET_VOLATILE_THREAD_LOCAL(var_name, value) set_##var_name(value)
+
+#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name) \
+    type get_##var_name(void); \
+    type *get_ptr_##var_name(void); \
+    void set_##var_name(type v)
 #else
 #define BAIDU_GET_VOLATILE_THREAD_LOCAL(var_name) var_name
 #define BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(var_name) &##var_name
 #define BAIDU_SET_VOLATILE_THREAD_LOCAL(var_name, value) var_name = value
+#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name) \
+    extern BAIDU_THREAD_LOCAL type var_name
 #endif
 
 namespace butil {
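
Note (illustrative, not part of the patch): after this change both bthread_mutex_lock() and bthread_mutex_timedlock() funnel into mutex_lock_contended_impl(), so a timed lock takes the same spin-then-wait path, and a waiter that keeps losing wake-up races is re-queued at the head of the butex waiter list via the new prepend flag. A minimal usage sketch of the public API follows; it assumes the existing public headers <bthread/bthread.h> and <butil/time.h> and the helper butil::milliseconds_from_now(), and the 100 ms deadline is arbitrary.

    // usage_sketch.cpp -- illustrative only; link against brpc/bthread.
    #include <errno.h>
    #include <stdio.h>
    #include <bthread/bthread.h>   // bthread_mutex_* (public bthread API)
    #include <butil/time.h>        // butil::milliseconds_from_now()

    int main() {
        bthread_mutex_t m;
        bthread_mutex_init(&m, NULL);

        // Uncontended fast path: the lock is taken without touching the butex.
        bthread_mutex_lock(&m);

        // Contended timed path: since the mutex is already held, this call goes
        // through mutex_lock_contended_impl(m, abstime), i.e. it may spin up to
        // MAX_SPIN_ITER times and then butex_wait() with the absolute deadline.
        timespec abstime = butil::milliseconds_from_now(100);
        const int rc = bthread_mutex_timedlock(&m, &abstime);
        if (rc == ETIMEDOUT) {
            printf("timed lock expired while the mutex was held, as expected\n");
        }

        bthread_mutex_unlock(&m);
        bthread_mutex_destroy(&m);
        return 0;
    }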