Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compat implementation #1119

Merged
merged 7 commits into from
Aug 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ name = "futures_util"
[features]
std = ["futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "either/use_std", "slab"]
default = ["std", "futures-core-preview/either", "futures-sink-preview/either"]
compat = ["std", "futures"]
bench = []
nightly = []

Expand All @@ -29,6 +30,7 @@ futures-io-preview = { path = "../futures-io", version = "0.3.0-alpha.2", defaul
futures-sink-preview = { path = "../futures-sink", version = "0.3.0-alpha.2", default-features = false}
either = { version = "1.4", default-features = false }
slab = { version = "0.4", optional = true }
futures = { version = "0.1", optional = true }

[dev-dependencies]
futures-preview = { path = "../futures", version = "0.3.0-alpha.2" }
Expand Down
20 changes: 20 additions & 0 deletions futures-util/src/compat/compat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/// Converts a futures 0.3 `TryFuture` into a futures 0.1 `Future`
/// and vice versa.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Compat<Fut, Ex> {
crate future: Fut,
crate executor: Option<Ex>,
}

impl<Fut, Ex> Compat<Fut, Ex> {
/// Returns the inner future.
pub fn into_inner(self) -> Fut {
self.future
}

/// Creates a new `Compat`.
crate fn new(future: Fut, executor: Option<Ex>) -> Compat<Fut, Ex> {
Compat { future, executor }
}
}
61 changes: 61 additions & 0 deletions futures-util/src/compat/compat01to03.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use super::Compat;
use futures::Async as Async01;
use futures::Future as Future01;
use futures::executor::{self as executor01, NotifyHandle as NotifyHandle01,
Notify as Notify01, UnsafeNotify as UnsafeNotify01};
use futures_core::Future as Future03;
use futures_core::task as task03;
use std::mem::PinMut;

impl<Fut: Future01> Future03 for Compat<Fut, ()> {
type Output = Result<Fut::Item, Fut::Error>;

fn poll(
self: PinMut<Self>,
cx: &mut task03::Context
) -> task03::Poll<Self::Output> {
let notify = &WakerToHandle(cx.waker());

executor01::with_notify(notify, 0, move || {
unsafe {
match PinMut::get_mut_unchecked(self).future.poll() {
Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)),
Ok(Async01::NotReady) => task03::Poll::Pending,
Err(e) => task03::Poll::Ready(Err(e)),
}
}
})
}
}

struct NotifyWaker(task03::Waker);

#[derive(Clone)]
struct WakerToHandle<'a>(&'a task03::Waker);

impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
let ptr = Box::new(NotifyWaker(handle.0.clone()));

unsafe {
NotifyHandle01::new(Box::into_raw(ptr))
}
}
}

impl Notify01 for NotifyWaker {
fn notify(&self, _: usize) {
self.0.wake();
}
}

unsafe impl UnsafeNotify01 for NotifyWaker {
unsafe fn clone_raw(&self) -> NotifyHandle01 {
WakerToHandle(&self.0).into()
}

unsafe fn drop_raw(&self) {
let ptr: *const dyn UnsafeNotify01 = self;
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
}
}
41 changes: 41 additions & 0 deletions futures-util/src/compat/compat03to01.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use super::Compat;
use futures::Future as Future01;
use futures::Poll as Poll01;
use futures::task as task01;
use futures::Async as Async01;
use futures_core::TryFuture as TryFuture03;
use futures_core::task as task03;
use std::marker::Unpin;
use std::mem::PinMut;
use std::sync::Arc;

impl<Fut, Ex> Future01 for Compat<Fut, Ex>
where Fut: TryFuture03 + Unpin,
Ex: task03::Executor
{
type Item = Fut::Ok;
type Error = Fut::Error;

fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
let waker = current_as_waker();
let mut cx = task03::Context::new(&waker, self.executor.as_mut().unwrap());
match PinMut::new(&mut self.future).try_poll(&mut cx) {
task03::Poll::Ready(Ok(t)) => Ok(Async01::Ready(t)),
task03::Poll::Pending => Ok(Async01::NotReady),
task03::Poll::Ready(Err(e)) => Err(e),
}
}
}

