Skip to content

Commit

Permalink
Add IntoFuture impls for Lazy
Browse files Browse the repository at this point in the history
This uses concrete types in order to work on stable rust.  The resulting
Future objects ended up being smaller, which is an unintentional benefit.
  • Loading branch information
danieldg committed May 12, 2023
1 parent 81ced05 commit 8f7d014
Showing 1 changed file with 181 additions and 66 deletions.
247 changes: 181 additions & 66 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ use core::{
cell::UnsafeCell,
convert::Infallible,
fmt,
future::Future,
marker::PhantomData,
mem::{ManuallyDrop, MaybeUninit},
future::{Future, IntoFuture},
marker::{PhantomData, PhantomPinned},
mem::{self, ManuallyDrop, MaybeUninit},
panic::{RefUnwindSafe, UnwindSafe},
pin::Pin,
ptr,
Expand Down Expand Up @@ -980,7 +980,7 @@ union LazyState<T, F> {
/// Data { id: 4 }
/// }));
///
/// assert_eq!(shared.as_ref().get().await.id, 4);
/// assert_eq!(shared.as_ref().await.id, 4);
/// # }
/// # use std::future::Future;
/// # struct NeverWake;
Expand Down Expand Up @@ -1014,111 +1014,226 @@ where

/// Forces the evaluation of this lazy value and returns a reference to the result.
///
/// This is equivalent to calling `.await` on a pinned reference, but is more explicit.
///
/// The [Pin::static_ref] function may be useful if this is a static value.
pub async fn get(self: Pin<&Self>) -> Pin<&T> {
let state = self.inner.state.load(Ordering::Acquire);
self.await
}
}

if state & READY_BIT == 0 {
self.init_slow(state == NEW).await;
}
enum Step<'a> {
Start,
Quick { guard: QuickInitGuard<'a> },
Wait { guard: QueueWaiter<'a> },
Run { head: QueueHead<'a> },
}

// Safety: initialized on all paths, and pinned like self
unsafe { Pin::new_unchecked(&(*self.value.get()).ready) }
}
/// A helper struct for both of [Lazy]'s [IntoFuture]s
///
/// Note: the Lazy value may or may not be pinned, depending on what public struct wraps this one.
struct LazyFuture<'a, T, F> {
lazy: &'a Lazy<T, F>,
step: Step<'a>,
// This is needed to guarantee Inner's refcount never overflows
_pin: PhantomPinned,
}

#[cold]
async fn init_slow(self: Pin<&Self>, try_quick: bool) {
match self.inner.initialize(try_quick) {
Err(guard) => {
struct QuickReadyGuard<'a, T, F> {
this: &'a Lazy<T, F>,
value: ManuallyDrop<T>,
guard: QuickInitGuard<'a>,
}
impl<'a, T, F> LazyFuture<'a, T, F>
where
F: Future<Output = T>,
{
fn poll(&mut self, cx: &mut task::Context<'_>) -> task::Poll<&'a T> {
struct QuickReadyGuard<'a, T, F> {
this: &'a Lazy<T, F>,
value: ManuallyDrop<T>,
guard: QuickInitGuard<'a>,
}

// Prevent double-drop in case of panic in ManuallyDrop::drop
impl<T, F> Drop for QuickReadyGuard<'_, T, F> {
fn drop(&mut self) {
// Safety: the union is currently empty and must be filled with a ready value
unsafe {
let value = ManuallyDrop::take(&mut self.value);
(*self.this.value.get()).ready = ManuallyDrop::new(value);
}
self.guard.ready = true;
}
// Prevent double-drop in case of panic in ManuallyDrop::drop
impl<T, F> Drop for QuickReadyGuard<'_, T, F> {
fn drop(&mut self) {
// Safety: the union is currently empty and must be filled with a ready value
unsafe {
let value = ManuallyDrop::take(&mut self.value);
(*self.this.value.get()).ready = ManuallyDrop::new(value);
}
self.guard.ready = true;
}
}

// Safety: the union is in the running state and is pinned like self
let init = unsafe { Pin::new_unchecked(&mut *(*self.value.get()).running) };
let value = init.await;
// Safety: the guard acts like QueueHead even if there is contention.
// This transitions the union to ready and updates state to reflect that.
struct ReadyGuard<'a, T, F> {
this: &'a Lazy<T, F>,
value: ManuallyDrop<T>,
// head is a field here to ensure it is dropped after our Drop
head: QueueHead<'a>,
}

// Prevent double-drop in case of panic in ManuallyDrop::drop
impl<T, F> Drop for ReadyGuard<'_, T, F> {
fn drop(&mut self) {
// Safety: the union is currently empty and must be filled with a ready value
unsafe {
let guard =
QuickReadyGuard { this: &*self, value: ManuallyDrop::new(value), guard };
ManuallyDrop::drop(&mut (*self.value.get()).running);
drop(guard);
let value = ManuallyDrop::take(&mut self.value);
(*self.this.value.get()).ready = ManuallyDrop::new(value);
}
self.head.guard.inner.set_ready();
}
Ok(guard) => {
struct ReadyGuard<'a, T, F> {
this: &'a Lazy<T, F>,
value: ManuallyDrop<T>,
// head is a field here to ensure it is dropped after our Drop
head: QueueHead<'a>,
}
}

// Prevent double-drop in case of panic in ManuallyDrop::drop
impl<T, F> Drop for ReadyGuard<'_, T, F> {
fn drop(&mut self) {
// Safety: the union is currently empty and must be filled with a ready value
unsafe {
let value = ManuallyDrop::take(&mut self.value);
(*self.this.value.get()).ready = ManuallyDrop::new(value);
}
self.head.guard.inner.set_ready();
loop {
match mem::replace(&mut self.step, Step::Start) {
Step::Start => {
let state = self.lazy.inner.state.load(Ordering::Acquire);

if state & READY_BIT == 0 {
self.step = match self.lazy.inner.initialize(state == NEW) {
Err(guard) => Step::Quick { guard },
Ok(guard) => Step::Wait { guard },
};
continue;
}

// Safety: we just saw READY_BIT set
return task::Poll::Ready(unsafe { &(*self.lazy.value.get()).ready });
}
Step::Quick { guard } => {
// Safety: the union is in the running state and is pinned like self
let init =
unsafe { Pin::new_unchecked(&mut *(*self.lazy.value.get()).running) };
let value = match init.poll(cx) {
task::Poll::Pending => {
self.step = Step::Quick { guard };
return task::Poll::Pending;
}
task::Poll::Ready(value) => ManuallyDrop::new(value),
};
// Safety: the guard acts like QueueHead even if there is contention.
// This transitions the union to ready and updates state to reflect that.
unsafe {
let guard = QuickReadyGuard { this: &self.lazy, value, guard };
ManuallyDrop::drop(&mut (*self.lazy.value.get()).running);
drop(guard);
}

if let Some(head) = guard.await {
// Safety: just initialized
return task::Poll::Ready(unsafe { &(*self.lazy.value.get()).ready });
}
Step::Wait { mut guard } => match Pin::new(&mut guard).poll(cx) {
task::Poll::Pending => {
self.step = Step::Wait { guard };
return task::Poll::Pending;
}
task::Poll::Ready(None) => {
// Safety: getting None from QueueWaiter means it is ready
return task::Poll::Ready(unsafe { &(*self.lazy.value.get()).ready });
}
task::Poll::Ready(Some(head)) => {
self.step = Step::Run { head };
continue;
}
},
Step::Run { head } => {
// Safety: the union is in the running state and is pinned like self
let init = unsafe { Pin::new_unchecked(&mut *(*self.value.get()).running) };
let init =
unsafe { Pin::new_unchecked(&mut *(*self.lazy.value.get()).running) };
// We hold the QueueHead, so we know that nobody else has successfully run an init
// poll and that nobody else can start until it is dropped. On error, panic, or
// drop of this Future, the head will be passed to another waiter.
let value = init.await;
let value = match init.poll(cx) {
task::Poll::Pending => {
self.step = Step::Run { head };
return task::Poll::Pending;
}
task::Poll::Ready(value) => ManuallyDrop::new(value),
};

// Safety: We still hold the head, so nobody else can write to value
// This transitions the union to ready and updates state to reflect that.
unsafe {
let head =
ReadyGuard { this: &*self, value: ManuallyDrop::new(value), head };
ManuallyDrop::drop(&mut (*self.value.get()).running);
let head = ReadyGuard { this: &self.lazy, value, head };
ManuallyDrop::drop(&mut (*self.lazy.value.get()).running);

// mark the cell ready before giving up the head
drop(head);
}
// drop of QueueHead notifies other Futures
// drop of QueueRef (might) free the Queue
} else {
// someone initialized it while waiting on the queue

// Safety: just initialized
return task::Poll::Ready(unsafe { &(*self.lazy.value.get()).ready });
}
}
}
}
}

/// A helper struct for [Lazy]'s [IntoFuture]
pub struct LazyFuturePin<'a, T, F>(LazyFuture<'a, T, F>);

impl<'a, T, F> IntoFuture for Pin<&'a Lazy<T, F>>
where
F: Future<Output = T>,
{
type Output = Pin<&'a T>;
type IntoFuture = LazyFuturePin<'a, T, F>;
fn into_future(self) -> Self::IntoFuture {
// Safety: this is Pin::deref, but with a lifetime of 'a
let lazy = unsafe { Pin::into_inner_unchecked(self) };
LazyFuturePin(LazyFuture { lazy, step: Step::Start, _pin: PhantomPinned })
}
}

impl<'a, T, F> Future for LazyFuturePin<'a, T, F>
where
F: Future<Output = T>,
{
type Output = Pin<&'a T>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Pin<&'a T>> {
// Safety: we don't move anything that needs to be pinned.
let inner = unsafe { &mut Pin::into_inner_unchecked(self).0 };
// Safety: because the original Lazy was pinned, the T it produces is also pinned
inner.poll(cx).map(|p| unsafe { Pin::new_unchecked(p) })
}
}

impl<T, F> Lazy<T, F>
where
F: Future<Output = T> + Unpin,
{
/// Forces the evaluation of this lazy value and returns a reference to the result.
///
/// This is equivalent to calling `.await` on a reference, but may be clearer to call
/// explicitly.
///
/// Unlike [Self::get], this does not require pinning the object.
pub async fn get_unpin(&self) -> &T {
// The get() function itself does not use the fact that T is pinned, and Pin::deref already
// exposes a &T from Pin<&T> (although not with the right lifetime).
unsafe { Pin::into_inner_unchecked(Pin::new_unchecked(self).get().await) }
self.await
}
}

/// A helper struct for [Lazy]'s [IntoFuture]
pub struct LazyFutureUnpin<'a, T, F>(LazyFuture<'a, T, F>);

impl<'a, T, F> IntoFuture for &'a Lazy<T, F>
where
F: Future<Output = T> + Unpin,
{
type Output = &'a T;
type IntoFuture = LazyFutureUnpin<'a, T, F>;
fn into_future(self) -> Self::IntoFuture {
LazyFutureUnpin(LazyFuture { lazy: self, step: Step::Start, _pin: PhantomPinned })
}
}

impl<'a, T, F> Future for LazyFutureUnpin<'a, T, F>
where
F: Future<Output = T> + Unpin,
{
type Output = &'a T;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<&'a T> {
// Safety: we don't move anything that needs to be pinned.
unsafe { Pin::into_inner_unchecked(self) }.0.poll(cx)
}
}

Expand Down

0 comments on commit 8f7d014

Please sign in to comment.