Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add StreamExt::timeout_repeating #5577

Merged
merged 9 commits into from
Apr 24, 2023
103 changes: 98 additions & 5 deletions tokio-stream/src/stream_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use try_next::TryNext;

cfg_time! {
pub(crate) mod timeout;
use timeout::Timeout;
use timeout::{Timeout, TimeoutMode};
use tokio::time::Duration;
mod throttle;
use throttle::{throttle, Throttle};
Expand Down Expand Up @@ -924,7 +924,9 @@ pub trait StreamExt: Stream {
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
/// stream value once it becomes available.
/// stream value once it becomes available. See
/// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
/// where the timeouts will repeat.
///
/// # Notes
///
Expand All @@ -939,9 +941,9 @@ pub trait StreamExt: Stream {
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
///
/// ```
/// # #[tokio::main]
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// use tokio_stream::{self as stream, StreamExt, wrappers::IntervalStream};
/// use std::time::Duration;
/// # let int_stream = stream::iter(1..=3);
///
Expand Down Expand Up @@ -969,6 +971,17 @@ pub trait StreamExt: Stream {
///
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // Once a timeout error is received, no further events will be received
/// // unless the wrapped stream yields a value (timeouts do not repeat).
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
/// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
/// tokio::pin!(timeout_stream);
///
/// // Only one timeout will be received between values in the source stream.
/// assert!(timeout_stream.try_next().await.is_ok());
/// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
/// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make this a separate example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put the new code into a new section, and now the original code in timeout is unmodified. I think this is what you were going for, let me know if I misunderstood. Thanks!

/// # }
/// ```
#[cfg(all(feature = "time"))]
Expand All @@ -977,7 +990,87 @@ pub trait StreamExt: Stream {
where
Self: Sized,
{
Timeout::new(self, duration)
Timeout::new(self, duration, TimeoutMode::Once)
}

/// Applies a per-item timeout to the passed stream.
///
/// `timeout_repeating()` takes a `Duration` that represents the maximum amount of
/// time each element of the stream has to complete before timing out.
///
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
/// stream value once it becomes available. Unlike `timeout()`, if no value
/// becomes available before the deadline is reached, additional errors are
/// returned at the specified interval. See [`timeout`](StreamExt::timeout)
/// for an alternative where the timeouts do not repeat.
///
/// # Notes
///
/// This function consumes the stream passed into it and returns a
/// wrapped version of it.
///
/// Polling the returned stream will continue to poll the inner stream even
/// if one or more items time out.
///
/// # Examples
///
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
///
/// ```
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt, wrappers::IntervalStream};
/// use std::time::Duration;
/// # let int_stream = stream::iter(1..=3);
///
/// let int_stream = int_stream.timeout_repeating(Duration::from_secs(1));
/// tokio::pin!(int_stream);
///
/// // When no items time out, we get the 3 elements in succession:
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If the second item times out, we get an error and continue polling the stream:
/// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert!(int_stream.try_next().await.is_err());
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If we want to stop consuming the source stream the first time an
/// // element times out, we can use the `take_while` operator:
/// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// let mut int_stream = int_stream.take_while(Result::is_ok);
///
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // Timeout errors will be continuously produced at the specified
/// // interval until the wrapped stream yields a value.
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
/// let timeout_stream = interval_stream.timeout_repeating(Duration::from_millis(9));
/// tokio::pin!(timeout_stream);
///
/// // Multiple timeouts will be received between values in the source stream.
/// assert!(timeout_stream.try_next().await.is_ok());
/// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
/// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
/// // Will eventually receive another value from the source stream...
/// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
/// # }
/// ```
#[cfg(all(feature = "time"))]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout_repeating(self, duration: Duration) -> Timeout<Self>
where
Self: Sized,
{
Timeout::new(self, duration, TimeoutMode::Repeating)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make two separate structs instead of adding a mode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for having a look! I removed the mode and timeout.rs is now mostly unchanged. I took the liberty of reusing the Elapsed struct for timeouts.


/// Slows down a stream by enforcing a delay between items.
Expand Down
61 changes: 45 additions & 16 deletions tokio-stream/src/stream_ext/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ use pin_project_lite::pin_project;
use std::fmt;
use std::time::Duration;

/// Behavior of timeouts.
#[derive(Debug)]
pub(super) enum TimeoutMode {
/// A timeout will only fire once.
Once,
/// A timeout will fire repeatedly.
Repeating,
}

pin_project! {
/// Stream returned by the [`timeout`](super::StreamExt::timeout) method.
#[must_use = "streams do nothing unless polled"]
Expand All @@ -20,6 +29,7 @@ pin_project! {
deadline: Sleep,
duration: Duration,
poll_deadline: bool,
timeout_mode: TimeoutMode,
}
}

Expand All @@ -28,7 +38,7 @@ pin_project! {
pub struct Elapsed(());

impl<S: Stream> Timeout<S> {
pub(super) fn new(stream: S, duration: Duration) -> Self {
pub(super) fn new(stream: S, duration: Duration, timeout_mode: TimeoutMode) -> Self {
let next = Instant::now() + duration;
let deadline = tokio::time::sleep_until(next);

Expand All @@ -37,6 +47,7 @@ impl<S: Stream> Timeout<S> {
deadline,
duration,
poll_deadline: true,
timeout_mode,
}
}
}
Expand All @@ -45,7 +56,7 @@ impl<S: Stream> Stream for Timeout<S> {
type Item = Result<S::Item, Elapsed>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let me = self.project();
let mut me = self.project();

match me.stream.poll_next(cx) {
Poll::Ready(v) => {
Expand All @@ -59,28 +70,46 @@ impl<S: Stream> Stream for Timeout<S> {
Poll::Pending => {}
};

if *me.poll_deadline {
ready!(me.deadline.poll(cx));
*me.poll_deadline = false;
return Poll::Ready(Some(Err(Elapsed::new())));
}
match me.timeout_mode {
TimeoutMode::Once => {
if *me.poll_deadline {
ready!(me.deadline.poll(cx));
*me.poll_deadline = false;
return Poll::Ready(Some(Err(Elapsed::new())));
}

Poll::Pending
Poll::Pending
}
TimeoutMode::Repeating => {
ready!(me.deadline.as_mut().poll(cx));
let next = Instant::now() + *me.duration;
me.deadline.reset(next);
Poll::Ready(Some(Err(Elapsed::new())))
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let (lower, upper) = self.stream.size_hint();

// The timeout stream may insert an error before and after each message
// from the underlying stream, but no more than one error between each
// message. Hence the upper bound is computed as 2x+1.
match self.timeout_mode {
TimeoutMode::Once => {
// The timeout stream may insert an error before and after each message
// from the underlying stream, but no more than one error between each
// message. Hence the upper bound is computed as 2x+1.

// Using a helper function to enable use of question mark operator.
fn twice_plus_one(value: Option<usize>) -> Option<usize> {
value?.checked_mul(2)?.checked_add(1)
}
// Using a helper function to enable use of question mark operator.
fn twice_plus_one(value: Option<usize>) -> Option<usize> {
value?.checked_mul(2)?.checked_add(1)
}

(lower, twice_plus_one(upper))
(lower, twice_plus_one(upper))
}
TimeoutMode::Repeating => {
// The timeout stream may insert an error an infinite number of times.
(lower, None)
}
}
}
}

Expand Down