fn current_as_waker() -> task03::LocalWaker {
let arc_waker = Arc::new(Current(task01::current()));
task03::local_waker_from_nonlocal(arc_waker)
}

struct Current(task01::Task);

impl task03::Wake for Current {
fn wake(arc_self: &Arc<Self>) {
arc_self.0.notify();
}
}
71 changes: 71 additions & 0 deletions futures-util/src/compat/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@

use super::Compat;
use crate::{TryFutureExt, FutureExt, future::UnitError};
use futures::future::Executor as Executor01;
use futures_core::task::Executor as Executor03;
use futures_core::task as task03;
use futures_core::future::FutureObj;

pub struct BoxedExecutor03(Box<dyn Executor03 + Send>);

impl Executor03 for BoxedExecutor03 {
fn spawn_obj(
&mut self,
future: FutureObj<'static, ()>,
) -> Result<(), task03::SpawnObjError> {
(&mut *self.0).spawn_obj(future)
}
}

/// A future that can run on a futures 0.1 executor.
pub type Executor01Future = Compat<UnitError<FutureObj<'static, ()>>, BoxedExecutor03>;

/// Extension trait for futures 0.1 Executors.
pub trait Executor01CompatExt: Executor01<Executor01Future> +
Clone + Send + 'static
{
/// Creates an `Executor` compatable with futures 0.3.
fn compat(self) -> Executor01As03<Self>
where Self: Sized;
}

impl<Ex> Executor01CompatExt for Ex
where Ex: Executor01<Executor01Future> + Clone + Send + 'static
{
fn compat(self) -> Executor01As03<Self> {
Executor01As03 {
executor01: self,
}
}
}

/// Converts a futures 0.1 `Executor` into a futures 0.3 `Executor`.
#[derive(Clone)]
pub struct Executor01As03<Ex> {
executor01: Ex
}

impl<Ex> Executor03 for Executor01As03<Ex>
where Ex: Executor01<Executor01Future>,
Ex: Clone + Send + 'static,
{
fn spawn_obj(
&mut self,
future: FutureObj<'static, ()>,
) -> Result<(), task03::SpawnObjError> {
let future = future.unit_error().compat(BoxedExecutor03(Box::new(self.clone())));

match self.executor01.execute(future) {
Ok(()) => Ok(()),
Err(err) => {
use futures_core::task::{SpawnObjError, SpawnErrorKind};

let fut = err.into_future().into_inner().unwrap_or_else(|_| ());
Err(SpawnObjError {
kind: SpawnErrorKind::shutdown(),
future: Box::new(fut).into(),
})
}
}
}
}
18 changes: 18 additions & 0 deletions futures-util/src/compat/future01ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use super::Compat;
use futures::Future as Future01;

impl<Fut: Future01> Future01CompatExt for Fut {}

/// Extension trait for futures 0.1 Futures.
pub trait Future01CompatExt: Future01 {
/// Converts a futures 0.1 `Future<Item = T, Error = E>` into a
/// futures 0.3 `Future<Output = Result<T, E>>`.
fn compat(self) -> Compat<Self, ()> where Self: Sized {
Compat {
future: self,
executor: None,
}
}
}


15 changes: 15 additions & 0 deletions futures-util/src/compat/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! Futures 0.1 / 0.3 shims

#![allow(missing_debug_implementations)]

mod executor;
pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03};

mod compat;
pub use self::compat::Compat;

mod compat01to03;
mod compat03to01;

mod future01ext;
pub use self::future01ext::Future01CompatExt;
20 changes: 20 additions & 0 deletions futures-util/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub use self::then::Then;
mod inspect;
pub use self::inspect::Inspect;

