Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support semaphore and rwlock for bthread #2752

Merged
merged 3 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 59 additions & 7 deletions src/bthread/bthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ extern int bthread_usleep(uint64_t microseconds);
// NOTE: mutexattr is not used in current mutex implementation. User shall
// always pass a NULL attribute.
extern int bthread_mutex_init(bthread_mutex_t* __restrict mutex,
const bthread_mutexattr_t* __restrict mutex_attr);
const bthread_mutexattr_t* __restrict attr);

// Destroy `mutex'.
extern int bthread_mutex_destroy(bthread_mutex_t* mutex);
Expand All @@ -188,6 +188,13 @@ extern int bthread_mutex_timedlock(bthread_mutex_t* __restrict mutex,
// Unlock `mutex'.
extern int bthread_mutex_unlock(bthread_mutex_t* mutex);

extern int bthread_mutexattr_init(bthread_mutexattr_t* attr);

// Disable the contention profile of the mutex.
extern int bthread_mutexattr_disable_csite(bthread_mutexattr_t* attr);

extern int bthread_mutexattr_destroy(bthread_mutexattr_t* attr);

// -----------------------------------------------
// Functions for handling conditional variables.
// -----------------------------------------------
Expand Down Expand Up @@ -241,9 +248,8 @@ extern int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock);
extern int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock);

// Try to acquire read lock for `rwlock' or return after specfied time.
extern int bthread_rwlock_timedrdlock(
bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime);
extern int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime);

// Acquire write lock for `rwlock'.
extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock);
Expand All @@ -252,9 +258,8 @@ extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock);
extern int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock);

// Try to acquire write lock for `rwlock' or return after specfied time.
extern int bthread_rwlock_timedwrlock(
bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime);
extern int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime);

