Skip to content

Commit

Permalink
Rollup merge of #127567 - joboet:once_wait, r=Amanieu
Browse files Browse the repository at this point in the history
std: implement the `once_wait` feature

Tracking issue: #127527

This additionally adds a `wait_force` method to `Once` that doesn't panic on poison.

I also took the opportunity and cleaned up up the code of the queue-based implementation a bit.
  • Loading branch information
matthiaskrgr committed Jul 31, 2024
2 parents 28a58f2 + 1d49aad commit c1f2112
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 94 deletions.
41 changes: 41 additions & 0 deletions library/std/src/sync/once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,47 @@ impl Once {
self.inner.is_completed()
}

/// Blocks the current thread until initialization has completed.
///
/// # Example
///
/// ```rust
/// #![feature(once_wait)]
///
/// use std::sync::Once;
/// use std::thread;
///
/// static READY: Once = Once::new();
///
/// let thread = thread::spawn(|| {
/// READY.wait();
/// println!("everything is ready");
/// });
///
/// READY.call_once(|| println!("performing setup"));
/// ```
///
/// # Panics
///
/// If this [`Once`] has been poisoned because an initialization closure has
/// panicked, this method will also panic. Use [`wait_force`](Self::wait_force)
/// if this behaviour is not desired.
#[unstable(feature = "once_wait", issue = "127527")]
pub fn wait(&self) {
if !self.inner.is_completed() {
self.inner.wait(false);
}
}

/// Blocks the current thread until initialization has completed, ignoring
/// poisoning.
#[unstable(feature = "once_wait", issue = "127527")]
pub fn wait_force(&self) {
if !self.inner.is_completed() {
self.inner.wait(true);
}
}

/// Returns the current state of the `Once` instance.
///
/// Since this takes a mutable reference, no initialization can currently
Expand Down
47 changes: 47 additions & 0 deletions library/std/src/sync/once/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use super::Once;
use crate::sync::atomic::AtomicBool;
use crate::sync::atomic::Ordering::Relaxed;
use crate::sync::mpsc::channel;
use crate::time::Duration;
use crate::{panic, thread};

#[test]
Expand Down Expand Up @@ -113,3 +116,47 @@ fn wait_for_force_to_finish() {
assert!(t1.join().is_ok());
assert!(t2.join().is_ok());
}

#[test]
fn wait() {
for _ in 0..50 {
let val = AtomicBool::new(false);
let once = Once::new();

thread::scope(|s| {
for _ in 0..4 {
s.spawn(|| {
once.wait();
assert!(val.load(Relaxed));
});
}

once.call_once(|| val.store(true, Relaxed));
});
}
}

#[test]
fn wait_on_poisoned() {
let once = Once::new();

panic::catch_unwind(|| once.call_once(|| panic!())).unwrap_err();
panic::catch_unwind(|| once.wait()).unwrap_err();
}

#[test]
fn wait_force_on_poisoned() {
let once = Once::new();

thread::scope(|s| {
panic::catch_unwind(|| once.call_once(|| panic!())).unwrap_err();

s.spawn(|| {
thread::sleep(Duration::from_millis(100));

once.call_once_force(|_| {});
});

once.wait_force();
})
}
28 changes: 28 additions & 0 deletions library/std/src/sync/once_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,34 @@ impl<T> OnceLock<T> {
}
}

/// Blocks the current thread until the cell is initialized.
///
/// # Example
///
/// Waiting for a computation on another thread to finish:
/// ```rust
/// #![feature(once_wait)]
///
/// use std::thread;
/// use std::sync::OnceLock;
///
/// let value = OnceLock::new();
///
/// thread::scope(|s| {
/// s.spawn(|| value.set(1 + 1));
///
/// let result = value.wait();
/// assert_eq!(result, &2);
/// })
/// ```
#[inline]
#[unstable(feature = "once_wait", issue = "127527")]
pub fn wait(&self) -> &T {
self.once.wait_force();

unsafe { self.get_unchecked() }
}

/// Sets the contents of this cell to `value`.
///
/// May block if another thread is currently attempting to initialize the cell. The cell is
Expand Down
124 changes: 88 additions & 36 deletions library/std/src/sys/sync/once/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::sync::once::ExclusiveState;
use crate::sys::futex::{futex_wait, futex_wake_all};

// On some platforms, the OS is very nice and handles the waiter queue for us.
// This means we only need one atomic value with 5 states:
// This means we only need one atomic value with 4 states:

/// No initialization has run yet, and no thread is currently using the Once.
const INCOMPLETE: u32 = 0;
Expand All @@ -17,16 +17,20 @@ const POISONED: u32 = 1;
/// Some thread is currently attempting to run initialization. It may succeed,
/// so all future threads need to wait for it to finish.
const RUNNING: u32 = 2;
/// Some thread is currently attempting to run initialization and there are threads
/// waiting for it to finish.
const QUEUED: u32 = 3;
/// Initialization has completed and all future calls should finish immediately.
const COMPLETE: u32 = 4;
const COMPLETE: u32 = 3;

// Threads wait by setting the state to QUEUED and calling `futex_wait` on the state
// An additional bit indicates whether there are waiting threads:

/// May only be set if the state is not COMPLETE.
const QUEUED: u32 = 4;

// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
// variable. When the running thread finishes, it will wake all waiting threads using
// `futex_wake_all`.

const STATE_MASK: u32 = 0b11;

