Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Use pin-project instead of unsafe_[un]pinned #1616

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ members = [
"futures-util",
"futures-test",
]

[patch.crates-io]
pin-project = { git = "https://github.com/taiki-e/pin-project", branch = "pin_project" }
1 change: 1 addition & 0 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ slab = { version = "0.4", optional = true }
memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
tokio-io = { version = "0.1.9", optional = true }
pin-project = "0.3.2"
pin-utils = "0.1.0-alpha.4"

[dev-dependencies]
Expand Down
11 changes: 6 additions & 5 deletions futures-util/src/future/catch_unwind.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};
use std::any::Any;
use std::pin::Pin;
use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
use std::pin::Pin;

/// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method.
#[unsafe_project(Unpin)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CatchUnwind<Fut> where Fut: Future {
#[pin]
future: Fut,
}

impl<Fut> CatchUnwind<Fut> where Fut: Future + UnwindSafe {
unsafe_pinned!(future: Fut);

pub(super) fn new(future: Fut) -> CatchUnwind<Fut> {
CatchUnwind { future }
}
Expand All @@ -25,7 +25,8 @@ impl<Fut> Future for CatchUnwind<Fut>
{
type Output = Result<Fut::Output, Box<dyn Any + Send>>;

#[pin_project(self)]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok)
catch_unwind(AssertUnwindSafe(|| self.future.poll(cx)))?.map(Ok)
}
}
9 changes: 5 additions & 4 deletions futures-util/src/future/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@ use core::fmt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};

/// Future for the [`flatten`](super::FutureExt::flatten) method.
#[unsafe_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Flatten<Fut>
where Fut: Future,
Fut::Output: Future,
{
#[pin]
state: Chain<Fut, Fut::Output, ()>,
}

impl<Fut> Flatten<Fut>
where Fut: Future,
Fut::Output: Future,
{
unsafe_pinned!(state: Chain<Fut, Fut::Output, ()>);

pub(super) fn new(future: Fut) -> Flatten<Fut> {
Flatten {
state: Chain::new(future, ()),
Expand Down Expand Up @@ -51,7 +51,8 @@ impl<Fut> Future for Flatten<Fut>
{
type Output = <Fut::Output as Future>::Output;

#[pin_project(self)]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.state().poll(cx, |a, ()| a)
self.state.poll(cx, |a, ()| a)
}
}
11 changes: 6 additions & 5 deletions futures-util/src/future/fuse.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};

/// Future for the [`fuse`](super::FutureExt::fuse) method.
#[unsafe_project(Unpin)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Fuse<Fut: Future> {
#[pin]
future: Option<Fut>,
}

