Skip to content

Commit

Permalink
Support bthread rwlock
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Sep 1, 2024
1 parent 48996cf commit a4e41fc
Show file tree
Hide file tree
Showing 12 changed files with 1,026 additions and 67 deletions.
18 changes: 11 additions & 7 deletions src/bthread/bthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ extern int bthread_usleep(uint64_t microseconds);
// NOTE: mutexattr is not used in current mutex implementation. User shall
// always pass a NULL attribute.
extern int bthread_mutex_init(bthread_mutex_t* __restrict mutex,
const bthread_mutexattr_t* __restrict mutex_attr);
const bthread_mutexattr_t* __restrict attr);

// Destroy `mutex'.
extern int bthread_mutex_destroy(bthread_mutex_t* mutex);
Expand All @@ -188,6 +188,12 @@ extern int bthread_mutex_timedlock(bthread_mutex_t* __restrict mutex,
// Unlock `mutex'.
extern int bthread_mutex_unlock(bthread_mutex_t* mutex);

extern int bthread_mutexattr_init(bthread_mutexattr_t* attr);

extern int bthread_mutexattr_disable_csite(bthread_mutexattr_t* attr);

extern int bthread_mutexattr_destroy(bthread_mutexattr_t* attr);

// -----------------------------------------------
// Functions for handling conditional variables.
// -----------------------------------------------
Expand Down Expand Up @@ -241,9 +247,8 @@ extern int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock);
extern int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock);

// Try to acquire read lock for `rwlock' or return after specfied time.
extern int bthread_rwlock_timedrdlock(
bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime);
extern int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime);

// Acquire write lock for `rwlock'.
extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock);
Expand All @@ -252,9 +257,8 @@ extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock);
extern int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock);

// Try to acquire write lock for `rwlock' or return after specfied time.
extern int bthread_rwlock_timedwrlock(
bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime);
extern int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime);

// Unlock `rwlock'.
extern int bthread_rwlock_unlock(bthread_rwlock_t* rwlock);
Expand Down
37 changes: 28 additions & 9 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,14 @@ int butex_wake(void* arg, bool nosignal) {
return 1;
}