// Unlock `rwlock'.
extern int bthread_rwlock_unlock(bthread_rwlock_t* rwlock);
Expand All @@ -277,6 +282,53 @@ extern int bthread_rwlockattr_getkind_np(const bthread_rwlockattr_t* attr,
extern int bthread_rwlockattr_setkind_np(bthread_rwlockattr_t* attr,
int pref);

// -------------------------------------------
// Functions for handling semaphore.
// -------------------------------------------

// Initialize the semaphore referred to by `sem'. The value of the
// initialized semaphore shall be `value'.
// Return 0 on success, errno otherwise.
extern int bthread_sem_init(bthread_sem_t* sem, unsigned value);

// Disable the contention profile of the semaphore referred to by `sem'.
extern int bthread_sem_disable_csite(bthread_sem_t* sem);

// Destroy the semaphore indicated by `sem'.
// Return 0 on success, errno otherwise.
extern int bthread_sem_destroy(bthread_sem_t* semaphore);

// Lock the semaphore referenced by `sem' by performing a semaphore
// lock operation on that semaphore. If the semaphore value is currently
// zero, then the calling (b)thread shall not return from the call to
// bthread_sema_wait() function until it locks the semaphore.
// Return 0 on success, errno otherwise.
extern int bthread_sem_wait(bthread_sem_t* sem);

// Lock the semaphore referenced by `sem' as in the bthread_sem_wait()
// function. However, if the semaphore cannot be locked without waiting
// for another (b)thread to unlock the semaphore by performing a
// bthread_sem_post() function, this wait shall be terminated when the
// specified timeout expires.
// Return 0 on success, errno otherwise.
extern int bthread_sem_timedwait(bthread_sem_t* sem, const struct timespec* abstime);

// Lock the semaphore referenced by `sem' only if the semaphore is
// currently not locked; that is, if the semaphore value is currently
// positive. Otherwise, it shall not lock the semaphore.
// Return 0 on success, errno otherwise.
extern int bthread_sem_trywait(bthread_sem_t* sem);

// Unlock the semaphore referenced by `sem' by performing
// a semaphore unlock operation on that semaphore.
// Return 0 on success, errno otherwise.
extern int bthread_sem_post(bthread_sem_t* sem);

// Unlock the semaphore referenced by `sem' by performing
// `n' semaphore unlock operation on that semaphore.
// Return 0 on success, errno otherwise.
extern int bthread_sem_post_n(bthread_sem_t* sem, size_t n);


// ----------------------------------------------------------------------
// Functions for handling barrier which is a new feature in 1003.1j-2000.
Expand Down
8 changes: 6 additions & 2 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,14 @@ int butex_wake(void* arg, bool nosignal) {
return 1;
}

int butex_wake_all(void* arg, bool nosignal) {
int butex_wake_n(void* arg, size_t n, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

ButexWaiterList bthread_waiters;
ButexWaiterList pthread_waiters;
{
BAIDU_SCOPED_LOCK(b->waiter_lock);
while (!b->waiters.empty()) {
for (size_t i = 0; (n == 0 || i < n) && !b->waiters.empty(); ++i) {
ButexWaiter* bw = b->waiters.head()->value();
bw->RemoveFromList();
bw->container.store(NULL, butil::memory_order_relaxed);
Expand Down Expand Up @@ -393,6 +393,10 @@ int butex_wake_all(void* arg, bool nosignal) {
return nwakeup;
}

int butex_wake_all(void* arg, bool nosignal) {
return butex_wake_n(arg, 0, nosignal);
}

int butex_wake_except(void* arg, bthread_t excluded_bthread) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

Expand Down
5 changes: 5 additions & 0 deletions src/bthread/butex.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ void butex_destroy(void* butex);
// Returns # of threads woken up.
int butex_wake(void* butex, bool nosignal = false);

// Wake up all threads waiting on |butex| if n is zero,
// Otherwise, wake up at most n thread waiting on |butex|.
// Returns # of threads woken up.
int butex_wake_n(void* butex, size_t n, bool nosignal = false);

// Wake up all threads waiting on |butex|.
// Returns # of threads woken up.
int butex_wake_all(void* butex, bool nosignal = false);
Expand Down
87 changes: 43 additions & 44 deletions src/bthread/mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
const butil::debug::StackTrace ALLOW_UNUSED dummy_bt;

// For controlling contentions collected per second.
static bvar::CollectorSpeedLimit g_cp_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER;
bvar::CollectorSpeedLimit g_cp_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER;

const size_t MAX_CACHED_CONTENTIONS = 512;
// Skip frames which are always same: the unlock function and submit_contention()
Expand Down Expand Up @@ -267,7 +267,7 @@ void ContentionProfiler::flush_to_disk(bool ending) {

// If contention profiler is on, this variable will be set with a valid
// instance. NULL otherwise.
BAIDU_CACHELINE_ALIGNMENT static ContentionProfiler* g_cp = NULL;
BAIDU_CACHELINE_ALIGNMENT ContentionProfiler* g_cp = NULL;
// Need this version to solve an issue that non-empty entries left by
// previous contention profilers should be detected and overwritten.
static uint64_t g_cp_version = 0;
Expand Down Expand Up @@ -369,13 +369,11 @@ void ContentionProfilerStop() {
LOG(ERROR) << "Contention profiler is not started!";
}

BUTIL_FORCE_INLINE bool
is_contention_site_valid(const bthread_contention_site_t& cs) {
return cs.sampling_range;
bool is_contention_site_valid(const bthread_contention_site_t& cs) {
return bvar::is_sampling_range_valid(cs.sampling_range);
}

BUTIL_FORCE_INLINE void
make_contention_site_invalid(bthread_contention_site_t* cs) {
void make_contention_site_invalid(bthread_contention_site_t* cs) {
cs->sampling_range = 0;
}

Expand Down Expand Up @@ -671,13 +669,13 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
MutexAndContentionSite& entry = fast_alt.list[fast_alt.count++];
entry.mutex = mutex;
csite = &entry.csite;
if (!sampling_range) {
if (!bvar::is_sampling_range_valid(sampling_range)) {
make_contention_site_invalid(&entry.csite);
return pthread_mutex_lock_internal(mutex);
}
}
#endif
if (!sampling_range) { // don't sample
if (!bvar::is_sampling_range_valid(sampling_range)) { // don't sample
return pthread_mutex_lock_internal(mutex);
}
// Lock and monitor the waiting time.
Expand Down Expand Up @@ -873,13 +871,14 @@ void FastPthreadMutex::unlock() {
extern "C" {

int bthread_mutex_init(bthread_mutex_t* __restrict m,
const bthread_mutexattr_t* __restrict) {
const bthread_mutexattr_t* __restrict attr) {
bthread::make_contention_site_invalid(&m->csite);
m->butex = bthread::butex_create_checked<unsigned>();
if (!m->butex) {
return ENOMEM;
}
*m->butex = 0;
m->enable_csite = NULL == attr ? true : attr->enable_csite;
return 0;
}

Expand All @@ -900,35 +899,9 @@ int bthread_mutex_lock_contended(bthread_mutex_t* m) {
return bthread::mutex_lock_contended_impl(m, NULL);
}

int bthread_mutex_lock(bthread_mutex_t* m) {
bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
if (!split->locked.exchange(1, butil::memory_order_acquire)) {
return 0;
}
// Don't sample when contention profiler is off.
if (!bthread::g_cp) {
return bthread::mutex_lock_contended_impl(m, NULL);
}
// Ask Collector if this (contended) locking should be sampled.
const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
if (!sampling_range) { // Don't sample
return bthread::mutex_lock_contended_impl(m, NULL);
}
// Start sampling.
const int64_t start_ns = butil::cpuwide_time_ns();
// NOTE: Don't modify m->csite outside lock since multiple threads are
// still contending with each other.
const int rc = bthread::mutex_lock_contended_impl(m, NULL);
if (!rc) { // Inside lock
m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
m->csite.sampling_range = sampling_range;
} // else rare
return rc;
}

int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
const struct timespec* __restrict abstime) {
bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
static int bthread_mutex_lock_impl(bthread_mutex_t* __restrict m,
const struct timespec* __restrict abstime) {
auto split = (bthread::MutexInternal*)m->butex;
if (!split->locked.exchange(1, butil::memory_order_acquire)) {
return 0;
}
Expand All @@ -937,8 +910,9 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
return bthread::mutex_lock_contended_impl(m, abstime);
}
// Ask Collector if this (contended) locking should be sampled.
const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
if (!sampling_range) { // Don't sample
const size_t sampling_range =
m->enable_csite ? bvar::is_collectable(&bthread::g_cp_sl) : bvar::INVALID_SAMPLING_RANGE;
if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample
return bthread::mutex_lock_contended_impl(m, abstime);
}
// Start sampling.
Expand All @@ -958,10 +932,20 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
return rc;
}

int bthread_mutex_lock(bthread_mutex_t* m) {
return bthread_mutex_lock_impl(m, NULL);
}

int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
const struct timespec* __restrict abstime) {
return bthread_mutex_lock_impl(m, abstime);
}

int bthread_mutex_unlock(bthread_mutex_t* m) {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
auto whole = (butil::atomic<unsigned>*)m->butex;
bthread_contention_site_t saved_csite = {0, 0};
if (bthread::is_contention_site_valid(m->csite)) {
bool is_valid = bthread::is_contention_site_valid(m->csite);
if (is_valid) {
saved_csite = m->csite;
bthread::make_contention_site_invalid(&m->csite);
}
Expand All @@ -971,7 +955,7 @@ int bthread_mutex_unlock(bthread_mutex_t* m) {
return 0;
}
// Wakeup one waiter
if (!bthread::is_contention_site_valid(saved_csite)) {
if (!is_valid) {
bthread::butex_wake(whole);
return 0;
}
Expand All @@ -983,6 +967,21 @@ int bthread_mutex_unlock(bthread_mutex_t* m) {
return 0;
}

int bthread_mutexattr_init(bthread_mutexattr_t* attr) {
attr->enable_csite = true;
return 0;
}

int bthread_mutexattr_disable_csite(bthread_mutexattr_t* attr) {
attr->enable_csite = false;
return 0;
}

int bthread_mutexattr_destroy(bthread_mutexattr_t* attr) {
attr->enable_csite = true;
return 0;
}

#ifndef NO_PTHREAD_MUTEX_HOOK
int pthread_mutex_lock(pthread_mutex_t* __mutex) {
return bthread::pthread_mutex_lock_impl(__mutex);
Expand Down
14 changes: 8 additions & 6 deletions src/bthread/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

__BEGIN_DECLS
extern int bthread_mutex_init(bthread_mutex_t* __restrict mutex,
const bthread_mutexattr_t* __restrict mutex_attr);
const bthread_mutexattr_t* __restrict attr);
extern int bthread_mutex_destroy(bthread_mutex_t* mutex);
extern int bthread_mutex_trylock(bthread_mutex_t* mutex);
extern int bthread_mutex_lock(bthread_mutex_t* mutex);
Expand All @@ -48,19 +48,21 @@ class Mutex {
Mutex() {
int ec = bthread_mutex_init(&_mutex, NULL);
if (ec != 0) {
throw std::system_error(std::error_code(ec, std::system_category()), "Mutex constructor failed");
throw std::system_error(std::error_code(ec, std::system_category()),
"Mutex constructor failed");
}
}
~Mutex() { CHECK_EQ(0, bthread_mutex_destroy(&_mutex)); }
native_handler_type native_handler() { return &_mutex; }
void lock() {
int ec = bthread_mutex_lock(&_mutex);
if (ec != 0) {
throw std::system_error(std::error_code(ec, std::system_category()), "Mutex lock failed");
throw std::system_error(std::error_code(ec, std::system_category()),
"Mutex lock failed");
}
}
void unlock() { bthread_mutex_unlock(&_mutex); }
bool try_lock() { return !bthread_mutex_trylock(&_mutex); }
void unlock() { (bthread_mutex_unlock(&_mutex)); }
bool try_lock() { return 0 == bthread_mutex_trylock(&_mutex); }
// TODO(chenzhangyi01): Complement interfaces for C++11
private:
DISALLOW_COPY_AND_ASSIGN(Mutex);
Expand Down Expand Up @@ -107,7 +109,7 @@ namespace std {

template <> class lock_guard<bthread_mutex_t> {
public:
explicit lock_guard(bthread_mutex_t & mutex) : _pmutex(&mutex) {
explicit lock_guard(bthread_mutex_t& mutex) : _pmutex(&mutex) {
#if !defined(NDEBUG)
const int rc = bthread_mutex_lock(_pmutex);
if (rc) {
Expand Down
Loading
Loading