From 93a804954d3302f2f2acd098847ddb23b7923a32 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Wed, 8 Sep 2021 16:34:23 +0200 Subject: [PATCH] futures-util: add StreamExt::count method Signed-off-by: Petros Angelatos --- futures-util/src/stream/stream/count.rs | 53 +++++++++++++++++++++++++ futures-util/src/stream/stream/mod.rs | 36 +++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 futures-util/src/stream/stream/count.rs diff --git a/futures-util/src/stream/stream/count.rs b/futures-util/src/stream/stream/count.rs new file mode 100644 index 0000000000..513cab7b6a --- /dev/null +++ b/futures-util/src/stream/stream/count.rs @@ -0,0 +1,53 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`count`](super::StreamExt::count) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Count { + #[pin] + stream: St, + count: usize + } +} + +impl fmt::Debug for Count +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Count").field("stream", &self.stream).field("count", &self.count).finish() + } +} + +impl Count { + pub(super) fn new(stream: St) -> Self { + Self { stream, count: 0 } + } +} + +impl FusedFuture for Count { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +impl Future for Count { + type Output = usize; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + Poll::Ready(loop { + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(_) => *this.count += 1, + None => break *this.count, + } + }) + } +} diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index e10354312e..8209bc5d04 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -38,6 +38,10 @@ mod concat; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::concat::Concat; +mod count; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::count::Count; + mod cycle; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::cycle::Cycle; @@ -590,6 +594,38 @@ pub trait StreamExt: Stream { assert_future::(Concat::new(self)) } + /// Drives the stream to completion, counting the number of items. + /// + /// # Overflow Behavior + /// + /// The method does no guarding against overflows, so counting elements of a + /// stream with more than [`usize::MAX`] elements either produces the wrong + /// result or panics. If debug assertions are enabled, a panic is guaranteed. + /// + /// # Panics + /// + /// This function might panic if the iterator has more than [`usize::MAX`] + /// elements. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=10); + /// let count = stream.count().await; + /// + /// assert_eq!(count, 10); + /// # }); + /// ``` + fn count(self) -> Count + where + Self: Sized, + { + assert_future::(Count::new(self)) + } + /// Repeats a stream endlessly. /// /// The stream never terminates. Note that you likely want to avoid