pub struct OnceState {
poisoned: bool,
set_state_to: Cell<u32>,
Expand All @@ -45,7 +49,7 @@ impl OnceState {
}

struct CompletionGuard<'a> {
state: &'a AtomicU32,
state_and_queued: &'a AtomicU32,
set_state_on_drop_to: u32,
}

Expand All @@ -54,64 +58,106 @@ impl<'a> Drop for CompletionGuard<'a> {
// Use release ordering to propagate changes to all threads checking
// up on the Once. `futex_wake_all` does its own synchronization, hence
// we do not need `AcqRel`.
if self.state.swap(self.set_state_on_drop_to, Release) == QUEUED {
futex_wake_all(self.state);
if self.state_and_queued.swap(self.set_state_on_drop_to, Release) & QUEUED != 0 {
futex_wake_all(self.state_and_queued);
}
}
}

pub struct Once {
state: AtomicU32,
state_and_queued: AtomicU32,
}

impl Once {
#[inline]
pub const fn new() -> Once {
Once { state: AtomicU32::new(INCOMPLETE) }
Once { state_and_queued: AtomicU32::new(INCOMPLETE) }
}

#[inline]
pub fn is_completed(&self) -> bool {
// Use acquire ordering to make all initialization changes visible to the
// current thread.
self.state.load(Acquire) == COMPLETE
self.state_and_queued.load(Acquire) == COMPLETE
}

#[inline]
pub(crate) fn state(&mut self) -> ExclusiveState {
match *self.state.get_mut() {
match *self.state_and_queued.get_mut() {
INCOMPLETE => ExclusiveState::Incomplete,
POISONED => ExclusiveState::Poisoned,
COMPLETE => ExclusiveState::Complete,
_ => unreachable!("invalid Once state"),
}
}

// This uses FnMut to match the API of the generic implementation. As this
// implementation is quite light-weight, it is generic over the closure and
// so avoids the cost of dynamic dispatch.
#[cold]
#[track_caller]
pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {
let mut state = self.state.load(Acquire);
pub fn wait(&self, ignore_poisoning: bool) {
let mut state_and_queued = self.state_and_queued.load(Acquire);
loop {
let state = state_and_queued & STATE_MASK;
let queued = state_and_queued & QUEUED != 0;
match state {
COMPLETE => return,
POISONED if !ignore_poisoning => {
// Panic to propagate the poison.
panic!("Once instance has previously been poisoned");
}
_ => {
// Set the QUEUED bit if it has not already been set.
if !queued {
state_and_queued += QUEUED;
if let Err(new) = self.state_and_queued.compare_exchange_weak(
state,
state_and_queued,
Relaxed,
Acquire,
) {
state_and_queued = new;
continue;
}
}

futex_wait(&self.state_and_queued, state_and_queued, None);
state_and_queued = self.state_and_queued.load(Acquire);
}
}
}
}

#[cold]
#[track_caller]
pub fn call(&self, ignore_poisoning: bool, f: &mut dyn FnMut(&public::OnceState)) {
let mut state_and_queued = self.state_and_queued.load(Acquire);
loop {
let state = state_and_queued & STATE_MASK;
let queued = state_and_queued & QUEUED != 0;
match state {
COMPLETE => return,
POISONED if !ignore_poisoning => {
// Panic to propagate the poison.
panic!("Once instance has previously been poisoned");
}
INCOMPLETE | POISONED => {
// Try to register the current thread as the one running.
if let Err(new) =
self.state.compare_exchange_weak(state, RUNNING, Acquire, Acquire)
{
state = new;
let next = RUNNING + if queued { QUEUED } else { 0 };
if let Err(new) = self.state_and_queued.compare_exchange_weak(
state_and_queued,
next,
Acquire,
Acquire,
) {
state_and_queued = new;
continue;
}

// `waiter_queue` will manage other waiting threads, and
// wake them up on drop.
let mut waiter_queue =
CompletionGuard { state: &self.state, set_state_on_drop_to: POISONED };
let mut waiter_queue = CompletionGuard {
state_and_queued: &self.state_and_queued,
set_state_on_drop_to: POISONED,
};
// Run the function, letting it know if we're poisoned or not.
let f_state = public::OnceState {
inner: OnceState {
Expand All @@ -123,21 +169,27 @@ impl Once {
waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get();
return;
}
RUNNING | QUEUED => {
// Set the state to QUEUED if it is not already.
if state == RUNNING
&& let Err(new) =
self.state.compare_exchange_weak(RUNNING, QUEUED, Relaxed, Acquire)
{
state = new;
continue;
_ => {
// All other values must be RUNNING.
assert!(state == RUNNING);

// Set the QUEUED bit if it is not already set.
if !queued {
state_and_queued += QUEUED;
if let Err(new) = self.state_and_queued.compare_exchange_weak(
state,
state_and_queued,
Relaxed,
Acquire,
) {
state_and_queued = new;
continue;
}
}

futex_wait(&self.state, QUEUED, None);
state = self.state.load(Acquire);
futex_wait(&self.state_and_queued, state_and_queued, None);
state_and_queued = self.state_and_queued.load(Acquire);
}
COMPLETE => return,
_ => unreachable!("state is never set to invalid values"),
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions library/std/src/sys/sync/once/no_threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ impl Once {
}
}

#[cold]
#[track_caller]
pub fn wait(&self, _ignore_poisoning: bool) {
panic!("not implementable on this target");
}

#[cold]
#[track_caller]
pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {
Expand Down
Loading

0 comments on commit c1f2112

Please sign in to comment.