Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support no_std with a self-referential stream #48

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions async-stream-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ pub fn stream_inner(input: TokenStream) -> TokenStream {
};

quote!({
let (mut __yield_tx, __yield_rx) = #crate_path::yielder::pair();
#crate_path::AsyncStream::new(__yield_rx, async move {
#crate_path::AsyncStream::new(move |mut __yield_tx| async move {
#dummy_yield
#(#stmts)*
})
Expand Down Expand Up @@ -173,8 +172,7 @@ pub fn try_stream_inner(input: TokenStream) -> TokenStream {
};

quote!({
let (mut __yield_tx, __yield_rx) = #crate_path::yielder::pair();
#crate_path::AsyncStream::new(__yield_rx, async move {
#crate_path::AsyncStream::new(move |mut __yield_tx| async move {
#dummy_yield
#(#stmts)*
})
Expand Down
1 change: 1 addition & 0 deletions async-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ futures-core = "0.3"

[dev-dependencies]
futures-util = "0.3"
futures-executor = "0.3.8"
tokio = { version = "1", features = ["full"] }
tokio-test = "0.4"
trybuild = "1"
235 changes: 198 additions & 37 deletions async-stream/src/async_stream.rs
Original file line number Diff line number Diff line change
@@ -1,69 +1,230 @@
use crate::yielder::Receiver;

use core::cell::{Cell, UnsafeCell};
use core::fmt::{self, Debug, Formatter};
use core::future::Future;
use core::marker::PhantomPinned;
use core::mem::{self, MaybeUninit};
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_core::{FusedStream, Stream};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

#[doc(hidden)]
#[derive(Debug)]
pub struct AsyncStream<T, U> {
rx: Receiver<T>,
pub struct AsyncStream<T, F, Fut> {
state: State<T, F, Fut>,
}

impl<T: Debug, F, Fut: Debug> Debug for AsyncStream<T, F, Fut> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("AsyncStream")
.field("state", &self.state)
.finish()
}
}

/// The state of the async stream.
enum State<T, F, Fut> {
/// The async stream has not yet been initialized. This holds the function that is used to
/// initialize it.
///
/// This state is necessary to allow the async stream to be soundly moved before `poll_next` is
/// called for the first time.
Uninit(F),

/// The async stream has been initialized.
///
/// This is an `UnsafeCell` to force the immutable borrow of its contents even when we have a
/// mutable reference to it so that our mutable reference to it doesn't alias with the inner
/// generator's reference to `Init::yielded`.
Init(UnsafeCell<Init<T, Fut>>),
}

impl<T: Debug, F, Fut: Debug> Debug for State<T, F, Fut> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::Uninit(_) => f.pad("Uninit"),
Self::Init(init) => f
.debug_tuple("Init")
.field(unsafe { &*init.get() })
.finish(),
}
}
}

/// An initialized async stream.
struct Init<T, Fut> {
/// The last yielded item. The generator holds a pointer to this.
yielded: Cell<Option<T>>,

/// The generator itself. This is a `MaybeUninit` so that this type can be constructed with
/// partial initialization - through all regular usage this is initialized. It is an
/// `UnsafeCell` so we can get a mutable reference to it through the immutable reference
/// provided by the outer `UnsafeCell`.
generator: UnsafeCell<MaybeUninit<Fut>>,

/// Whether the generator is done.
done: bool,
generator: U,

/// As the generator holds a pointer to `yielded`, this type cannot move in memory.
_pinned: PhantomPinned,
}

impl<T: Debug, Fut: Debug> Debug for Init<T, Fut> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Init")
.field(
"generator",
// Safety: We only create a shared reference to the data, and the only time a
// mutable reference to this data is created is inside `poll_next` where we have a
// `&mut Self` anyway.
unsafe { &*(*self.generator.get()).as_ptr() },
)
.field("done", &self.done)
.finish()
}
}

