From e80e3d055b7ea01c0eb3bb13aabcf3d71eb644ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Barv=C3=AD=C5=99?= Date: Sat, 26 Jun 2021 19:25:50 +0200 Subject: [PATCH 1/4] StreamExt (all, any) --- futures-util/src/stream/stream/all.rs | 90 +++++++++++++++++++++++++++ futures-util/src/stream/stream/any.rs | 90 +++++++++++++++++++++++++++ futures-util/src/stream/stream/mod.rs | 52 ++++++++++++++++ 3 files changed, 232 insertions(+) create mode 100644 futures-util/src/stream/stream/all.rs create mode 100644 futures-util/src/stream/stream/any.rs diff --git a/futures-util/src/stream/stream/all.rs b/futures-util/src/stream/stream/all.rs new file mode 100644 index 0000000000..bbd8c6464b --- /dev/null +++ b/futures-util/src/stream/stream/all.rs @@ -0,0 +1,90 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`fold`](super::StreamExt::fold) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct All { + #[pin] + stream: St, + f: F, + accum: Option, + #[pin] + future: Option, + } +} + +impl fmt::Debug for All +where + St: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("All") + .field("stream", &self.stream) + .field("accum", &self.accum) + .field("future", &self.future) + .finish() + } +} + +impl All +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, accum: Some(true), future: None } + } +} + +impl FusedFuture for All +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.accum.is_none() && self.future.is_none() + } +} + +impl Future for All +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + Poll::Ready(loop { + if let Some(fut) = this.future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new accum value + let acc = this.accum.unwrap() && ready!(fut.poll(cx)); + if acc == false { break false } // early exit + *this.accum = Some(acc); + this.future.set(None); + } else if this.accum.is_some() { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(item) => { + this.future.set(Some((this.f)(item))); + } + None => { // we finish + break this.accum.take().unwrap(); + } + } + } else { + panic!("All polled after completion") + } + }) + } +} \ No newline at end of file diff --git a/futures-util/src/stream/stream/any.rs b/futures-util/src/stream/stream/any.rs new file mode 100644 index 0000000000..2d3347ef2a --- /dev/null +++ b/futures-util/src/stream/stream/any.rs @@ -0,0 +1,90 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`fold`](super::StreamExt::fold) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Any { + #[pin] + stream: St, + f: F, + accum: Option, + #[pin] + future: Option, + } +} + +impl fmt::Debug for Any +where + St: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Any") + .field("stream", &self.stream) + .field("accum", &self.accum) + .field("future", &self.future) + .finish() + } +} + +impl Any +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, accum: Some(false), future: None } + } +} + +impl FusedFuture for Any +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.accum.is_none() && self.future.is_none() + } +} + +impl Future for Any +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + Poll::Ready(loop { + if let Some(fut) = this.future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new accum value + let acc = this.accum.unwrap() && ready!(fut.poll(cx)); + if acc == true { break true } // early exit + *this.accum = Some(acc); + this.future.set(None); + } else if this.accum.is_some() { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(item) => { // pulled + this.future.set(Some((this.f)(item))); + } + None => { // we finish + break this.accum.take().unwrap(); + } + } + } else { + panic!("Any polled after completion") + } + }) + } +} \ No newline at end of file diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index e5a3b3d979..2240e81902 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -70,6 +70,14 @@ mod fold; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::fold::Fold; +mod any; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::any::Any; + +mod all; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::all::All; + #[cfg(feature = "sink")] mod forward; @@ -643,6 +651,50 @@ pub trait StreamExt: Stream { assert_future::(Fold::new(self, f, init)) } + /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let number_stream = stream::iter(0..10); + /// let contain_three = number_stream.any(|i| async move { i == 3 }); + /// assert_eq!(contain_three.await, true); + /// # }); + /// ``` + fn any(self, f: F) -> Any + where + F: FnMut(Self::Item) -> Fut, + Fut: Future, + Self: Sized, + { + assert_future::(Any::new(self, f)) + } + + /// Execute predicate over asynchronous stream, and return `true` if all element in stream satisfied a predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let number_stream = stream::iter(0..10); + /// let less_then_twenty = number_stream.all(|i| async move { i < 20 }); + /// assert_eq!(less_then_twenty.await, true); + /// # }); + /// ``` + fn all(self, f: F) -> All + where + F: FnMut(Self::Item) -> Fut, + Fut: Future, + Self: Sized, + { + assert_future::(All::new(self, f)) + } + /// Flattens a stream of streams into just one continuous stream. /// /// # Examples From 37ccc758f3b79e0f4f8537f126b36af54b645015 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Barv=C3=AD=C5=99?= Date: Sun, 27 Jun 2021 11:31:46 +0200 Subject: [PATCH 2/4] Any logic or operator --- futures-util/src/stream/stream/any.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/stream/stream/any.rs b/futures-util/src/stream/stream/any.rs index 2d3347ef2a..f961caef57 100644 --- a/futures-util/src/stream/stream/any.rs +++ b/futures-util/src/stream/stream/any.rs @@ -68,7 +68,7 @@ where Poll::Ready(loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { // we're currently processing a future to produce a new accum value - let acc = this.accum.unwrap() && ready!(fut.poll(cx)); + let acc = this.accum.unwrap() || ready!(fut.poll(cx)); if acc == true { break true } // early exit *this.accum = Some(acc); this.future.set(None); From 12f58f1293f5212fe1e9c5e9d837f25a570ca09d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Barv=C3=AD=C5=99?= Date: Sun, 27 Jun 2021 12:57:21 +0200 Subject: [PATCH 3/4] clippy and cargo fmt --- futures-util/src/stream/stream/all.rs | 10 ++++++---- futures-util/src/stream/stream/any.rs | 12 +++++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/futures-util/src/stream/stream/all.rs b/futures-util/src/stream/stream/all.rs index bbd8c6464b..a50c308504 100644 --- a/futures-util/src/stream/stream/all.rs +++ b/futures-util/src/stream/stream/all.rs @@ -20,7 +20,7 @@ pin_project! { } impl fmt::Debug for All -where +where St: fmt::Debug, Fut: fmt::Debug, { @@ -69,7 +69,9 @@ where if let Some(fut) = this.future.as_mut().as_pin_mut() { // we're currently processing a future to produce a new accum value let acc = this.accum.unwrap() && ready!(fut.poll(cx)); - if acc == false { break false } // early exit + if !acc { + break false; + } // early exit *this.accum = Some(acc); this.future.set(None); } else if this.accum.is_some() { @@ -78,7 +80,7 @@ where Some(item) => { this.future.set(Some((this.f)(item))); } - None => { // we finish + None => { break this.accum.take().unwrap(); } } @@ -87,4 +89,4 @@ where } }) } -} \ No newline at end of file +} diff --git a/futures-util/src/stream/stream/any.rs b/futures-util/src/stream/stream/any.rs index f961caef57..280e026677 100644 --- a/futures-util/src/stream/stream/any.rs +++ b/futures-util/src/stream/stream/any.rs @@ -20,7 +20,7 @@ pin_project! { } impl fmt::Debug for Any -where +where St: fmt::Debug, Fut: fmt::Debug, { @@ -69,16 +69,18 @@ where if let Some(fut) = this.future.as_mut().as_pin_mut() { // we're currently processing a future to produce a new accum value let acc = this.accum.unwrap() || ready!(fut.poll(cx)); - if acc == true { break true } // early exit + if acc { + break true; + } // early exit *this.accum = Some(acc); this.future.set(None); } else if this.accum.is_some() { // we're waiting on a new item from the stream match ready!(this.stream.as_mut().poll_next(cx)) { - Some(item) => { // pulled + Some(item) => { this.future.set(Some((this.f)(item))); } - None => { // we finish + None => { break this.accum.take().unwrap(); } } @@ -87,4 +89,4 @@ where } }) } -} \ No newline at end of file +} From 7915254b5e71f6730a1eab5a47ede7e4983ed9a0 Mon Sep 17 00:00:00 2001 From: Marek Barvir Date: Mon, 28 Jun 2021 10:19:18 +0200 Subject: [PATCH 4/4] fix comments --- futures-util/src/stream/stream/all.rs | 2 +- futures-util/src/stream/stream/any.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/futures-util/src/stream/stream/all.rs b/futures-util/src/stream/stream/all.rs index a50c308504..ba2baa5cf1 100644 --- a/futures-util/src/stream/stream/all.rs +++ b/futures-util/src/stream/stream/all.rs @@ -7,7 +7,7 @@ use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; pin_project! { - /// Future for the [`fold`](super::StreamExt::fold) method. + /// Future for the [`all`](super::StreamExt::all) method. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct All { #[pin] diff --git a/futures-util/src/stream/stream/any.rs b/futures-util/src/stream/stream/any.rs index 280e026677..f023125c70 100644 --- a/futures-util/src/stream/stream/any.rs +++ b/futures-util/src/stream/stream/any.rs @@ -7,7 +7,7 @@ use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; pin_project! { - /// Future for the [`fold`](super::StreamExt::fold) method. + /// Future for the [`any`](super::StreamExt::any) method. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Any { #[pin]