Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace select!/try_select! with Future::{race,try_race} #405

Merged
11 commits merged into from
Nov 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions src/future/future.rs → src/future/future/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
cfg_unstable! {
mod delay;
mod race;
mod try_race;

use std::time::Duration;

use delay::DelayFuture;
use race::Race;
use try_race::TryRace;
}

extension_trait! {
Expand Down Expand Up @@ -129,6 +133,94 @@ extension_trait! {
{
DelayFuture::new(self, dur)
}

#[doc = r#"
Waits for one of two similarly-typed futures to complete.

Awaits multiple futures simultaneously, returning the output of the
first future that completes.

This function will return a new future which awaits for either one of both
futures to complete. If multiple futures are completed at the same time,
resolution will occur in the order that they have been passed.

Note that this macro consumes all futures passed, and once a future is
completed, all other futures are dropped.

This macro is only usable inside of async functions, closures, and blocks.

# Examples

```
# async_std::task::block_on(async {
use async_std::prelude::*;
use async_std::future;

let a = future::pending();
let b = future::ready(1u8);
let c = future::ready(2u8);

let f = a.race(b).race(c);
assert_eq!(f.await, 1u8);
# });
```
"#]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn race<F>(
self,
other: F
) -> impl Future<Output = <Self as std::future::Future>::Output> [Race<Self, F>]
where
Self: std::future::Future + Sized,
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
{
Race::new(self, other)
}

#[doc = r#"
Waits for one of two similarly-typed fallible futures to complete.

Awaits multiple futures simultaneously, returning all results once complete.

`try_race` is similar to [`race`], but keeps going if a future
resolved to an error until all futures have been resolved. In which case
an error is returned.

The ordering of which value is yielded when two futures resolve
simultaneously is intentionally left unspecified.

# Examples

```
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::future;
use std::io::{Error, ErrorKind};

let a = future::pending::<Result<_, Error>>();
let b = future::ready(Err(Error::from(ErrorKind::Other)));
let c = future::ready(Ok(1u8));

let f = a.try_race(b).try_race(c);
assert_eq!(f.await?, 1u8);
#
# Ok(()) }) }
```
"#]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn try_race<F: std::future::Future, T, E>(
self,
other: F
) -> impl Future<Output = <Self as std::future::Future>::Output> [TryRace<Self, F>]
where
Self: std::future::Future<Output = Result<T, E>> + Sized,
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
{
TryRace::new(self, other)
}
}

impl<F: Future + Unpin + ?Sized> Future for Box<F> {
Expand Down
57 changes: 57 additions & 0 deletions src/future/future/race.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::pin::Pin;

use async_macros::MaybeDone;
use pin_project_lite::pin_project;

use crate::task::{Context, Poll};
use std::future::Future;

pin_project! {
#[allow(missing_docs)]
#[allow(missing_debug_implementations)]
pub struct Race<L, R>
where
L: Future,
R: Future<Output = L::Output>
{
#[pin] left: MaybeDone<L>,
#[pin] right: MaybeDone<R>,
}
}

impl<L, R> Race<L, R>
where
L: Future,
R: Future<Output = L::Output>,
{
pub(crate) fn new(left: L, right: R) -> Self {
Self {
left: MaybeDone::new(left),
right: MaybeDone::new(right),
}
}
}

impl<L, R> Future for Race<L, R>
where
L: Future,
R: Future<Output = L::Output>,
{
type Output = L::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

let mut left = this.left;
if Future::poll(Pin::new(&mut left), cx).is_ready() {
return Poll::Ready(left.take().unwrap());
}

let mut right = this.right;
if Future::poll(Pin::new(&mut right), cx).is_ready() {
return Poll::Ready(right.take().unwrap());
}

Poll::Pending
}
}
66 changes: 66 additions & 0 deletions src/future/future/try_race.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::pin::Pin;

use async_macros::MaybeDone;
use pin_project_lite::pin_project;

use crate::task::{Context, Poll};
use std::future::Future;

pin_project! {
#[allow(missing_docs)]
#[allow(missing_debug_implementations)]
pub struct TryRace<L, R>
where
L: Future,
R: Future<Output = L::Output>
{
#[pin] left: MaybeDone<L>,
#[pin] right: MaybeDone<R>,
}
}

impl<L, R> TryRace<L, R>
where
L: Future,
R: Future<Output = L::Output>,
{
pub(crate) fn new(left: L, right: R) -> Self {
Self {
left: MaybeDone::new(left),
right: MaybeDone::new(right),
}
}
}