int butex_wake_all(void* arg, bool nosignal) {
int butex_wake_n(void* arg, size_t num, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

ButexWaiterList bthread_waiters;
ButexWaiterList pthread_waiters;
{
BAIDU_SCOPED_LOCK(b->waiter_lock);
while (!b->waiters.empty()) {
for (size_t i = 0; (num == 0 || i < num) && !b->waiters.empty(); ++i) {
ButexWaiter* bw = b->waiters.head()->value();
bw->RemoveFromList();
bw->container.store(NULL, butil::memory_order_relaxed);
Expand Down Expand Up @@ -393,6 +393,10 @@ int butex_wake_all(void* arg, bool nosignal) {
return nwakeup;
}

int butex_wake_all(void* arg, bool nosignal) {
return butex_wake_n(arg, 0, nosignal);
}

int butex_wake_except(void* arg, bthread_t excluded_bthread) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

Expand Down Expand Up @@ -537,8 +541,14 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
return erased;
}

struct WaitForButexArgs {
ButexBthreadWaiter* bw;
bool prepend;
};

static void wait_for_butex(void* arg) {
ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
auto args = static_cast<WaitForButexArgs*>(arg);
ButexBthreadWaiter* const bw = args->bw;
Butex* const b = bw->initial_butex;
// 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
// before they're queued, otherwise the waiter is already timedout
Expand All @@ -560,7 +570,11 @@ static void wait_for_butex(void* arg) {
bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
} else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
!bw->task_meta->interrupted) {
b->waiters.Append(bw);
if (args->prepend) {
b->waiters.Prepend(bw);
} else {
b->waiters.Append(bw);
}
bw->container.store(b, butil::memory_order_relaxed);
if (bw->abstime != NULL) {
bw->sleep_id = get_global_timer_thread()->schedule(
Expand Down Expand Up @@ -593,7 +607,7 @@ static void wait_for_butex(void* arg) {
}

static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
const timespec* abstime) {
const timespec* abstime, bool prepend) {
TaskMeta* task = NULL;
ButexPthreadWaiter pw;
pw.tid = 0;
Expand All @@ -616,7 +630,11 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
errno = EINTR;
rc = -1;
} else {
b->waiters.Append(&pw);
if (prepend) {
b->waiters.Prepend(&pw);
} else {
b->waiters.Append(&pw);
}
pw.container.store(b, butil::memory_order_relaxed);
b->waiter_lock.unlock();

Expand Down Expand Up @@ -646,7 +664,7 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
return rc;
}

int butex_wait(void* arg, int expected_value, const timespec* abstime) {
int butex_wait(void* arg, int expected_value, const timespec* abstime, bool prepend) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
if (b->value.load(butil::memory_order_relaxed) != expected_value) {
errno = EWOULDBLOCK;
Expand All @@ -657,7 +675,7 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
}
TaskGroup* g = tls_task_group;
if (NULL == g || g->is_current_pthread_task()) {
return butex_wait_from_pthread(g, b, expected_value, abstime);
return butex_wait_from_pthread(g, b, expected_value, abstime, prepend);
}
ButexBthreadWaiter bbw;
// tid is 0 iff the thread is non-bthread
Expand Down Expand Up @@ -690,7 +708,8 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
// release fence matches with acquire fence in interrupt_and_consume_waiters
// in task_group.cpp to guarantee visibility of `interrupted'.
bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
g->set_remained(wait_for_butex, &bbw);
WaitForButexArgs args{ &bbw, prepend};
g->set_remained(wait_for_butex, &args);
TaskGroup::sched(&g);

// erase_from_butex_and_wakeup (called by TimerThread) is possibly still
Expand Down
12 changes: 11 additions & 1 deletion src/bthread/butex.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ void butex_destroy(void* butex);
// Returns # of threads woken up.
int butex_wake(void* butex, bool nosignal = false);

// Wake up all threads waiting on |butex| if n is zero,
// Otherwise, wake up at most n thread waiting on |butex|.
// Returns # of threads woken up.
int butex_wake_n(void* butex, size_t num, bool nosignal = false);

// Wake up all threads waiting on |butex|.
// Returns # of threads woken up.
int butex_wake_all(void* butex, bool nosignal = false);
Expand All @@ -67,8 +72,13 @@ int butex_requeue(void* butex1, void* butex2);
// abstime is not NULL.
// About |abstime|:
// Different from FUTEX_WAIT, butex_wait uses absolute time.
// About |prepend|:
// If |prepend| is true, queue the bthread at the head of the queue,
// otherwise at the tail.
// Returns 0 on success, -1 otherwise and errno is set.
int butex_wait(void* butex, int expected_value, const timespec* abstime);
int butex_wait(void* butex, int expected_value,
const timespec* abstime,
bool prepend = false);

} // namespace bthread

Expand Down
85 changes: 44 additions & 41 deletions src/bthread/mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ namespace bthread {
const butil::debug::StackTrace ALLOW_UNUSED dummy_bt;

// For controlling contentions collected per second.
static bvar::CollectorSpeedLimit g_cp_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER;
bvar::CollectorSpeedLimit g_cp_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER;

const size_t MAX_CACHED_CONTENTIONS = 512;
// Skip frames which are always same: the unlock function and submit_contention()
Expand Down Expand Up @@ -262,7 +262,7 @@ void ContentionProfiler::flush_to_disk(bool ending) {

// If contention profiler is on, this variable will be set with a valid
// instance. NULL otherwise.
BAIDU_CACHELINE_ALIGNMENT static ContentionProfiler* g_cp = NULL;
BAIDU_CACHELINE_ALIGNMENT ContentionProfiler* g_cp = NULL;
// Need this version to solve an issue that non-empty entries left by
// previous contention profilers should be detected and overwritten.
static uint64_t g_cp_version = 0;
Expand Down Expand Up @@ -364,13 +364,15 @@ void ContentionProfilerStop() {
LOG(ERROR) << "Contention profiler is not started!";
}

BUTIL_FORCE_INLINE bool
is_contention_site_valid(const bthread_contention_site_t& cs) {
bool is_contention_site_valid(const bthread_contention_site_t& cs) {
return cs.sampling_range;
}

BUTIL_FORCE_INLINE void
make_contention_site_invalid(bthread_contention_site_t* cs) {
bool is_sampling_range_valid(size_t sampling_range) {
return sampling_range > 0;
}

void make_contention_site_invalid(bthread_contention_site_t* cs) {
cs->sampling_range = 0;
}

Expand Down Expand Up @@ -666,13 +668,13 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
MutexAndContentionSite& entry = fast_alt.list[fast_alt.count++];
entry.mutex = mutex;
csite = &entry.csite;
if (!sampling_range) {
if (!is_sampling_range_valid(sampling_range)) {
make_contention_site_invalid(&entry.csite);
return pthread_mutex_lock_internal(mutex);
}
}
#endif
if (!sampling_range) { // don't sample
if (!is_sampling_range_valid(sampling_range)) { // don't sample
return pthread_mutex_lock_internal(mutex);
}
// Lock and monitor the waiting time.
Expand Down Expand Up @@ -856,13 +858,14 @@ void FastPthreadMutex::unlock() {
extern "C" {

int bthread_mutex_init(bthread_mutex_t* __restrict m,
const bthread_mutexattr_t* __restrict) {
const bthread_mutexattr_t* __restrict attr) {
bthread::make_contention_site_invalid(&m->csite);
m->butex = bthread::butex_create_checked<unsigned>();
if (!m->butex) {
return ENOMEM;
}
*m->butex = 0;
m->enable_csite = NULL == attr ? true : attr->enable_csite;
return 0;
}

Expand All @@ -883,35 +886,9 @@ int bthread_mutex_lock_contended(bthread_mutex_t* m) {
return bthread::mutex_lock_contended(m);
}

int bthread_mutex_lock(bthread_mutex_t* m) {
bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
if (!split->locked.exchange(1, butil::memory_order_acquire)) {
return 0;
}
// Don't sample when contention profiler is off.
if (!bthread::g_cp) {
return bthread::mutex_lock_contended(m);
}
// Ask Collector if this (contended) locking should be sampled.
const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
if (!sampling_range) { // Don't sample
return bthread::mutex_lock_contended(m);
}
// Start sampling.
const int64_t start_ns = butil::cpuwide_time_ns();
// NOTE: Don't modify m->csite outside lock since multiple threads are
// still contending with each other.
const int rc = bthread::mutex_lock_contended(m);
if (!rc) { // Inside lock
m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
m->csite.sampling_range = sampling_range;
} // else rare
return rc;
}

int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
const struct timespec* __restrict abstime) {
bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
static int bthread_mutex_lock_impl(bthread_mutex_t* __restrict m,
const struct timespec* __restrict abstime) {
auto split = (bthread::MutexInternal*)m->butex;
if (!split->locked.exchange(1, butil::memory_order_acquire)) {
return 0;
}
Expand All @@ -920,7 +897,8 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
return bthread::mutex_timedlock_contended(m, abstime);
}
// Ask Collector if this (contended) locking should be sampled.
const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
const size_t sampling_range =
m->enable_csite ? bvar::is_collectable(&bthread::g_cp_sl) : 0;
if (!sampling_range) { // Don't sample
return bthread::mutex_timedlock_contended(m, abstime);
}
Expand All @@ -941,10 +919,20 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
return rc;
}

int bthread_mutex_lock(bthread_mutex_t* m) {
return bthread_mutex_lock_impl(m, NULL);
}

int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
const struct timespec* __restrict abstime) {
return bthread_mutex_lock_impl(m, abstime);
}

int bthread_mutex_unlock(bthread_mutex_t* m) {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
bthread_contention_site_t saved_csite = {0, 0};
if (bthread::is_contention_site_valid(m->csite)) {
bool is_valid = bthread::is_contention_site_valid(m->csite);
if (is_valid) {
saved_csite = m->csite;
bthread::make_contention_site_invalid(&m->csite);
}
Expand All @@ -954,7 +942,7 @@ int bthread_mutex_unlock(bthread_mutex_t* m) {
return 0;
}
// Wakeup one waiter
if (!bthread::is_contention_site_valid(saved_csite)) {
if (!is_valid) {
bthread::butex_wake(whole);
return 0;
}
Expand All @@ -966,6 +954,21 @@ int bthread_mutex_unlock(bthread_mutex_t* m) {
return 0;
}

int bthread_mutexattr_init(bthread_mutexattr_t* attr) {
attr->enable_csite = true;
return 0;
}

int bthread_mutexattr_disable_csite(bthread_mutexattr_t* attr) {
attr->enable_csite = false;
return 0;
}

int bthread_mutexattr_destroy(bthread_mutexattr_t* attr) {
attr->enable_csite = true;
return 0;
}

#ifndef NO_PTHREAD_MUTEX_HOOK
int pthread_mutex_lock(pthread_mutex_t* __mutex) {
return bthread::pthread_mutex_lock_impl(__mutex);
Expand Down
5 changes: 3 additions & 2 deletions src/bthread/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

__BEGIN_DECLS
extern int bthread_mutex_init(bthread_mutex_t* __restrict mutex,
const bthread_mutexattr_t* __restrict mutex_attr);
const bthread_mutexattr_t* __restrict attr);
extern int bthread_mutex_destroy(bthread_mutex_t* mutex);
extern int bthread_mutex_trylock(bthread_mutex_t* mutex);
extern int bthread_mutex_lock(bthread_mutex_t* mutex);
Expand All @@ -48,7 +48,8 @@ class Mutex {
Mutex() {
int ec = bthread_mutex_init(&_mutex, NULL);
if (ec != 0) {
throw std::system_error(std::error_code(ec, std::system_category()), "Mutex constructor failed");
throw std::system_error(std::error_code(ec, std::system_category()),
"Mutex constructor failed");
}
}
~Mutex() { CHECK_EQ(0, bthread_mutex_destroy(&_mutex)); }
Expand Down
Loading

0 comments on commit a4e41fc

Please sign in to comment.