Skip to content

Commit

Permalink
Refactor compatibility layer
Browse files Browse the repository at this point in the history
  • Loading branch information
MajorBreakfast committed Aug 2, 2018
1 parent 3380430 commit e711c1a
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 178 deletions.
20 changes: 20 additions & 0 deletions futures-util/src/compat/compat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/// Converts a futures 0.3 `TryFuture` into a futures 0.1 `Future`
/// and vice versa.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Compat<Fut, Ex> {
crate future: Fut,
crate executor: Option<Ex>,
}

impl<Fut, Ex> Compat<Fut, Ex> {
/// Returns the inner future.
pub fn into_inner(self) -> Fut {
self.future
}

/// Creates a new `Compat`.
crate fn new(future: Fut, executor: Option<Ex>) -> Compat<Fut, Ex> {
Compat { future, executor }
}
}
61 changes: 61 additions & 0 deletions futures-util/src/compat/compat01to03.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use super::Compat;
use futures::Async as Async01;
use futures::Future as Future01;
use futures::executor::{self as executor01, NotifyHandle as NotifyHandle01,
Notify as Notify01, UnsafeNotify as UnsafeNotify01};
use futures_core::Future as Future03;
use futures_core::task as task03;
use std::mem::PinMut;

impl<Fut: Future01> Future03 for Compat<Fut, ()> {
type Output = Result<Fut::Item, Fut::Error>;

fn poll(
self: PinMut<Self>,
cx: &mut task03::Context
) -> task03::Poll<Self::Output> {
let notify = &WakerToHandle(cx.waker());

executor01::with_notify(notify, 0, move || {
unsafe {
match PinMut::get_mut_unchecked(self).future.poll() {
Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)),
Ok(Async01::NotReady) => task03::Poll::Pending,
Err(e) => task03::Poll::Ready(Err(e)),
}
}
})
}
}

struct NotifyWaker(task03::Waker);

#[derive(Clone)]
struct WakerToHandle<'a>(&'a task03::Waker);

impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
let ptr = Box::new(NotifyWaker(handle.0.clone()));

unsafe {
NotifyHandle01::new(Box::into_raw(ptr))
}
}
}

impl Notify01 for NotifyWaker {
fn notify(&self, _: usize) {
self.0.wake();
}
}

unsafe impl UnsafeNotify01 for NotifyWaker {
unsafe fn clone_raw(&self) -> NotifyHandle01 {
WakerToHandle(&self.0).into()
}

unsafe fn drop_raw(&self) {
let ptr: *const dyn UnsafeNotify01 = self;
drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
}
}
41 changes: 41 additions & 0 deletions futures-util/src/compat/compat03to01.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use super::Compat;
use futures::Future as Future01;
use futures::Poll as Poll01;
use futures::task as task01;
use futures::Async as Async01;
use futures_core::TryFuture as TryFuture03;
use futures_core::task as task03;
use std::marker::Unpin;
use std::mem::PinMut;
use std::sync::Arc;

impl<Fut, Ex> Future01 for Compat<Fut, Ex>
where Fut: TryFuture03 + Unpin,
Ex: task03::Executor
{
type Item = Fut::Ok;
type Error = Fut::Error;

fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
let waker = current_as_waker();
let mut cx = task03::Context::new(&waker, self.executor.as_mut().unwrap());
match PinMut::new(&mut self.future).try_poll(&mut cx) {
task03::Poll::Ready(Ok(t)) => Ok(Async01::Ready(t)),
task03::Poll::Pending => Ok(Async01::NotReady),
task03::Poll::Ready(Err(e)) => Err(e),
}
}
}

fn current_as_waker() -> task03::LocalWaker {
let arc_waker = Arc::new(Current(task01::current()));
task03::local_waker_from_nonlocal(arc_waker)
}

struct Current(task01::Task);

impl task03::Wake for Current {
fn wake(arc_self: &Arc<Self>) {
arc_self.0.notify();
}
}
67 changes: 34 additions & 33 deletions futures-util/src/compat/executor.rs
Original file line number Diff line number Diff line change
@@ -1,70 +1,71 @@

