Skip to content

Commit

Permalink
implement rwlock in bthread.
Browse files Browse the repository at this point in the history
Signed-off-by: Ketor <d.ketor@gmail.com>
  • Loading branch information
ketor committed Aug 27, 2024
1 parent d3c6854 commit 80e30ab
Show file tree
Hide file tree
Showing 4 changed files with 530 additions and 0 deletions.
172 changes: 172 additions & 0 deletions src/bthread/rwlock.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// bthread - An M:N threading library to make applications more concurrent.

// Date: Tue August 10 23:50:50 CST 2024

#include <dlfcn.h> // dlsym
#include <execinfo.h>
#include <fcntl.h> // O_RDONLY
#include <pthread.h>

#include "bthread/bthread.h"
#include "bthread/butex.h" // butex_*
#include "bthread/log.h"
#include "bthread/processor.h" // cpu_relax, barrier
#include "bthread/sys_futex.h"
#include "butil/atomicops.h"
#include "butil/containers/flat_map.h"
#include "butil/fd_guard.h"
#include "butil/file_util.h"
#include "butil/files/file.h"
#include "butil/files/file_path.h"
#include "butil/iobuf.h"
#include "butil/logging.h"
#include "butil/macros.h" // BAIDU_CASSERT
#include "butil/object_pool.h"
#include "butil/third_party/murmurhash3/murmurhash3.h"
#include "butil/unique_ptr.h"
#include "bvar/bvar.h"
#include "bvar/collector.h"

namespace bthread {

inline int rwlock_unrlock(bthread_rwlock_t* rwlock) {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)rwlock->lock_flag;

while (true) {
unsigned r = whole->load();
if (r == 0 || (r >> 31) != 0) {
LOG(ERROR) << "wrong unrlock!";
return 0;
}
if (!(whole->compare_exchange_weak(r, r - 1))) {
continue;
}
// wake up write waiter
bthread::butex_wake(whole);
return 0;
}
}

inline int rwlock_unwlock(bthread_rwlock_t* rwlock) {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)rwlock->lock_flag;

while (true) {
unsigned r = whole->load();
if (r != (unsigned)(1 << 31)) {
LOG(ERROR) << "wrong unwlock!";
return 0;
}
if (!whole->compare_exchange_weak(r, 0)) {
continue;
}
// wake up write waiter first
bthread::butex_wake(whole);
butil::atomic<unsigned>* w_wait_count = (butil::atomic<unsigned>*)rwlock->w_wait_count;
// try reduce wait_count for read waiters,and wake up read waiters
w_wait_count->fetch_sub(1);
bthread::butex_wake_all(w_wait_count);
return 0;
}
}

inline int rwlock_unlock(bthread_rwlock_t* rwlock) {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)rwlock->lock_flag;
if ((whole->load(butil::memory_order_relaxed) >> 31) != 0) {
return rwlock_unwlock(rwlock);
} else {
return rwlock_unrlock(rwlock);
}
}

inline int rwlock_rlock(bthread_rwlock_t* rwlock) {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)rwlock->lock_flag;

butil::atomic<unsigned>* w_wait_count = (butil::atomic<unsigned>*)rwlock->w_wait_count;
while (true) {
unsigned w = w_wait_count->load();
if (w > 0) {
if (bthread::butex_wait(w_wait_count, w, NULL) < 0 && errno != EWOULDBLOCK && errno != EINTR) {
return errno;
}
continue;
}
// FIXME!! we don't consider read_wait_count overflow yet,2^31 should be enough here
unsigned r = whole->load();
if ((r >> 31) == 0) {
if (whole->compare_exchange_weak(r, r + 1)) {
return 0;
}
}
}
}

inline int rwlock_wlock(bthread_rwlock_t* rwlock) {
butil::atomic<unsigned>* w_wait_count = (butil::atomic<unsigned>*)rwlock->w_wait_count;
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)rwlock->lock_flag;
// we don't consider w_wait_count overflow yet,2^32 should be enough here
w_wait_count->fetch_add(1);
while (true) {
unsigned r = whole->load();
if (r != 0) {
if (bthread::butex_wait(whole, r, NULL) < 0 && errno != EWOULDBLOCK && errno != EINTR) {
whole->fetch_sub(1);
return errno;
}
continue;
}
if (whole->compare_exchange_weak(r, (unsigned)(1 << 31))) {
return 0;
}
}
}

} // namespace bthread

extern "C" {

int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock, const bthread_rwlockattr_t* __restrict attr) {
rwlock->w_wait_count = bthread::butex_create_checked<unsigned>();
rwlock->lock_flag = bthread::butex_create_checked<unsigned>();
if (!rwlock->w_wait_count || !rwlock->lock_flag) {
LOG(ERROR) << "no memory";
return ENOMEM;
}
*rwlock->w_wait_count = 0;
*rwlock->lock_flag = 0;
return 0;
}