impl<Fut: Future> Fuse<Fut> {
unsafe_pinned!(future: Option<Fut>);

pub(super) fn new(f: Fut) -> Fuse<Fut> {
Fuse {
future: Some(f),
Expand Down Expand Up @@ -79,13 +79,14 @@ impl<Fut: Future> FusedFuture for Fuse<Fut> {
impl<Fut: Future> Future for Fuse<Fut> {
type Output = Fut::Output;

#[pin_project(self)]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
let v = match self.as_mut().future().as_pin_mut() {
let v = match self.future.as_mut().as_pin_mut() {
Some(fut) => ready!(fut.poll(cx)),
None => return Poll::Pending,
};

self.as_mut().future().set(None);
self.future.set(None);
Poll::Ready(v)
}
}
14 changes: 6 additions & 8 deletions futures-util/src/future/inspect.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use pin_project::{pin_project, unsafe_project};

/// Future for the [`inspect`](super::FutureExt::inspect) method.
#[unsafe_project(Unpin)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Inspect<Fut, F> where Fut: Future {
#[pin]
future: Fut,
f: Option<F>,
}

impl<Fut: Future, F: FnOnce(&Fut::Output)> Inspect<Fut, F> {
unsafe_pinned!(future: Fut);
unsafe_unpinned!(f: Option<F>);

pub(super) fn new(future: Fut, f: F) -> Inspect<Fut, F> {
Inspect {
future,
Expand All @@ -23,8 +22,6 @@ impl<Fut: Future, F: FnOnce(&Fut::Output)> Inspect<Fut, F> {
}
}

impl<Fut: Future + Unpin, F> Unpin for Inspect<Fut, F> {}

impl<Fut: Future + FusedFuture, F> FusedFuture for Inspect<Fut, F> {
fn is_terminated(&self) -> bool { self.future.is_terminated() }
}
Expand All @@ -35,9 +32,10 @@ impl<Fut, F> Future for Inspect<Fut, F>
{
type Output = Fut::Output;

#[pin_project(self)]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
let e = ready!(self.as_mut().future().poll(cx));
let f = self.as_mut().f().take().expect("cannot poll Inspect twice");
let e = ready!(self.future.as_mut().poll(cx));
let f = self.f.take().expect("cannot poll Inspect twice");
f(&e);
Poll::Ready(e)
}
Expand Down
11 changes: 6 additions & 5 deletions futures-util/src/future/into_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};

/// Stream for the [`into_stream`](super::FutureExt::into_stream) method.
#[unsafe_project(Unpin)]
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct IntoStream<Fut: Future> {
#[pin]
future: Option<Fut>
}

impl<Fut: Future> IntoStream<Fut> {
unsafe_pinned!(future: Option<Fut>);

pub(super) fn new(future: Fut) -> IntoStream<Fut> {
IntoStream {
future: Some(future)
Expand All @@ -24,13 +24,14 @@ impl<Fut: Future> IntoStream<Fut> {
impl<Fut: Future> Stream for IntoStream<Fut> {
type Item = Fut::Output;

#[pin_project(self)]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let v = match self.as_mut().future().as_pin_mut() {
let v = match self.future.as_mut().as_pin_mut() {
Some(fut) => ready!(fut.poll(cx)),
None => return Poll::Ready(None),
};

self.as_mut().future().set(None);
self.future.set(None);
Poll::Ready(Some(v))
}
}
17 changes: 7 additions & 10 deletions futures-util/src/future/map.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use pin_project::{pin_project, unsafe_project};

/// Future for the [`map`](super::FutureExt::map) method.
#[unsafe_project(Unpin)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Map<Fut, F> {
#[pin]
future: Fut,
f: Option<F>,
}

impl<Fut, F> Map<Fut, F> {
unsafe_pinned!(future: Fut);
unsafe_unpinned!(f: Option<F>);

/// Creates a new Map.
pub(super) fn new(future: Fut, f: F) -> Map<Fut, F> {
Map { future, f: Some(f) }
}
}

impl<Fut: Unpin, F> Unpin for Map<Fut, F> {}

impl<Fut, F> FusedFuture for Map<Fut, F> {
fn is_terminated(&self) -> bool { self.f.is_none() }
}
Expand All @@ -33,12 +29,13 @@ impl<Fut, F, T> Future for Map<Fut, F>
{
type Output = T;

#[pin_project(self)]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
self.as_mut()
.future()
self.future
.as_mut()
.poll(cx)
.map(|output| {
let f = self.f().take()
let f = self.f.take()
.expect("Map must not be polled after it returned `Poll::Ready`");
f(output)
})
Expand Down
11 changes: 5 additions & 6 deletions futures-util/src/future/never_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,23 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{self, Poll};
use futures_core::never::Never;
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};

/// Future for the [`never_error`](super::FutureExt::never_error) combinator.
#[unsafe_project(Unpin)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NeverError<Fut> {
#[pin]
future: Fut,
}

impl<Fut> NeverError<Fut> {
unsafe_pinned!(future: Fut);

pub(super) fn new(future: Fut) -> NeverError<Fut> {
NeverError { future }
}
}

impl<Fut: Unpin> Unpin for NeverError<Fut> {}

impl<Fut: FusedFuture> FusedFuture for NeverError<Fut> {
fn is_terminated(&self) -> bool { self.future.is_terminated() }
}
Expand All @@ -30,7 +28,8 @@ impl<Fut, T> Future for NeverError<Fut>
{
type Output = Result<T, Never>;

#[pin_project(self)]
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
self.future().poll(cx).map(Ok)
self.future.poll(cx).map(Ok)
}
}
11 changes: 5 additions & 6 deletions futures-util/src/future/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};

/// A future representing a value which may or may not be present.
///
Expand All @@ -23,24 +23,23 @@ use pin_utils::unsafe_pinned;
/// assert_eq!(a.await, None);
/// # });
/// ```
#[unsafe_project(Unpin)]
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct OptionFuture<F> {
#[pin]
option: Option<F>,
}

impl<F> OptionFuture<F> {
unsafe_pinned!(option: Option<F>);
}

impl<F: Future> Future for OptionFuture<F> {
type Output = Option<F::Output>;

#[pin_project(self)]
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
match self.option().as_pin_mut() {
match self.option.as_pin_mut() {
Some(x) => x.poll(cx).map(Some),
None => Poll::Ready(None),
}
Expand Down
18 changes: 7 additions & 11 deletions futures-util/src/future/remote_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
future::Future,
task::{Context, Poll},
},
pin_utils::{unsafe_pinned, unsafe_unpinned},
pin_project::{pin_project, unsafe_project},
std::{
any::Any,
fmt,
Expand Down Expand Up @@ -54,10 +54,12 @@ type SendMsg<Fut> = Result<<Fut as Future>::Output, Box<(dyn Any + Send + 'stati

/// A future which sends its output to the corresponding `RemoteHandle`.
/// Created by [`remote_handle`](crate::future::FutureExt::remote_handle).
#[unsafe_project(Unpin)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Remote<Fut: Future> {
tx: Option<Sender<SendMsg<Fut>>>,
keep_running: Arc<AtomicBool>,
#[pin]
future: CatchUnwind<AssertUnwindSafe<Fut>>,
}

Expand All @@ -69,29 +71,23 @@ impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> {
}
}

impl<Fut: Future + Unpin> Unpin for Remote<Fut> {}

impl<Fut: Future> Remote<Fut> {
unsafe_pinned!(future: CatchUnwind<AssertUnwindSafe<Fut>>);
unsafe_unpinned!(tx: Option<Sender<SendMsg<Fut>>>);
}

impl<Fut: Future> Future for Remote<Fut> {
type Output = ();

#[pin_project(self)]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if let Poll::Ready(_) = self.as_mut().tx().as_mut().unwrap().poll_cancel(cx) {
if let Poll::Ready(_) = self.tx.as_mut().unwrap().poll_cancel(cx) {
if !self.keep_running.load(Ordering::SeqCst) {
// Cancelled, bail out
return Poll::Ready(())
}
}

let output = ready!(self.as_mut().future().poll(cx));
let output = ready!(self.future.as_mut().poll(cx));

// if the receiving end has gone away then that's ok, we just ignore the
// send error here.
drop(self.as_mut().tx().take().unwrap().send(output));
drop(self.tx.take().unwrap().send(output));
Poll::Ready(())
}
}
Expand Down
Loading