diff --git a/futures-util/src/compat/compat.rs b/futures-util/src/compat/compat.rs new file mode 100644 index 0000000000..77943837fc --- /dev/null +++ b/futures-util/src/compat/compat.rs @@ -0,0 +1,20 @@ +/// Converts a futures 0.3 `TryFuture` into a futures 0.1 `Future` +/// and vice versa. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct Compat { + crate future: Fut, + crate executor: Option, +} + +impl Compat { + /// Returns the inner future. + pub fn into_inner(self) -> Fut { + self.future + } + + /// Creates a new `Compat`. + crate fn new(future: Fut, executor: Option) -> Compat { + Compat { future, executor } + } +} diff --git a/futures-util/src/compat/compat01to03.rs b/futures-util/src/compat/compat01to03.rs new file mode 100644 index 0000000000..626824c753 --- /dev/null +++ b/futures-util/src/compat/compat01to03.rs @@ -0,0 +1,61 @@ +use super::Compat; +use futures::Async as Async01; +use futures::Future as Future01; +use futures::executor::{self as executor01, NotifyHandle as NotifyHandle01, + Notify as Notify01, UnsafeNotify as UnsafeNotify01}; +use futures_core::Future as Future03; +use futures_core::task as task03; +use std::mem::PinMut; + +impl Future03 for Compat { + type Output = Result; + + fn poll( + self: PinMut, + cx: &mut task03::Context + ) -> task03::Poll { + let notify = &WakerToHandle(cx.waker()); + + executor01::with_notify(notify, 0, move || { + unsafe { + match PinMut::get_mut_unchecked(self).future.poll() { + Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)), + Ok(Async01::NotReady) => task03::Poll::Pending, + Err(e) => task03::Poll::Ready(Err(e)), + } + } + }) + } +} + +struct NotifyWaker(task03::Waker); + +#[derive(Clone)] +struct WakerToHandle<'a>(&'a task03::Waker); + +impl<'a> From> for NotifyHandle01 { + fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 { + let ptr = Box::new(NotifyWaker(handle.0.clone())); + + unsafe { + NotifyHandle01::new(Box::into_raw(ptr)) + } + } +} + +impl Notify01 for NotifyWaker { + fn notify(&self, _: usize) { + self.0.wake(); + } +} + +unsafe impl UnsafeNotify01 for NotifyWaker { + unsafe fn clone_raw(&self) -> NotifyHandle01 { + WakerToHandle(&self.0).into() + } + + unsafe fn drop_raw(&self) { + let ptr: *const dyn UnsafeNotify01 = self; + drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01)); + } +} diff --git a/futures-util/src/compat/compat03to01.rs b/futures-util/src/compat/compat03to01.rs new file mode 100644 index 0000000000..da8c261266 --- /dev/null +++ b/futures-util/src/compat/compat03to01.rs @@ -0,0 +1,41 @@ +use super::Compat; +use futures::Future as Future01; +use futures::Poll as Poll01; +use futures::task as task01; +use futures::Async as Async01; +use futures_core::TryFuture as TryFuture03; +use futures_core::task as task03; +use std::marker::Unpin; +use std::mem::PinMut; +use std::sync::Arc; + +impl Future01 for Compat +where Fut: TryFuture03 + Unpin, + Ex: task03::Executor +{ + type Item = Fut::Ok; + type Error = Fut::Error; + + fn poll(&mut self) -> Poll01 { + let waker = current_as_waker(); + let mut cx = task03::Context::new(&waker, self.executor.as_mut().unwrap()); + match PinMut::new(&mut self.future).try_poll(&mut cx) { + task03::Poll::Ready(Ok(t)) => Ok(Async01::Ready(t)), + task03::Poll::Pending => Ok(Async01::NotReady), + task03::Poll::Ready(Err(e)) => Err(e), + } + } +} + +fn current_as_waker() -> task03::LocalWaker { + let arc_waker = Arc::new(Current(task01::current())); + task03::local_waker_from_nonlocal(arc_waker) +} + +struct Current(task01::Task); + +impl task03::Wake for Current { + fn wake(arc_self: &Arc) { + arc_self.0.notify(); + } +} diff --git a/futures-util/src/compat/executor.rs b/futures-util/src/compat/executor.rs index 24dfae4aaa..0933ffade8 100644 --- a/futures-util/src/compat/executor.rs +++ b/futures-util/src/compat/executor.rs @@ -1,70 +1,71 @@ +use super::Compat; +use crate::{TryFutureExt, FutureExt, future::NeverError}; use futures::future::Executor as Executor01; - use futures_core::task::Executor as Executor03; use futures_core::task as task03; use futures_core::future::FutureObj; -use super::Compat; -use crate::{TryFutureExt, FutureExt, future::NeverError}; - -pub struct BoxedExecutor(Box); +pub struct BoxedExecutor03(Box); -impl Executor03 for BoxedExecutor { - fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), task03::SpawnObjError> { +impl Executor03 for BoxedExecutor03 { + fn spawn_obj( + &mut self, + future: FutureObj<'static, ()>, + ) -> Result<(), task03::SpawnObjError> { (&mut *self.0).spawn_obj(future) } } /// A future that can run on a futures 0.1 executor. -pub type ExecutorFuture01 = Compat>, BoxedExecutor>; +pub type Executor01Future = Compat>, BoxedExecutor03>; /// Extension trait for futures 0.1 Executors. -pub trait Executor01CompatExt: Executor01 - + Clone + Send + 'static +pub trait Executor01CompatExt: Executor01 + + Clone + Send + 'static { /// Creates an `Executor` compatable with futures 0.3. - fn compat(self) -> CompatExecutor + fn compat(self) -> Executor01As03 where Self: Sized; } -impl Executor01CompatExt for E -where E: Executor01, - E: Clone + Send + 'static +impl Executor01CompatExt for Ex +where Ex: Executor01 + Clone + Send + 'static { - fn compat(self) -> CompatExecutor { - CompatExecutor { - exec: self, + fn compat(self) -> Executor01As03 { + Executor01As03 { + executor01: self, } } } /// Converts a futures 0.1 `Executor` into a futures 0.3 `Executor`. #[derive(Clone)] -pub struct CompatExecutor { - exec: E +pub struct Executor01As03 { + executor01: Ex } -impl Executor03 for CompatExecutor - where E: Executor01, - E: Clone + Send + 'static, +impl Executor03 for Executor01As03 +where Ex: Executor01, + Ex: Clone + Send + 'static, { fn spawn_obj( - &mut self, + &mut self, future: FutureObj<'static, ()>, ) -> Result<(), task03::SpawnObjError> { - - let fut = future.never_error().compat(BoxedExecutor(Box::new(self.clone()))); + let future = future.never_error().compat(BoxedExecutor03(Box::new(self.clone()))); - self.exec.execute(fut) - .map_err(|exec_err| { + match self.executor01.execute(future) { + Ok(()) => Ok(()), + Err(err) => { use futures_core::task::{SpawnObjError, SpawnErrorKind}; - - let fut = exec_err.into_future().into_inner().unwrap_or_else(|_| ()); - SpawnObjError { + + let fut = err.into_future().into_inner().unwrap_or_else(|_| ()); + Err(SpawnObjError { kind: SpawnErrorKind::shutdown(), - task: Box::new(fut).into(), - } - }) + future: Box::new(fut).into(), + }) + } + } } } diff --git a/futures-util/src/compat/future01ext.rs b/futures-util/src/compat/future01ext.rs new file mode 100644 index 0000000000..01fb4f114b --- /dev/null +++ b/futures-util/src/compat/future01ext.rs @@ -0,0 +1,18 @@ +use super::Compat; +use futures::Future as Future01; + +impl Future01CompatExt for Fut {} + +/// Extension trait for futures 0.1 Futures. +pub trait Future01CompatExt: Future01 { + /// Converts a futures 0.1 `Future` into a + /// futures 0.3 `Future>`. + fn compat(self) -> Compat where Self: Sized { + Compat { + future: self, + executor: None, + } + } +} + + diff --git a/futures-util/src/compat/mod.rs b/futures-util/src/compat/mod.rs index 598fe4503f..69edd02614 100644 --- a/futures-util/src/compat/mod.rs +++ b/futures-util/src/compat/mod.rs @@ -1,149 +1,15 @@ //! Futures 0.1 / 0.3 shims -//! #![allow(missing_debug_implementations)] -use futures::Future as Future01; -use futures::Poll as Poll01; -use futures::task as task01; -use futures::task::Task as Task01; -use futures::executor::{with_notify, NotifyHandle, Notify, UnsafeNotify}; - -use futures_core::Future as Future03; -use futures_core::TryFuture as TryFuture03; -use futures_core::Poll as Poll03; -use futures_core::task as task03; -use futures_core::task::Executor as Executor03; -use futures_core::task::{Wake, Waker, LocalWaker, local_waker_from_nonlocal}; - -use std::mem::PinMut; -use std::marker::Unpin; -use std::sync::Arc; - mod executor; -pub use self::executor::{Executor01CompatExt, ExecutorFuture01, CompatExecutor}; - -/// Converts a futures 0.3 `TryFuture` into a futures 0.1 `Future` -/// and vice versa. -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub struct Compat { - crate inner: Fut, - crate executor: Option, -} - -impl Compat { - /// Returns the inner future. - pub fn into_inner(self) -> Fut { - self.inner - } - - /// Creates a new `Compat`. - crate fn new(inner: Fut, executor: Option) -> Compat { - Compat { - inner, - executor - } - } -} - -impl Future03 for Compat where T: Future01 { - type Output = Result; - - fn poll(self: PinMut, cx: &mut task03::Context) -> Poll03 { - use futures::Async; - - let notify = &WakerToHandle(cx.waker()); - - with_notify(notify, 0, move || { - unsafe { - match PinMut::get_mut_unchecked(self).inner.poll() { - Ok(Async::Ready(t)) => Poll03::Ready(Ok(t)), - Ok(Async::NotReady) => Poll03::Pending, - Err(e) => Poll03::Ready(Err(e)), - } - } - }) - } -} - -struct NotifyWaker(Waker); - -#[derive(Clone)] -struct WakerToHandle<'a>(&'a Waker); - -impl<'a> From> for NotifyHandle { - fn from(handle: WakerToHandle<'a>) -> NotifyHandle { - let ptr = Box::new(NotifyWaker(handle.0.clone())); - - unsafe { - NotifyHandle::new(Box::into_raw(ptr)) - } - } -} - -impl Notify for NotifyWaker { - fn notify(&self, _: usize) { - self.0.wake(); - } -} - -unsafe impl UnsafeNotify for NotifyWaker { - unsafe fn clone_raw(&self) -> NotifyHandle { - WakerToHandle(&self.0).into() - } - - unsafe fn drop_raw(&self) { - let ptr: *const dyn UnsafeNotify = self; - drop(Box::from_raw(ptr as *mut dyn UnsafeNotify)); - } -} - - - - -impl Future01 for Compat where T: TryFuture03 + Unpin, - E: Executor03 -{ - type Item = T::Ok; - type Error = T::Error; - - fn poll(&mut self) -> Poll01 { - use futures::Async; - - let waker = current_as_waker(); - let mut cx = task03::Context::new(&waker, self.executor.as_mut().unwrap()); - match PinMut::new(&mut self.inner).try_poll(&mut cx) { - Poll03::Ready(Ok(t)) => Ok(Async::Ready(t)), - Poll03::Pending => Ok(Async::NotReady), - Poll03::Ready(Err(e)) => Err(e), - } - } -} - -fn current_as_waker() -> LocalWaker { - let arc_waker = Arc::new(Current(task01::current())); - local_waker_from_nonlocal(arc_waker) -} - -struct Current(Task01); +pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03}; -impl Wake for Current { - fn wake(arc_self: &Arc) { - arc_self.0.notify(); - } -} +mod compat; +pub use self::compat::Compat; -/// Extension trait for futures 0.1 Futures. -pub trait Future01Ext: Future01 { - /// Converts a futures 0.1 `Future` into a - /// futures 0.3 `Future>`. - fn compat(self) -> Compat where Self: Sized { - Compat { - inner: self, - executor: None, - } - } -} +mod compat01to03; +mod compat03to01; -impl Future01Ext for T {} \ No newline at end of file +mod future01ext; +pub use self::future01ext::Future01CompatExt; diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 4a64053dd9..291426a397 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -85,11 +85,12 @@ pub mod compat { //! pub use futures_util::compat::{ - Compat, CompatExecutor, - ExecutorFuture01, + Compat, + Executor01Future, + Executor01As03, Executor01CompatExt, - Future01Ext, - }; + Future01CompatExt, + }; } #[cfg(feature = "std")]