impl<L, R, T, E> Future for TryRace<L, R>
where
L: Future<Output = Result<T, E>>,
R: Future<Output = L::Output>,
{
type Output = L::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut left_errored = false;

// Check if the left future is ready & successful. Continue if not.
let mut left = this.left;
if Future::poll(Pin::new(&mut left), cx).is_ready() {
if left.as_ref().output().unwrap().is_ok() {
return Poll::Ready(left.take().unwrap());
} else {
left_errored = true;
}
}

// Check if the right future is ready & successful. Return err if left
// future also resolved to err. Continue if not.
let mut right = this.right;
let is_ready = Future::poll(Pin::new(&mut right), cx).is_ready();
if is_ready && (right.as_ref().output().unwrap().is_ok() || left_errored) {
return Poll::Ready(right.take().unwrap());
}

Poll::Pending
}
}
34 changes: 18 additions & 16 deletions src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
//!
//! Often it's desireable to await multiple futures as if it was a single
//! future. The `join` family of operations converts multiple futures into a
//! single future that returns all of their outputs. The `select` family of
//! single future that returns all of their outputs. The `race` family of
//! operations converts multiple future into a single future that returns the
//! first output.
//!
//! For operating on futures the following macros can be used:
//!
//! | Name | Return signature | When does it return? |
//! | --- | --- | --- |
//! | `future::join` | `(T1, T2)` | Wait for all to complete
//! | `future::select` | `T` | Return on first value
//! | Name | Return signature | When does it return? |
//! | --- | --- | --- |
//! | [`future::join!`] | `(T1, T2)` | Wait for all to complete
//! | [`Future::race`] | `T` | Return on first value
//!
//! ## Fallible Futures Concurrency
//!
Expand All @@ -25,21 +25,26 @@
//! futures are dropped and an error is returned. This is referred to as
//! "short-circuiting".
//!
//! In the case of `try_select`, instead of returning the first future that
//! In the case of `try_race`, instead of returning the first future that
//! completes it returns the first future that _successfully_ completes. This
//! means `try_select` will keep going until any one of the futures returns
//! means `try_race` will keep going until any one of the futures returns
//! `Ok`, or _all_ futures have returned `Err`.
//!
//! However sometimes it can be useful to use the base variants of the macros
//! even on futures that return `Result`. Here is an overview of operations that
//! work on `Result`, and their respective semantics:
//!
//! | Name | Return signature | When does it return? |
//! | --- | --- | --- |
//! | `future::join` | `(Result<T, E>, Result<T, E>)` | Wait for all to complete
//! | `future::try_join` | `Result<(T1, T2), E>` | Return on first `Err`, wait for all to complete
//! | `future::select` | `Result<T, E>` | Return on first value
//! | `future::try_select` | `Result<T, E>` | Return on first `Ok`, reject on last Err
//! | Name | Return signature | When does it return? |
//! | --- | --- | --- |
//! | [`future::join!`] | `(Result<T, E>, Result<T, E>)` | Wait for all to complete
//! | [`future::try_join!`] | `Result<(T1, T2), E>` | Return on first `Err`, wait for all to complete
//! | [`Future::race`] | `Result<T, E>` | Return on first value
//! | [`Future::try_race`] | `Result<T, E>` | Return on first `Ok`, reject on last Err
//!
//! [`future::join!`]: macro.join.html
//! [`future::try_join!`]: macro.try_join.html
//! [`Future::race`]: trait.Future.html#method.race
//! [`Future::try_race`]: trait.Future.html#method.try_race

#[doc(inline)]
pub use async_macros::{join, try_join};
Expand All @@ -57,9 +62,6 @@ mod ready;
mod timeout;

cfg_unstable! {
#[doc(inline)]
pub use async_macros::{select, try_select};

pub use into_future::IntoFuture;
mod into_future;
}
2 changes: 1 addition & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ macro_rules! extension_trait {
};

// Parse the return type in an extension method.
(@doc ($($head:tt)*) -> impl Future<Output = $out:ty> [$f:ty] $($tail:tt)*) => {
(@doc ($($head:tt)*) -> impl Future<Output = $out:ty> $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => {
extension_trait!(@doc ($($head)* -> owned::ImplFuture<$out>) $($tail)*);
};
(@ext ($($head:tt)*) -> impl Future<Output = $out:ty> $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => {
Expand Down