int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) {
bthread::butex_destroy(rwlock->w_wait_count);
bthread::butex_destroy(rwlock->lock_flag);
return 0;
}

int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_rlock(rwlock); }

int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_wlock(rwlock); }

int bthread_rwlock_unrlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unrlock(rwlock); }

int bthread_rwlock_unwlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unwlock(rwlock); }

int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unlock(rwlock); }

}
144 changes: 144 additions & 0 deletions src/bthread/rwlock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// bthread - An M:N threading library to make applications more concurrent.

// Date: Tue August 10 23:50:50 CST 2024

#ifndef BTHREAD_RW_MUTEX_H
#define BTHREAD_RW_MUTEX_H

#include "bthread/bthread.h"
#include "bthread/types.h"
#include "butil/scoped_lock.h"
#include "bvar/utils/lock_timer.h"

__BEGIN_DECLS
// -------------------------------------------
// Functions for handling read-write locks.
// -------------------------------------------

// Initialize read-write lock `rwlock' using attributes `attr', or use
// the default values if later is NULL.
extern int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock, const bthread_rwlockattr_t* __restrict attr);

// Destroy read-write lock `rwlock'.
extern int bthread_rwlock_destroy(bthread_rwlock_t* rwlock);

// Acquire read lock for `rwlock'.
extern int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock);

// Try to acquire read lock for `rwlock'.
extern int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock);

// Try to acquire read lock for `rwlock' or return after specfied time.
extern int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock, const struct timespec* __restrict abstime);

// Acquire write lock for `rwlock'.
extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock);

// Try to acquire write lock for `rwlock'.
extern int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock);

// Try to acquire write lock for `rwlock' or return after specfied time.
extern int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock, const struct timespec* __restrict abstime);

// Unlock `rwlock'.
extern int bthread_rwlock_unlock(bthread_rwlock_t* rwlock);

// ---------------------------------------------------
// Functions for handling read-write lock attributes.
// ---------------------------------------------------

// Initialize attribute object `attr' with default values.
extern int bthread_rwlockattr_init(bthread_rwlockattr_t* attr);

// Destroy attribute object `attr'.
extern int bthread_rwlockattr_destroy(bthread_rwlockattr_t* attr);

// Return current setting of reader/writer preference.
extern int bthread_rwlockattr_getkind_np(const bthread_rwlockattr_t* attr, int* pref);

// Set reader/write preference.
extern int bthread_rwlockattr_setkind_np(bthread_rwlockattr_t* attr, int pref);
__END_DECLS

// Specialize std::lock_guard and std::unique_lock for bthread_rwlock_t

namespace bthread {

class wlock_guard {
public:
explicit wlock_guard(bthread_rwlock_t& mutex) : _pmutex(&mutex) {
#if !defined(NDEBUG)
const int rc = bthread_rwlock_wrlock(_pmutex);
if (rc) {
LOG(FATAL) << "Fail to lock bthread_rwlock_t=" << _pmutex << ", " << berror(rc);
_pmutex = NULL;
}
#else
bthread_rwlock_wrlock(_pmutex);
#endif // NDEBUG
}

~wlock_guard() {
#ifndef NDEBUG
if (_pmutex) {
bthread_rwlock_unlock(_pmutex);
}
#else
bthread_rwlock_unlock(_pmutex);
#endif
}

private:
DISALLOW_COPY_AND_ASSIGN(wlock_guard);
bthread_rwlock_t* _pmutex;
};

class rlock_guard {
public:
explicit rlock_guard(bthread_rwlock_t& mutex) : _pmutex(&mutex) {
#if !defined(NDEBUG)
const int rc = bthread_rwlock_rdlock(_pmutex);
if (rc) {
LOG(FATAL) << "Fail to lock bthread_rwlock_t=" << _pmutex << ", " << berror(rc);
_pmutex = NULL;
}
#else
bthread_rwlock_rdlock(_pmutex);
#endif // NDEBUG
}

~rlock_guard() {
#ifndef NDEBUG
if (_pmutex) {
bthread_rwlock_unlock(_pmutex);
}
#else
bthread_rwlock_unlock(_pmutex);
#endif
}

private:
DISALLOW_COPY_AND_ASSIGN(rlock_guard);
bthread_rwlock_t* _pmutex;
};

} // namespace bthread

#endif
3 changes: 3 additions & 0 deletions src/bthread/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ typedef struct {
} bthread_condattr_t;

typedef struct {
bthread_mutex_t* m;
unsigned* w_wait_count; // include the bthread who holding wlock yet
unsigned* lock_flag; // highest bit 1 for wlocked, low 31 bit for read lock
} bthread_rwlock_t;

typedef struct {
Expand Down
Loading

0 comments on commit 80e30ab

Please sign in to comment.