Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace RwLock by a futex based one on Linux. #95762

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions library/std/src/sys/unix/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option<Duration>) {
}

#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn futex_wake(futex: &AtomicI32) {
pub fn futex_wake(futex: &AtomicI32) -> bool {
unsafe {
libc::syscall(
libc::SYS_futex,
futex as *const AtomicI32,
libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG,
1,
);
) > 0
}
}

Expand All @@ -93,12 +93,10 @@ pub fn futex_wake_all(futex: &AtomicI32) {
}

#[cfg(target_os = "emscripten")]
pub fn futex_wake(futex: &AtomicI32) {
pub fn futex_wake(futex: &AtomicI32) -> bool {
extern "C" {
fn emscripten_futex_wake(addr: *const AtomicI32, count: libc::c_int) -> libc::c_int;
}

unsafe {
emscripten_futex_wake(futex as *const AtomicI32, 1);
}
unsafe { emscripten_futex_wake(futex as *const AtomicI32, 1) > 0 }
}
264 changes: 264 additions & 0 deletions library/std/src/sys/unix/locks/futex_rwlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
use crate::sync::atomic::{
AtomicI32,
Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};

pub type MovableRwLock = RwLock;

pub struct RwLock {
// The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
// All bits of the reader counter set means write locked.
// A reader count of zero means the lock is unlocked.
// See the constants below.
// Readers wait on this futex.
state: AtomicI32,
// The 'condition variable' to notify writers through.
// Incremented on every signal.
// Writers wait on this futex.
writer_notify: AtomicI32,
}

const READ_LOCKED: i32 = 1;
const MAX_READERS: i32 = (1 << 30) - 2;
const WRITE_LOCKED: i32 = (1 << 30) - 1;
const READERS_WAITING: i32 = 1 << 30;
const WRITERS_WAITING: i32 = 1 << 31;

fn readers(state: i32) -> i32 {
state & !(READERS_WAITING + WRITERS_WAITING)
}

fn readers_waiting(state: i32) -> bool {
state & READERS_WAITING != 0
}

fn writers_waiting(state: i32) -> bool {
state & WRITERS_WAITING != 0
}

fn read_lockable(state: i32) -> bool {
readers(state) < MAX_READERS && !readers_waiting(state) && !writers_waiting(state)
}

impl RwLock {
#[inline]
pub const fn new() -> Self {
Self { state: AtomicI32::new(0), writer_notify: AtomicI32::new(0) }
}

#[inline]
pub unsafe fn destroy(&self) {}

#[inline]
pub unsafe fn try_read(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| read_lockable(s).then(|| s + READ_LOCKED))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the reader count overflows?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read_lockable() checks for MAX_READERS which looks like its writer value minus one. If the reader count would overflow into a writer, it looks like that would be detected by read_lockable() and short circuit to None.

.is_ok()
}

#[inline]
pub unsafe fn read(&self) {
if !self.try_read() {
self.read_contended();
}
}

#[inline]
pub unsafe fn try_write(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| (readers(s) == 0).then(|| s + WRITE_LOCKED))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this correctly handles the case where there is an existing write lock but no waiting threads or readers.

Copy link

@kprotty kprotty Apr 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The readers() function looks like it extracts the readers count, meaning the value doesn't consider existing waiting threads. Having a writer locked means the readers count would be the maximum value so the .then() clause would short-circuit early and return None. The use of + over | is a bit misleading, but that just transitions from 0 readers to max readers (a.k.a writer).

.is_ok()
}

#[inline]
pub unsafe fn write(&self) {
if !self.try_write() {
self.write_contended();
}
}

#[inline]
pub unsafe fn read_unlock(&self) {
let s = self.state.fetch_sub(READ_LOCKED, Release);

// It's impossible for readers to be waiting if it was read locked.
debug_assert!(!readers_waiting(s));

// Wake up a writer if we were the last reader and there's a writer waiting.
if s == READ_LOCKED + WRITERS_WAITING {
self.wake_after_read_unlock();
}
}

#[inline]
pub unsafe fn write_unlock(&self) {
if let Err(e) = self.state.compare_exchange(WRITE_LOCKED, 0, Release, Relaxed) {
// Readers or writers (or both) are waiting.
self.write_unlock_contended(e);
}
}

#[cold]
fn read_contended(&self) {
let mut state = self.spin_read();

loop {
// If we can lock it, lock it.
if read_lockable(state) {
match self.state.compare_exchange(state, state + READ_LOCKED, Acquire, Relaxed) {
Ok(_) => return, // Locked!
Err(s) => {
state = s;
continue;
}
}
}

// Check for overflow.
if readers(state) == MAX_READERS {
panic!("too many active read locks on RwLock");
}

// Make sure the readers waiting bit is set before we go to sleep.
if !readers_waiting(state) {
if let Err(s) =
self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}

// Wait for the state to change.
futex_wait(&self.state, state | READERS_WAITING, None);

// Spin again after waking up.
state = self.spin_read();
}
}