mod unit_error;
pub use self::unit_error::UnitError;

mod with_executor;
pub use self::with_executor::WithExecutor;

Expand All @@ -65,6 +68,8 @@ mod chain;
crate use self::chain::Chain;

if_std! {
use std::boxed::PinBox;

mod abortable;
pub use self::abortable::{abortable, Abortable, AbortHandle, AbortRegistration, Aborted};

Expand Down Expand Up @@ -632,6 +637,21 @@ pub trait FutureExt: Future {
Shared::new(self)
}

/// Wrap the future in a Box, pinning it.
#[cfg(feature = "std")]
fn boxed(self) -> PinBox<Self>
where Self: Sized
{
PinBox::new(self)
}

/// Turns a `Future` into a `TryFuture` with `Error = ()`.
fn unit_error(self) -> UnitError<Self>
where Self: Sized
{
UnitError::new(self)
}

/// Assigns the provided `Executor` to be used when spawning tasks
/// from within the future.
///
Expand Down
34 changes: 34 additions & 0 deletions futures-util/src/future/unit_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use core::marker::Unpin;
use core::mem::PinMut;
use futures_core::future::Future;
use futures_core::task::{self, Poll};

/// Future for the `unit_error` combinator, turning a `Future` into a `TryFuture`.
///
/// This is created by the `FutureExt::unit_error` method.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct UnitError<Fut> {
future: Fut,
}

impl<Fut> UnitError<Fut> {
unsafe_pinned!(future: Fut);

/// Creates a new UnitError.
pub(super) fn new(future: Fut) -> UnitError<Fut> {
UnitError { future }
}
}

impl<Fut: Unpin> Unpin for UnitError<Fut> {}

impl<Fut, T> Future for UnitError<Fut>
where Fut: Future<Output = T>,
{
type Output = Result<T, ()>;

fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Result<T, ()>> {
self.future().poll(cx).map(Ok)
}
}
3 changes: 3 additions & 0 deletions futures-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub mod sink;

pub mod task;

#[cfg(feature = "compat")]
pub mod compat;

if_std! {
// FIXME: currently async/await is only available with std
pub mod async_await;
Expand Down
22 changes: 21 additions & 1 deletion futures-util/src/try_future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@
use futures_core::future::TryFuture;
use futures_sink::Sink;

#[cfg(feature = "compat")]
use crate::compat::Compat;

#[cfg(feature = "compat")]
use futures_core::task::Executor;

#[cfg(feature = "compat")]
use core::marker::Unpin;

/* TODO
mod join;
mod select;
Expand Down Expand Up @@ -477,6 +486,17 @@ pub trait TryFutureExt: TryFuture {
UnwrapOrElse::new(self, f)
}

/// Wraps a [`TryFuture`] into a future compatable with libraries using
/// futures 0.1 future definitons. Requires the `compat` feature to enable.
///
#[cfg(feature = "compat")]
fn compat<E>(self, executor: E) -> Compat<Self, E>
where Self: Sized + Unpin,
E: Executor,
{
Compat::new(self, Some(executor))
}

/// Wraps a [`TryFuture`] into a type that implements
/// [`Future`](std::future::Future).
///
Expand All @@ -498,7 +518,7 @@ pub trait TryFutureExt: TryFuture {
/// fn take_future(future: impl Future<Output = Result<T, E>>) { /* ... */ }
///
/// take_future(make_try_future().into_future());
/// ```
/// ```
fn into_future(self) -> IntoFuture<Self>
where Self: Sized,
{
Expand Down
1 change: 1 addition & 0 deletions futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ futures-util-preview = { path = "../futures-util", version = "0.3.0-alpha.2", de
nightly = ["futures-util-preview/nightly"]
std = ["futures-core-preview/std", "futures-executor-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-util-preview/std"]
default = ["std"]
compat = ["std", "futures-util-preview/compat"]
Loading