-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: add StreamExt::timeout_repeating #5577
Changes from 4 commits
07518c8
7c97156
63c445b
e98018c
71e1c8b
9e644ae
be9c683
4d45ee1
cdd0b47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,7 +57,7 @@ use try_next::TryNext; | |
|
||
cfg_time! { | ||
pub(crate) mod timeout; | ||
use timeout::Timeout; | ||
use timeout::{Timeout, TimeoutMode}; | ||
use tokio::time::Duration; | ||
mod throttle; | ||
use throttle::{throttle, Throttle}; | ||
|
@@ -924,7 +924,9 @@ pub trait StreamExt: Stream { | |
/// If the wrapped stream yields a value before the deadline is reached, the | ||
/// value is returned. Otherwise, an error is returned. The caller may decide | ||
/// to continue consuming the stream and will eventually get the next source | ||
/// stream value once it becomes available. | ||
/// stream value once it becomes available. See | ||
/// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative | ||
/// where the timeouts will repeat. | ||
/// | ||
/// # Notes | ||
/// | ||
|
@@ -939,9 +941,9 @@ pub trait StreamExt: Stream { | |
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): | ||
/// | ||
/// ``` | ||
/// # #[tokio::main] | ||
/// # #[tokio::main(flavor = "current_thread", start_paused = true)] | ||
/// # async fn main() { | ||
/// use tokio_stream::{self as stream, StreamExt}; | ||
/// use tokio_stream::{self as stream, StreamExt, wrappers::IntervalStream}; | ||
/// use std::time::Duration; | ||
/// # let int_stream = stream::iter(1..=3); | ||
/// | ||
|
@@ -969,6 +971,17 @@ pub trait StreamExt: Stream { | |
/// | ||
/// assert_eq!(int_stream.try_next().await, Ok(Some(1))); | ||
/// assert_eq!(int_stream.try_next().await, Ok(None)); | ||
/// | ||
/// // Once a timeout error is received, no further events will be received | ||
/// // unless the wrapped stream yields a value (timeouts do not repeat). | ||
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100))); | ||
/// let timeout_stream = interval_stream.timeout(Duration::from_millis(10)); | ||
/// tokio::pin!(timeout_stream); | ||
/// | ||
/// // Only one timeout will be received between values in the source stream. | ||
/// assert!(timeout_stream.try_next().await.is_ok()); | ||
/// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout"); | ||
/// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts"); | ||
/// # } | ||
/// ``` | ||
#[cfg(all(feature = "time"))] | ||
|
@@ -977,7 +990,87 @@ pub trait StreamExt: Stream { | |
where | ||
Self: Sized, | ||
{ | ||
Timeout::new(self, duration) | ||
Timeout::new(self, duration, TimeoutMode::Once) | ||
} | ||
|
||
/// Applies a per-item timeout to the passed stream. | ||
/// | ||
/// `timeout_repeating()` takes a `Duration` that represents the maximum amount of | ||
/// time each element of the stream has to complete before timing out. | ||
/// | ||
/// If the wrapped stream yields a value before the deadline is reached, the | ||
/// value is returned. Otherwise, an error is returned. The caller may decide | ||
/// to continue consuming the stream and will eventually get the next source | ||
/// stream value once it becomes available. Unlike `timeout()`, if no value | ||
/// becomes available before the deadline is reached, additional errors are | ||
/// returned at the specified interval. See [`timeout`](StreamExt::timeout) | ||
/// for an alternative where the timeouts do not repeat. | ||
/// | ||
/// # Notes | ||
/// | ||
/// This function consumes the stream passed into it and returns a | ||
/// wrapped version of it. | ||
/// | ||
/// Polling the returned stream will continue to poll the inner stream even | ||
/// if one or more items time out. | ||
/// | ||
/// # Examples | ||
/// | ||
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): | ||
/// | ||
/// ``` | ||
/// # #[tokio::main(flavor = "current_thread", start_paused = true)] | ||
/// # async fn main() { | ||
/// use tokio_stream::{self as stream, StreamExt, wrappers::IntervalStream}; | ||
/// use std::time::Duration; | ||
/// # let int_stream = stream::iter(1..=3); | ||
/// | ||
/// let int_stream = int_stream.timeout_repeating(Duration::from_secs(1)); | ||
/// tokio::pin!(int_stream); | ||
/// | ||
/// // When no items time out, we get the 3 elements in succession: | ||
/// assert_eq!(int_stream.try_next().await, Ok(Some(1))); | ||
/// assert_eq!(int_stream.try_next().await, Ok(Some(2))); | ||
/// assert_eq!(int_stream.try_next().await, Ok(Some(3))); | ||
/// assert_eq!(int_stream.try_next().await, Ok(None)); | ||
/// | ||
/// // If the second item times out, we get an error and continue polling the stream: | ||
/// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); | ||
/// assert_eq!(int_stream.try_next().await, Ok(Some(1))); | ||
/// assert!(int_stream.try_next().await.is_err()); | ||
/// assert_eq!(int_stream.try_next().await, Ok(Some(2))); | ||
/// assert_eq!(int_stream.try_next().await, Ok(Some(3))); | ||
/// assert_eq!(int_stream.try_next().await, Ok(None)); | ||
/// | ||
/// // If we want to stop consuming the source stream the first time an | ||
/// // element times out, we can use the `take_while` operator: | ||
/// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); | ||
/// let mut int_stream = int_stream.take_while(Result::is_ok); | ||
/// | ||
/// assert_eq!(int_stream.try_next().await, Ok(Some(1))); | ||
/// assert_eq!(int_stream.try_next().await, Ok(None)); | ||
/// | ||
/// // Timeout errors will be continuously produced at the specified | ||
/// // interval until the wrapped stream yields a value. | ||
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23))); | ||
/// let timeout_stream = interval_stream.timeout_repeating(Duration::from_millis(9)); | ||
/// tokio::pin!(timeout_stream); | ||
/// | ||
/// // Multiple timeouts will be received between values in the source stream. | ||
/// assert!(timeout_stream.try_next().await.is_ok()); | ||
/// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout"); | ||
/// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout"); | ||
/// // Will eventually receive another value from the source stream... | ||
/// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout"); | ||
/// # } | ||
/// ``` | ||
#[cfg(all(feature = "time"))] | ||
#[cfg_attr(docsrs, doc(cfg(feature = "time")))] | ||
fn timeout_repeating(self, duration: Duration) -> Timeout<Self> | ||
where | ||
Self: Sized, | ||
{ | ||
Timeout::new(self, duration, TimeoutMode::Repeating) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please make two separate structs instead of adding a mode. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for having a look! I removed the mode and timeout.rs is now mostly unchanged. I took the liberty of reusing the |
||
|
||
/// Slows down a stream by enforcing a delay between items. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make this a separate example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put the new code into a new section, and now the original code in
timeout
is unmodified. I think this is what you were going for, let me know if I misunderstood. Thanks!