#[cold]
fn write_contended(&self) {
let mut state = self.spin_write();

loop {
// If it's unlocked, we try to lock it.
if readers(state) == 0 {
match self.state.compare_exchange(
state,
state | WRITE_LOCKED | WRITERS_WAITING, // Other threads might be waiting.
Acquire,
Relaxed,
) {
Ok(_) => return, // Locked!
Err(s) => {
state = s;
continue;
}
}
}

// Set the waiting bit indicating that we're waiting on it.
if !writers_waiting(state) {
if let Err(s) =
self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}

// Examine the notification counter before we check if `state` has changed,
// to make sure we don't miss any notifications.
let seq = self.writer_notify.load(Acquire);

// Don't go to sleep if the lock has become available, or the
// writers waiting bit is no longer set.
let s = self.state.load(Relaxed);
if readers(s) == 0 || !writers_waiting(s) {
state = s;
continue;
}

// Wait for the state to change.
futex_wait(&self.writer_notify, seq, None);

// Spin again after waking up.
state = self.spin_write();
}
}

#[cold]
fn wake_after_read_unlock(&self) {
// If this compare_exchange fails, another writer already locked, which
// will take care of waking up the next waiting writer.
if self.state.compare_exchange(WRITERS_WAITING, 0, Relaxed, Relaxed).is_ok() {
self.writer_notify.fetch_add(1, Release);
futex_wake(&self.writer_notify);
}
}

#[cold]
fn write_unlock_contended(&self, mut state: i32) {
// If there are any waiting writers _or_ waiting readers, but not both (!),
// we turn off that bit while unlocking.
if readers_waiting(state) != writers_waiting(state) {
if self.state.compare_exchange(state, 0, Release, Relaxed).is_err() {
// The only way this can fail is if the other waiting bit was set too.
state |= READERS_WAITING | WRITERS_WAITING;
}
}

// If both readers and writers are waiting, unlock but leave the readers waiting.
if readers_waiting(state) && writers_waiting(state) {
self.state.store(READERS_WAITING, Release);
}

if writers_waiting(state) {
// Notify one writer, if any writer was waiting.
self.writer_notify.fetch_add(1, Release);
if !futex_wake(&self.writer_notify) {
// If there was no writer to wake up, maybe there's readers to wake up instead.
if readers_waiting(state) {
// If this compare_exchange fails, another writer already locked, which
// will take care of waking up the next waiting writer.
if self.state.compare_exchange(READERS_WAITING, 0, Relaxed, Relaxed).is_ok() {
futex_wake_all(&self.state);
}
}
}
} else if readers_waiting(state) {
// Notify all readers, if any reader was waiting.
futex_wake_all(&self.state);
}
}

/// Spin for a while, but stop directly at the given condition.
fn spin_until(&self, f: impl Fn(i32) -> bool) -> i32 {
let mut spin = 100; // Chosen by fair dice roll.
loop {
let state = self.state.load(Relaxed);
if f(state) || spin == 0 {
return state;
}
crate::hint::spin_loop();
spin -= 1;
}
}

fn spin_write(&self) -> i32 {
self.spin_until(|state| {
// Stop spinning when we can lock it, or when there's waiting
// writers, to keep things somewhat fair.
readers(state) == 0 || writers_waiting(state)
})
}

fn spin_read(&self) -> i32 {
self.spin_until(|state| {
// Stop spinning when it's unlocked or read locked, or when there's waiting threads.
readers(state) != WRITE_LOCKED || readers_waiting(state) || writers_waiting(state)
})
}
}
4 changes: 2 additions & 2 deletions library/std/src/sys/unix/locks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ cfg_if::cfg_if! {
target_os = "android",
))] {
mod futex;
mod futex_rwlock;
#[allow(dead_code)]
mod pthread_mutex; // Only used for PthreadMutexAttr, needed by pthread_remutex.
mod pthread_remutex; // FIXME: Implement this using a futex
mod pthread_rwlock; // FIXME: Implement this using a futex
pub use futex::{Mutex, MovableMutex, Condvar, MovableCondvar};
pub use pthread_remutex::ReentrantMutex;
pub use pthread_rwlock::{RwLock, MovableRwLock};
pub use futex_rwlock::{RwLock, MovableRwLock};
} else {
mod pthread_mutex;
mod pthread_remutex;
Expand Down