From 8f7d0144bbfac0588afeeeba55f297b6d104c41b Mon Sep 17 00:00:00 2001 From: Daniel De Graaf Date: Fri, 12 May 2023 19:32:38 -0400 Subject: [PATCH] Add IntoFuture impls for Lazy This uses concrete types in order to work on stable rust. The resulting Future objects ended up being smaller, which is an unintentional benefit. --- src/lib.rs | 247 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 181 insertions(+), 66 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index aa9bea1..932b8b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -99,9 +99,9 @@ use core::{ cell::UnsafeCell, convert::Infallible, fmt, - future::Future, - marker::PhantomData, - mem::{ManuallyDrop, MaybeUninit}, + future::{Future, IntoFuture}, + marker::{PhantomData, PhantomPinned}, + mem::{self, ManuallyDrop, MaybeUninit}, panic::{RefUnwindSafe, UnwindSafe}, pin::Pin, ptr, @@ -980,7 +980,7 @@ union LazyState { /// Data { id: 4 } /// })); /// -/// assert_eq!(shared.as_ref().get().await.id, 4); +/// assert_eq!(shared.as_ref().await.id, 4); /// # } /// # use std::future::Future; /// # struct NeverWake; @@ -1014,111 +1014,226 @@ where /// Forces the evaluation of this lazy value and returns a reference to the result. /// + /// This is equivalent to calling `.await` on a pinned reference, but is more explicit. + /// /// The [Pin::static_ref] function may be useful if this is a static value. pub async fn get(self: Pin<&Self>) -> Pin<&T> { - let state = self.inner.state.load(Ordering::Acquire); + self.await + } +} - if state & READY_BIT == 0 { - self.init_slow(state == NEW).await; - } +enum Step<'a> { + Start, + Quick { guard: QuickInitGuard<'a> }, + Wait { guard: QueueWaiter<'a> }, + Run { head: QueueHead<'a> }, +} - // Safety: initialized on all paths, and pinned like self - unsafe { Pin::new_unchecked(&(*self.value.get()).ready) } - } +/// A helper struct for both of [Lazy]'s [IntoFuture]s +/// +/// Note: the Lazy value may or may not be pinned, depending on what public struct wraps this one. +struct LazyFuture<'a, T, F> { + lazy: &'a Lazy, + step: Step<'a>, + // This is needed to guarantee Inner's refcount never overflows + _pin: PhantomPinned, +} - #[cold] - async fn init_slow(self: Pin<&Self>, try_quick: bool) { - match self.inner.initialize(try_quick) { - Err(guard) => { - struct QuickReadyGuard<'a, T, F> { - this: &'a Lazy, - value: ManuallyDrop, - guard: QuickInitGuard<'a>, - } +impl<'a, T, F> LazyFuture<'a, T, F> +where + F: Future, +{ + fn poll(&mut self, cx: &mut task::Context<'_>) -> task::Poll<&'a T> { + struct QuickReadyGuard<'a, T, F> { + this: &'a Lazy, + value: ManuallyDrop, + guard: QuickInitGuard<'a>, + } - // Prevent double-drop in case of panic in ManuallyDrop::drop - impl Drop for QuickReadyGuard<'_, T, F> { - fn drop(&mut self) { - // Safety: the union is currently empty and must be filled with a ready value - unsafe { - let value = ManuallyDrop::take(&mut self.value); - (*self.this.value.get()).ready = ManuallyDrop::new(value); - } - self.guard.ready = true; - } + // Prevent double-drop in case of panic in ManuallyDrop::drop + impl Drop for QuickReadyGuard<'_, T, F> { + fn drop(&mut self) { + // Safety: the union is currently empty and must be filled with a ready value + unsafe { + let value = ManuallyDrop::take(&mut self.value); + (*self.this.value.get()).ready = ManuallyDrop::new(value); } + self.guard.ready = true; + } + } - // Safety: the union is in the running state and is pinned like self - let init = unsafe { Pin::new_unchecked(&mut *(*self.value.get()).running) }; - let value = init.await; - // Safety: the guard acts like QueueHead even if there is contention. - // This transitions the union to ready and updates state to reflect that. + struct ReadyGuard<'a, T, F> { + this: &'a Lazy, + value: ManuallyDrop, + // head is a field here to ensure it is dropped after our Drop + head: QueueHead<'a>, + } + + // Prevent double-drop in case of panic in ManuallyDrop::drop + impl Drop for ReadyGuard<'_, T, F> { + fn drop(&mut self) { + // Safety: the union is currently empty and must be filled with a ready value unsafe { - let guard = - QuickReadyGuard { this: &*self, value: ManuallyDrop::new(value), guard }; - ManuallyDrop::drop(&mut (*self.value.get()).running); - drop(guard); + let value = ManuallyDrop::take(&mut self.value); + (*self.this.value.get()).ready = ManuallyDrop::new(value); } + self.head.guard.inner.set_ready(); } - Ok(guard) => { - struct ReadyGuard<'a, T, F> { - this: &'a Lazy, - value: ManuallyDrop, - // head is a field here to ensure it is dropped after our Drop - head: QueueHead<'a>, - } + } - // Prevent double-drop in case of panic in ManuallyDrop::drop - impl Drop for ReadyGuard<'_, T, F> { - fn drop(&mut self) { - // Safety: the union is currently empty and must be filled with a ready value - unsafe { - let value = ManuallyDrop::take(&mut self.value); - (*self.this.value.get()).ready = ManuallyDrop::new(value); - } - self.head.guard.inner.set_ready(); + loop { + match mem::replace(&mut self.step, Step::Start) { + Step::Start => { + let state = self.lazy.inner.state.load(Ordering::Acquire); + + if state & READY_BIT == 0 { + self.step = match self.lazy.inner.initialize(state == NEW) { + Err(guard) => Step::Quick { guard }, + Ok(guard) => Step::Wait { guard }, + }; + continue; } + + // Safety: we just saw READY_BIT set + return task::Poll::Ready(unsafe { &(*self.lazy.value.get()).ready }); } + Step::Quick { guard } => { + // Safety: the union is in the running state and is pinned like self + let init = + unsafe { Pin::new_unchecked(&mut *(*self.lazy.value.get()).running) }; + let value = match init.poll(cx) { + task::Poll::Pending => { + self.step = Step::Quick { guard }; + return task::Poll::Pending; + } + task::Poll::Ready(value) => ManuallyDrop::new(value), + }; + // Safety: the guard acts like QueueHead even if there is contention. + // This transitions the union to ready and updates state to reflect that. + unsafe { + let guard = QuickReadyGuard { this: &self.lazy, value, guard }; + ManuallyDrop::drop(&mut (*self.lazy.value.get()).running); + drop(guard); + } - if let Some(head) = guard.await { + // Safety: just initialized + return task::Poll::Ready(unsafe { &(*self.lazy.value.get()).ready }); + } + Step::Wait { mut guard } => match Pin::new(&mut guard).poll(cx) { + task::Poll::Pending => { + self.step = Step::Wait { guard }; + return task::Poll::Pending; + } + task::Poll::Ready(None) => { + // Safety: getting None from QueueWaiter means it is ready + return task::Poll::Ready(unsafe { &(*self.lazy.value.get()).ready }); + } + task::Poll::Ready(Some(head)) => { + self.step = Step::Run { head }; + continue; + } + }, + Step::Run { head } => { // Safety: the union is in the running state and is pinned like self - let init = unsafe { Pin::new_unchecked(&mut *(*self.value.get()).running) }; + let init = + unsafe { Pin::new_unchecked(&mut *(*self.lazy.value.get()).running) }; // We hold the QueueHead, so we know that nobody else has successfully run an init // poll and that nobody else can start until it is dropped. On error, panic, or // drop of this Future, the head will be passed to another waiter. - let value = init.await; + let value = match init.poll(cx) { + task::Poll::Pending => { + self.step = Step::Run { head }; + return task::Poll::Pending; + } + task::Poll::Ready(value) => ManuallyDrop::new(value), + }; // Safety: We still hold the head, so nobody else can write to value // This transitions the union to ready and updates state to reflect that. unsafe { - let head = - ReadyGuard { this: &*self, value: ManuallyDrop::new(value), head }; - ManuallyDrop::drop(&mut (*self.value.get()).running); + let head = ReadyGuard { this: &self.lazy, value, head }; + ManuallyDrop::drop(&mut (*self.lazy.value.get()).running); // mark the cell ready before giving up the head drop(head); } // drop of QueueHead notifies other Futures // drop of QueueRef (might) free the Queue - } else { - // someone initialized it while waiting on the queue + + // Safety: just initialized + return task::Poll::Ready(unsafe { &(*self.lazy.value.get()).ready }); } } } } } +/// A helper struct for [Lazy]'s [IntoFuture] +pub struct LazyFuturePin<'a, T, F>(LazyFuture<'a, T, F>); + +impl<'a, T, F> IntoFuture for Pin<&'a Lazy> +where + F: Future, +{ + type Output = Pin<&'a T>; + type IntoFuture = LazyFuturePin<'a, T, F>; + fn into_future(self) -> Self::IntoFuture { + // Safety: this is Pin::deref, but with a lifetime of 'a + let lazy = unsafe { Pin::into_inner_unchecked(self) }; + LazyFuturePin(LazyFuture { lazy, step: Step::Start, _pin: PhantomPinned }) + } +} + +impl<'a, T, F> Future for LazyFuturePin<'a, T, F> +where + F: Future, +{ + type Output = Pin<&'a T>; + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll> { + // Safety: we don't move anything that needs to be pinned. + let inner = unsafe { &mut Pin::into_inner_unchecked(self).0 }; + // Safety: because the original Lazy was pinned, the T it produces is also pinned + inner.poll(cx).map(|p| unsafe { Pin::new_unchecked(p) }) + } +} + impl Lazy where F: Future + Unpin, { /// Forces the evaluation of this lazy value and returns a reference to the result. /// + /// This is equivalent to calling `.await` on a reference, but may be clearer to call + /// explicitly. + /// /// Unlike [Self::get], this does not require pinning the object. pub async fn get_unpin(&self) -> &T { - // The get() function itself does not use the fact that T is pinned, and Pin::deref already - // exposes a &T from Pin<&T> (although not with the right lifetime). - unsafe { Pin::into_inner_unchecked(Pin::new_unchecked(self).get().await) } + self.await + } +} + +/// A helper struct for [Lazy]'s [IntoFuture] +pub struct LazyFutureUnpin<'a, T, F>(LazyFuture<'a, T, F>); + +impl<'a, T, F> IntoFuture for &'a Lazy +where + F: Future + Unpin, +{ + type Output = &'a T; + type IntoFuture = LazyFutureUnpin<'a, T, F>; + fn into_future(self) -> Self::IntoFuture { + LazyFutureUnpin(LazyFuture { lazy: self, step: Step::Start, _pin: PhantomPinned }) + } +} + +impl<'a, T, F> Future for LazyFutureUnpin<'a, T, F> +where + F: Future + Unpin, +{ + type Output = &'a T; + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<&'a T> { + // Safety: we don't move anything that needs to be pinned. + unsafe { Pin::into_inner_unchecked(self) }.0.poll(cx) } }