Add lock-free initialization

This avoids memory allocations and Mutex creation if there is no
contention when initializing the OnceCell.
danieldg committed Aug 10, 2021
1 parent 6372d25 commit 2bda31a
Showing 1 changed file with 64 additions and 4 deletions.
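
For context, here is a minimal caller-side sketch of the uncontended case this commit optimizes. It assumes this is the async-once-cell crate and uses the get_or_try_init entry point shown in the diff; the static cell, the init future, and the error type are illustrative.

use async_once_cell::OnceCell;

// Illustrative shared cell; any shared location works.
static CELL: OnceCell<u64> = OnceCell::new();

async fn answer() -> Result<&'static u64, std::io::Error> {
    // With no concurrent initializers, try_quick_init's compare_exchange on
    // the state word succeeds and the init future runs without ever
    // allocating the Queue (and its Mutex).
    CELL.get_or_try_init(async { Ok(42) }).await
}
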
src/lib.rs
@@ -51,6 +51,8 @@ struct QueueRef<'a> {
unsafe impl<'a> Sync for QueueRef<'a> {}
unsafe impl<'a> Send for QueueRef<'a> {}

struct QuickInitGuard<'a>(&'a Inner);

/// A Future that waits for acquisition of a QueueHead
struct QueueWaiter<'a> {
guard: Option<QueueRef<'a>>,
@@ -62,13 +64,27 @@ struct QueueHead<'a> {
}

const NEW: usize = 0x0;
const QINIT_BIT: usize = 1 + (usize::MAX >> 2);
const READY_BIT: usize = 1 + (usize::MAX >> 1);

impl Inner {
const fn new() -> Self {
Inner { state: AtomicUsize::new(NEW), queue: AtomicPtr::new(ptr::null_mut()) }
}

/// Try to grab a lock without allocating. This succeeds only if there is no contention, and
/// on Drop it will check for contention again.
#[cold]
fn try_quick_init(&self) -> Option<QuickInitGuard> {
if self.state.compare_exchange(NEW, QINIT_BIT, Ordering::Acquire, Ordering::Relaxed).is_ok()
{
// On success, this acquires a write lock on the value
Some(QuickInitGuard(self))
} else {
None
}
}

/// Initialize the queue (if needed) and return a waiter that can be polled to get a QueueHead
/// that gives permission to initialize the OnceCell.
///
@@ -85,14 +101,23 @@ impl Inner {
// The poll call requires a Pinned pointer to this Future, and the contract of Pin requires
// Drop to be called on any !Unpin value that was pinned before the memory is reused.
// Because the Drop impl of QueueRef decrements the refcount, an overflow would require
- // more than (usize::MAX / 2) QueueRef objects in memory, which is impossible as these
- // objects take up more than 2 bytes.
+ // more than (usize::MAX / 4) QueueRef objects in memory, which is impossible as these
+ // objects take up more than 4 bytes.

let mut guard = QueueRef { inner: self, queue: self.queue.load(Ordering::Acquire) };

if guard.queue.is_null() && prev_state & READY_BIT == 0 {
let wakers = if prev_state & QINIT_BIT != 0 {
// Someone else is taking the fast path; start with no QueueHead available. As
// long as our future is pending, QuickInitGuard::drop will observe the nonzero
// reference count and correctly participate in the queue.
Mutex::new(Some(Vec::new()))
} else {
Mutex::new(None)
};

// Race with other callers of initialize to create the queue
- let new_queue = Box::into_raw(Box::new(Queue { wakers: Mutex::new(None) }));
+ let new_queue = Box::into_raw(Box::new(Queue { wakers }));

match self.queue.compare_exchange(
ptr::null_mut(),
@@ -134,7 +159,7 @@ impl<'a> Drop for QueueRef<'a> {
// Note: as of now, self.queue may be invalid

let curr_state = prev_state - 1;
- if curr_state == READY_BIT {
+ if curr_state == READY_BIT || curr_state == READY_BIT | QINIT_BIT {
// We just removed the only waiter on an initialized cell. This means the
// queue is no longer needed. Acquire the queue again so we can free it.
let queue = self.inner.queue.swap(ptr::null_mut(), Ordering::Acquire);
@@ -149,6 +174,27 @@ impl<'a> Drop for QueueRef<'a> {
}
}

impl<'a> Drop for QuickInitGuard<'a> {
fn drop(&mut self) {
// Relaxed ordering is sufficient here: all decisions are made based solely on the
// observed values of the state field itself, not on any other memory. On the slow
// path, initialize acquires the reference as normal, which already provides all the
// ordering we need.
let prev_state = self.0.state.fetch_and(!QINIT_BIT, Ordering::Relaxed);
if prev_state == QINIT_BIT | READY_BIT {
return; // fast path, init succeeded. The Release in set_ready was sufficient.
}
if prev_state == QINIT_BIT {
return; // fast path, init failed. We made no writes that need to be ordered.
}
// Get a guard, create the QueueHead we should have been holding, then drop it so that
// the tasks are woken as intended. This is needed regardless of whether init
// succeeded: either the waiters need to run init themselves, or they need to read the
// value we set.
let guard = self.0.initialize().guard.unwrap();
// Note: in strange corner cases we can get a guard with a NULL queue here
drop(QueueHead { guard })
}
}

impl Drop for Inner {
fn drop(&mut self) {
let queue = *self.queue.get_mut();
@@ -268,6 +314,20 @@ impl<T> OnceCell<T> {
let state = self.inner.state.load(Ordering::Acquire);

if state & READY_BIT == 0 {
if state == NEW {
// If there is no contention, we can initialize without allocations. Try it.
if let Some(guard) = self.inner.try_quick_init() {
let value = init.await?;
unsafe {
*self.value.get() = Some(value);
}
self.inner.set_ready();
drop(guard);

return Ok(unsafe { (&*self.value.get()).as_ref().unwrap() });
}
}

let guard = self.inner.initialize();
if let Some(init_lock) = guard.await {
// We hold the QueueHead, so we know that nobody else has successfully run an init
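
For reference, a standalone sketch of the state-word layout the new QINIT_BIT implies. The three constants are copied from the diff; the assertions are illustrative, not part of the commit.

const NEW: usize = 0x0;
const QINIT_BIT: usize = 1 + (usize::MAX >> 2); // second-highest bit
const READY_BIT: usize = 1 + (usize::MAX >> 1); // highest bit

fn main() {
    // On a 64-bit target the two flags are bits 62 and 63, leaving the low
    // 62 bits for the QueueRef reference count.
    assert_eq!(QINIT_BIT, 1 << (usize::BITS - 2));
    assert_eq!(READY_BIT, 1 << (usize::BITS - 1));
    // A fresh cell has no flags set and a refcount of zero.
    assert_eq!(NEW & (QINIT_BIT | READY_BIT), 0);
    // The refcount can now grow only to usize::MAX / 4 before it would
    // collide with QINIT_BIT, which is why the overflow comment in the
    // diff changes from /2 to /4.
    assert_eq!(QINIT_BIT - 1, usize::MAX >> 2);
}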

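A sketch of the three cases QuickInitGuard::drop distinguishes, using a bare AtomicUsize to stand in for the Inner state field. The classify_drop harness is hypothetical; the constants and the fetch_and mirror the diff.

use std::sync::atomic::{AtomicUsize, Ordering};

const QINIT_BIT: usize = 1 + (usize::MAX >> 2);
const READY_BIT: usize = 1 + (usize::MAX >> 1);

// Clear QINIT_BIT and decide what to do based on what else was set while we
// held the quick-init lock, as QuickInitGuard::drop does.
fn classify_drop(state: &AtomicUsize) -> &'static str {
    let prev = state.fetch_and(!QINIT_BIT, Ordering::Relaxed);
    if prev == QINIT_BIT | READY_BIT {
        "fast path, init succeeded"
    } else if prev == QINIT_BIT {
        "fast path, init failed"
    } else {
        // A nonzero refcount: other tasks queued up behind us, so the real
        // drop impl takes the slow path and wakes them via a QueueHead.
        "contended, must create and drop a QueueHead"
    }
}

fn main() {
    let s = AtomicUsize::new(QINIT_BIT | READY_BIT);
    assert_eq!(classify_drop(&s), "fast path, init succeeded");

    let s = AtomicUsize::new(QINIT_BIT | 2); // two waiters arrived meanwhile
    assert_eq!(classify_drop(&s), "contended, must create and drop a QueueHead");
}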
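
Finally, the queue handoff that seeding wakers with Some(Vec::new()) relies on can be sketched in isolation. Wakers and its two methods are hypothetical stand-ins for the crate's Queue::wakers field and the QueueWaiter/QueueHead behavior; None means the head is free, Some means it is taken and late arrivals park their wakers.

use std::sync::Mutex;
use std::task::Waker;

struct Wakers(Mutex<Option<Vec<Waker>>>);

impl Wakers {
    // A poller that finds None becomes the QueueHead; one that finds Some
    // parks its waker and stays pending. Starting the queue at
    // Some(Vec::new()) therefore makes the QuickInitGuard the implicit head.
    fn try_take_head(&self, me: &Waker) -> bool {
        let mut lock = self.0.lock().unwrap();
        match &mut *lock {
            None => {
                *lock = Some(Vec::new());
                true
            }
            Some(list) => {
                list.push(me.clone());
                false
            }
        }
    }

    // What dropping the QueueHead amounts to: free the slot and wake every
    // parked task so it can re-poll and observe the new state.
    fn release_head(&self) {
        let parked = self.0.lock().unwrap().take();
        for w in parked.into_iter().flatten() {
            w.wake();
        }
    }
}

fn main() {
    let q = Wakers(Mutex::new(Some(Vec::new()))); // seeded as in the commit
    let w = Waker::noop();
    assert!(!q.try_take_head(w)); // head already (implicitly) taken
    q.release_head(); // wakes the parked task; a no-op for the noop waker
}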