impl<T, U> AsyncStream<T, U> {
impl<T, F, Fut> AsyncStream<T, F, Fut>
where
F: FnOnce(Sender<T>) -> Fut,
Fut: Future<Output = ()>,
{
#[doc(hidden)]
pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
pub fn new(f: F) -> AsyncStream<T, F, Fut> {
AsyncStream {
rx,
done: false,
generator,
state: State::Uninit(f),
}
}
}

impl<T, U> FusedStream for AsyncStream<T, U>
impl<T, F, Fut> FusedStream for AsyncStream<T, F, Fut>
where
U: Future<Output = ()>,
Self: Stream,
{
fn is_terminated(&self) -> bool {
self.done
match &self.state {
State::Uninit(_) => false,
State::Init(init) => {
// Safety: We never borrow this cell mutably from an immutable reference.
unsafe { &*init.get() }.done
}
}
}
}

impl<T, U> Stream for AsyncStream<T, U>
impl<T, F, Fut> Stream for AsyncStream<T, F, Fut>
where
U: Future<Output = ()>,
F: FnOnce(Sender<T>) -> Fut,
Fut: Future<Output = ()>,
{
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe {
let me = Pin::get_unchecked_mut(self);
let me = unsafe { Pin::into_inner_unchecked(self) };

if me.done {
return Poll::Ready(None);
let init_cell = match &mut me.state {
State::Uninit(_) => {
let old_state = mem::replace(
&mut me.state,
State::Init(UnsafeCell::new(Init {
yielded: Cell::new(None),
generator: UnsafeCell::new(MaybeUninit::uninit()),
done: false,
_pinned: PhantomPinned,
})),
);
let f = match old_state {
State::Uninit(f) => f,
_ => unreachable!(),
};
let init_cell = match &mut me.state {
State::Init(init) => init,
_ => unreachable!(),
};
let init = unsafe_cell_get_mut(init_cell);
let sender = Sender {
ptr: &init.yielded as *const _,
};
init.generator = UnsafeCell::new(MaybeUninit::new(f(sender)));
init_cell
}
State::Init(init) => {
if unsafe_cell_get_mut(init).done {
return Poll::Ready(None);
}
init
}
};

let mut dst = None;
let res = {
let _enter = me.rx.enter(&mut dst);
Pin::new_unchecked(&mut me.generator).poll(cx)
};
// Immutably borrow `init`. If we mutably borrowed `init` here it would cause UB as this
// mutable reference to `init.yielded` would alias with the generator's.
let init = unsafe { &*init_cell.get() };

me.done = res.is_ready();
let generator = unsafe { &mut *(*init.generator.get()).as_mut_ptr() };

if dst.is_some() {
return Poll::Ready(dst.take());
}
// Miri sometimes does not like this because of
// <https://github.com/rust-lang/rust/issues/63818>. However this is acceptable because the
// same unsoundness can be triggered with safe code:
// https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=43b4a4f3d7e9287c73821b27084fb179
// And so we know that once the safe code stops being UB (which will happen), this code will
// also stop being UB.
let res = unsafe { Pin::new_unchecked(generator) }.poll(cx);

if me.done {
Poll::Ready(None)
} else {
Poll::Pending
}
// Now that the generator no longer will use its pointer to `init.yielded`, we can create a
// mutable reference.
let init = unsafe_cell_get_mut(init_cell);

if res.is_ready() {
init.done = true;
}

if let Some(yielded) = init.yielded.take() {
return Poll::Ready(Some(yielded));
}

match res {
Poll::Ready(()) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

pub struct Sender<T> {
ptr: *const Cell<Option<T>>,
}

impl<T> Sender<T> {
pub fn send(&mut self, value: T) -> impl Future<Output = ()> {
unsafe { &*self.ptr }.set(Some(value));

SendFut { yielded: false }
}
}

impl<T> Debug for Sender<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.pad("Sender")
}
}

// Sender alone wouldn't be Send, however since we know it is only ever inside the generator if it
// is sent to another thread the AsyncStream it is inside will be too.
unsafe impl<T> Send for Sender<T> {}

struct SendFut {
yielded: bool,
}
impl Future for SendFut {
type Output = ();
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.yielded {
Poll::Ready(())
} else {
self.yielded = true;
Poll::Pending
}
}
}

/// A helper function to reduce usages of `unsafe`.
fn unsafe_cell_get_mut<T: ?Sized>(cell: &mut UnsafeCell<T>) -> &mut T {
unsafe { &mut *cell.get() }
}
3 changes: 1 addition & 2 deletions async-stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
unreachable_pub
)]
#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]
#![no_std]

//! Asynchronous stream of elements.
//!
Expand Down Expand Up @@ -159,8 +160,6 @@

mod async_stream;
mod next;
#[doc(hidden)]
pub mod yielder;

// Used by the macro, but not intended to be accessed publicly.
#[doc(hidden)]
Expand Down
6 changes: 3 additions & 3 deletions async-stream/src/next.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_core::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// This is equivalent to the `futures::StreamExt::next` method.
// But we want to make this crate dependency as small as possible, so we define our `next` function.
Expand Down
Loading