use super::Compat;
use crate::{TryFutureExt, FutureExt, future::NeverError};
use futures::future::Executor as Executor01;

use futures_core::task::Executor as Executor03;
use futures_core::task as task03;
use futures_core::future::FutureObj;

use super::Compat;
use crate::{TryFutureExt, FutureExt, future::NeverError};

pub struct BoxedExecutor(Box<dyn Executor03 + Send>);
pub struct BoxedExecutor03(Box<dyn Executor03 + Send>);

impl Executor03 for BoxedExecutor {
fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), task03::SpawnObjError> {
impl Executor03 for BoxedExecutor03 {
fn spawn_obj(
&mut self,
future: FutureObj<'static, ()>,
) -> Result<(), task03::SpawnObjError> {
(&mut *self.0).spawn_obj(future)
}
}

/// A future that can run on a futures 0.1 executor.
pub type ExecutorFuture01 = Compat<NeverError<FutureObj<'static, ()>>, BoxedExecutor>;
pub type Executor01Future = Compat<NeverError<FutureObj<'static, ()>>, BoxedExecutor03>;

/// Extension trait for futures 0.1 Executors.
pub trait Executor01CompatExt: Executor01<ExecutorFuture01>
+ Clone + Send + 'static
pub trait Executor01CompatExt: Executor01<Executor01Future> +
Clone + Send + 'static
{
/// Creates an `Executor` compatable with futures 0.3.
fn compat(self) -> CompatExecutor<Self>
fn compat(self) -> Executor01As03<Self>
where Self: Sized;
}

impl<E> Executor01CompatExt for E
where E: Executor01<ExecutorFuture01>,
E: Clone + Send + 'static
impl<Ex> Executor01CompatExt for Ex
where Ex: Executor01<Executor01Future> + Clone + Send + 'static
{
fn compat(self) -> CompatExecutor<Self> {
CompatExecutor {
exec: self,
fn compat(self) -> Executor01As03<Self> {
Executor01As03 {
executor01: self,
}
}
}

/// Converts a futures 0.1 `Executor` into a futures 0.3 `Executor`.
#[derive(Clone)]
pub struct CompatExecutor<E> {
exec: E
pub struct Executor01As03<Ex> {
executor01: Ex
}

impl<E> Executor03 for CompatExecutor<E>
where E: Executor01<ExecutorFuture01>,
E: Clone + Send + 'static,
impl<Ex> Executor03 for Executor01As03<Ex>
where Ex: Executor01<Executor01Future>,
Ex: Clone + Send + 'static,
{
fn spawn_obj(
&mut self,
&mut self,
future: FutureObj<'static, ()>,
) -> Result<(), task03::SpawnObjError> {

let fut = future.never_error().compat(BoxedExecutor(Box::new(self.clone())));
let future = future.never_error().compat(BoxedExecutor03(Box::new(self.clone())));

self.exec.execute(fut)
.map_err(|exec_err| {
match self.executor01.execute(future) {
Ok(()) => Ok(()),
Err(err) => {
use futures_core::task::{SpawnObjError, SpawnErrorKind};
let fut = exec_err.into_future().into_inner().unwrap_or_else(|_| ());
SpawnObjError {

let fut = err.into_future().into_inner().unwrap_or_else(|_| ());
Err(SpawnObjError {
kind: SpawnErrorKind::shutdown(),
task: Box::new(fut).into(),
}
})
future: Box::new(fut).into(),
})
}
}
}
}
18 changes: 18 additions & 0 deletions futures-util/src/compat/future01ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use super::Compat;
use futures::Future as Future01;

impl<Fut: Future01> Future01CompatExt for Fut {}

/// Extension trait for futures 0.1 Futures.
pub trait Future01CompatExt: Future01 {
/// Converts a futures 0.1 `Future<Item = T, Error = E>` into a
/// futures 0.3 `Future<Output = Result<T, E>>`.
fn compat(self) -> Compat<Self, ()> where Self: Sized {
Compat {
future: self,
executor: None,
}
}
}


Loading

0 comments on commit e711c1a

Please sign in to comment.