From b60f0246cac49af3590071a848ca0b7731f58fb2 Mon Sep 17 00:00:00 2001
From: Daniel De Graaf
Date: Sat, 29 Apr 2023 16:43:17 -0400
Subject: [PATCH] Add basic functionality tests

While this doesn't test any threading interactions, it at least runs
both fast- and slow-path initialization to catch obvious issues like
missed wakeups, refcounting errors, and failure to free the queue.
---
 src/lib.rs | 182 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 180 insertions(+), 2 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 850bd23..0ae70ae 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -162,11 +162,11 @@ fn with_lock<T, R>(mutex: &Mutex<T>, f: impl FnOnce(&mut T) -> R) -> R {
 /// This allows initialization using an async closure that borrows from its environment.
 ///
 /// ```
-/// # async fn run() {
 /// use std::rc::Rc;
 /// use std::sync::Arc;
 /// use async_once_cell::OnceCell;
 ///
+/// # async fn run() {
 /// let non_send_value = Rc::new(4);
 /// let shared = Arc::new(OnceCell::new());
 ///
@@ -182,6 +182,14 @@ fn with_lock<T, R>(mutex: &Mutex<T>, f: impl FnOnce(&mut T) -> R) -> R {
 /// assert_eq!(second, &4);
 ///
 /// # }
+/// # use std::future::Future;
+/// # struct NeverWake;
+/// # impl std::task::Wake for NeverWake {
+/// #     fn wake(self: Arc<Self>) {}
+/// # }
+/// # let w = Arc::new(NeverWake).into();
+/// # let mut cx = std::task::Context::from_waker(&w);
+/// # assert!(std::pin::pin!(run()).poll(&mut cx).is_ready());
 /// ```
 pub struct OnceCell<T> {
     value: UnsafeCell<Option<T>>,
@@ -757,6 +765,168 @@ impl<T> From<T> for OnceCell<T> {
     }
 }
 
+#[cfg(test)]
+mod test {
+    use super::*;
+    use alloc::sync::Arc;
+    use core::pin::pin;
+
+    #[derive(Default)]
+    struct CountWaker(AtomicUsize);
+    impl alloc::task::Wake for CountWaker {
+        fn wake(self: Arc<Self>) {
+            self.0.fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    struct CmdWait<'a>(&'a AtomicUsize);
+    impl Future for CmdWait<'_> {
+        type Output = usize;
+        fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> task::Poll<usize> {
+            match self.0.load(Ordering::Relaxed) {
+                0 => task::Poll::Pending,
+                n => task::Poll::Ready(n),
+            }
+        }
+    }
+
+    async fn maybe(cmd: &AtomicUsize, cell: &OnceCell<usize>) -> Result<usize, usize> {
+        cell.get_or_try_init(async {
+            match dbg!(CmdWait(cmd).await) {
+                1 => Err(1),
+                2 => Ok(2),
+                _ => unreachable!(),
+            }
+        })
+        .await
+        .map(|v| *v)
+    }
+
+    async fn never_init(cell: &OnceCell<usize>) {
+        let v = cell.get_or_init(async { unreachable!() }).await;
+        assert_eq!(v, &2);
+    }
+
+    #[test]
+    fn slow_path() {
+        let w = Arc::new(CountWaker::default()).into();
+        let mut cx = std::task::Context::from_waker(&w);
+
+        let cmd = AtomicUsize::new(0);
+        let cell = OnceCell::new();
+
+        let mut f1 = pin!(maybe(&cmd, &cell));
+        let mut f2 = pin!(never_init(&cell));
+
+        println!("{:?}", cell);
+        assert!(f1.as_mut().poll(&mut cx).is_pending());
+        println!("{:?}", cell);
+        assert!(f2.as_mut().poll(&mut cx).is_pending());
+        println!("{:?}", cell);
+        cmd.store(2, Ordering::Relaxed);
+        assert!(f2.as_mut().poll(&mut cx).is_pending());
+        assert!(f1.as_mut().poll(&mut cx).is_ready());
+        println!("{:?}", cell);
+        assert!(f2.as_mut().poll(&mut cx).is_ready());
+    }
+
+    #[test]
+    fn fast_path_tricked() {
+        // f1 will complete on the fast path, but a queue was created anyway
+        let w = Arc::new(CountWaker::default()).into();
+        let mut cx = std::task::Context::from_waker(&w);
+
+        let cmd = AtomicUsize::new(0);
+        let cell = OnceCell::new();
+
+        let mut f1 = pin!(maybe(&cmd, &cell));
+        let mut f2 = pin!(never_init(&cell));
+
+        println!("{:?}", cell);
+        assert!(f1.as_mut().poll(&mut cx).is_pending());
+        println!("{:?}", cell);
+        assert!(f2.as_mut().poll(&mut cx).is_pending());
+        println!("{:?}", cell);
+        cmd.store(2, Ordering::Relaxed);
+        f2.set(never_init(&cell));
+        println!("{:?}", cell);
+        assert!(f1.as_mut().poll(&mut cx).is_ready());
+        println!("{:?}", cell);
+        assert!(f2.as_mut().poll(&mut cx).is_ready());
+    }
+
+    #[test]
+    fn second_try() {
+        let waker = Arc::new(CountWaker::default());
+        let w = waker.clone().into();
+        let mut cx = std::task::Context::from_waker(&w);
+
+        let cmd = AtomicUsize::new(0);
+        let cell = OnceCell::new();
+
+        let mut f1 = pin!(maybe(&cmd, &cell));
+        let mut f2 = pin!(maybe(&cmd, &cell));
+        let mut f3 = pin!(maybe(&cmd, &cell));
+        let mut f4 = pin!(maybe(&cmd, &cell));
+
+        assert!(f1.as_mut().poll(&mut cx).is_pending());
+        assert_eq!(cell.inner.state.load(Ordering::Relaxed), QINIT_BIT);
+        assert!(f2.as_mut().poll(&mut cx).is_pending());
+        assert!(f3.as_mut().poll(&mut cx).is_pending());
+        assert!(f4.as_mut().poll(&mut cx).is_pending());
+        assert_eq!(cell.inner.state.load(Ordering::Relaxed), QINIT_BIT | 3);
+
+        cmd.store(1, Ordering::Relaxed);
+        // f2 should do nothing, as f1 holds QuickInitGuard
+        assert!(f2.as_mut().poll(&mut cx).is_pending());
+        assert_eq!(waker.0.load(Ordering::Relaxed), 0);
+
+        // f1 fails, as commanded
+        assert_eq!(f1.as_mut().poll(&mut cx), task::Poll::Ready(Err(1)));
+        // it released QINIT_BIT (and doesn't still hold a reference)
+        assert_eq!(cell.inner.state.load(Ordering::Relaxed), 3);
+        // f1 caused a wake to be sent (only one, as they have the same waker)
+        assert_eq!(waker.0.load(Ordering::Relaxed), 1);
+
+        // drop one waiting task and check that the refcount drops
+        f4.set(maybe(&cmd, &cell));
+        assert_eq!(cell.inner.state.load(Ordering::Relaxed), 2);
+
+        // have f2 start init
+        cmd.store(0, Ordering::Relaxed);
+        assert!(f2.as_mut().poll(&mut cx).is_pending());
+
+        // allow f2 to actually complete init
+        cmd.store(2, Ordering::Relaxed);
+
+        // f3 should add itself to the queue again, but not complete
+        assert!(f3.as_mut().poll(&mut cx).is_pending());
+        assert_eq!(waker.0.load(Ordering::Relaxed), 1);
+
+        assert_eq!(f2.as_mut().poll(&mut cx), task::Poll::Ready(Ok(2)));
+
+        // Nobody else should run their closure
+        cmd.store(3, Ordering::Relaxed);
+
+        // Other tasks can now immediately access the value
+        assert_eq!(f4.as_mut().poll(&mut cx), task::Poll::Ready(Ok(2)));
+
+        // f3 is still waiting; the queue should not be freed yet, and it should have seen a wake
+        assert_eq!(waker.0.load(Ordering::Relaxed), 2);
+        assert_eq!(cell.inner.state.load(Ordering::Relaxed), READY_BIT | 1);
+        assert!(!cell.inner.queue.load(Ordering::Relaxed).is_null());
+
+        assert_eq!(f3.as_mut().poll(&mut cx), task::Poll::Ready(Ok(2)));
+        // the cell should be fully ready, with the queue deallocated
+
+        assert_eq!(cell.inner.state.load(Ordering::Relaxed), READY_BIT);
+        assert!(cell.inner.queue.load(Ordering::Relaxed).is_null());
+
+        // no more wakes were sent
+        assert_eq!(waker.0.load(Ordering::Relaxed), 2);
+    }
+}
+
 enum LazyState<T, F> {
     Running(F),
     Ready(T),
@@ -768,10 +938,10 @@ enum LazyState<T, F> {
 /// continued by other (concurrent or future) callers of [Lazy::get].
 ///
 /// ```
-/// # async fn run() {
 /// use std::sync::Arc;
 /// use async_once_cell::Lazy;
 ///
+/// # async fn run() {
 /// struct Data {
 ///     id: u32,
 /// }
@@ -782,6 +952,14 @@ enum LazyState<T, F> {
 ///
 /// assert_eq!(shared.as_ref().get().await.id, 4);
 /// # }
+/// # use std::future::Future;
+/// # struct NeverWake;
+/// # impl std::task::Wake for NeverWake {
+/// #     fn wake(self: Arc<Self>) {}
+/// # }
+/// # let w = Arc::new(NeverWake).into();
+/// # let mut cx = std::task::Context::from_waker(&w);
+/// # assert!(std::pin::pin!(run()).poll(&mut cx).is_ready());
 /// ```
 pub struct Lazy<T, F> {
     value: UnsafeCell<LazyState<T, F>>,
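
Note on the testing technique: these tests drive the futures by hand with a
counting waker instead of spawning them on an executor, which makes the exact
number of wakeups and the cell's state observable after every poll. A minimal
standalone sketch of that manual-polling pattern (not part of the patch; it
assumes std and Rust 1.68+ for `pin!`, and re-declares a CountWaker-like type
purely for illustration):

    use std::future::Future;
    use std::pin::pin;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake};

    // Counts wakeups so a test can assert on them explicitly.
    #[derive(Default)]
    struct CountingWaker(AtomicUsize);
    impl Wake for CountingWaker {
        fn wake(self: Arc<Self>) {
            self.0.fetch_add(1, Ordering::Relaxed);
        }
    }

    fn main() {
        let waker = Arc::new(CountingWaker::default());
        let w = waker.clone().into();
        let mut cx = Context::from_waker(&w);

        // Step any future one poll at a time, no executor required.
        let mut fut = pin!(async { 2 + 2 });
        assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(4));
        // The trivial future never registered the waker, so no wakeups occurred.
        assert_eq!(waker.0.load(Ordering::Relaxed), 0);
    }

As the commit message says, threading interactions are not covered here; one
plausible complement (an assumption, not something this patch does) would be
running the suite under `cargo +nightly miri test` to exercise the unsafe
queue handling under Miri's checks.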