-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: add StreamExt::timeout_repeating #5577
Conversation
Include a test for timeout behavior of `timeout()`, demonstrating and verifying that it will not repeat timeouts.
Extend `Timeout` with a new `TimeoutMode` that controls the behavior of how timeouts are produced. A new extension method `timeout_repeating` makes use of a newly added behavior where the stream continuously produces timeouts at the specified interval rather than stalling after a single timeout.
Use a strategy for testing the repeating timeout behavior that is less sensitive to timing to avoid spurious failures.
Use `start_paused = true` so the interval and timeouts in the stream timeout doc tests happen in a consistent order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR fell between the cracks. Sorry about the delay in review.
tokio-stream/src/stream_ext.rs
Outdated
fn timeout_repeating(self, duration: Duration) -> Timeout<Self> | ||
where | ||
Self: Sized, | ||
{ | ||
Timeout::new(self, duration, TimeoutMode::Repeating) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make two separate structs instead of adding a mode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for having a look! I removed the mode and timeout.rs is now mostly unchanged. I took the liberty of reusing the Elapsed
struct for timeouts.
Revert adding conditional behavior to the `Timeout` struct and implement the new behavior in a new struct, while reusing the `Elapsed` struct to represent timeouts.
ready!(me.deadline.as_mut().poll(cx)); | ||
let next = Instant::now() + *me.duration; | ||
me.deadline.reset(next); | ||
Poll::Ready(Some(Err(Elapsed::new()))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will not yield the errors at a regular rate. The duration between two errors will be increased by the time between the timeout triggering and poll_next
being called. Perhaps we should use an Interval
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it to use an Interval
. The infrequent polling problem has another edge case: if set to duration x
, and the underlying stream doesn't produce a value until x + t1
, but the stream is not polled until x + t2
, then when polled would it be expected to produce a timeout or the value from the underlying stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It probably makes sense to add a MissedTickBehavior
argument.
The infrequent polling problem has another edge case: if set to duration
x
, and the underlying stream doesn't produce a value untilx + t1
, but the stream is not polled untilx + t2
, then when polled would it be expected to produce a timeout or the value from the underlying stream?
I'm not sure. What do you think? Maybe it should depend on the MissedTickBehavior
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For my use case, it isn't sensitive to these kinds of timing or ordering issues, but of course I would like this to be as generally useful as possible.
Returning a timeout when there is an actual value waiting to be returned feels a little pedantic but would arguably be the most correct behavior if not the most useful. I lean towards favoring the actual value, or making this a configurable behavior.
Avoid stretching the time between timeouts if the stream is not polled regularly.
To give the caller control over the MissedTickBehavior of the Interval used internally, take an Interval argument instead of a Duration and MissedTickBehavior argument. This simplifies usage for users that do not care about which MissedTickBehavior is used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You will need to merge in master to fix the CI errors.
tokio-stream/src/stream_ext.rs
Outdated
/// // Once a timeout error is received, no further events will be received | ||
/// // unless the wrapped stream yields a value (timeouts do not repeat). | ||
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100))); | ||
/// let timeout_stream = interval_stream.timeout(Duration::from_millis(10)); | ||
/// tokio::pin!(timeout_stream); | ||
/// | ||
/// // Only one timeout will be received between values in the source stream. | ||
/// assert!(timeout_stream.try_next().await.is_ok()); | ||
/// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout"); | ||
/// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make this a separate example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put the new code into a new section, and now the original code in timeout
is unmodified. I think this is what you were going for, let me know if I misunderstood. Thanks!
Split out the repeating vs. non-repeating example as a separate code unit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, looks good to me.
Motivation
The behavior of
StreamExt::timeout
is it only fires the timeout once (per value produced from the wrapped stream) and while this behavior is documented it is easy to overlook and assume that timeouts are generated repeatedly. I had made use oftimeout
in a project assuming that timeouts did repeat and it led to a subtle bug. It is useful for my use case for timeouts to fire repeatedly, so this change adds that intimeout_repeating
as an alternative totimeout
. For example, one might want to log a message at a fixed interval while no messages are being produced from a stream.Solution
The non-repeating timeout behavior of
timeout
was not tested or demonstrated by the doc test, so first this was added.A new
enum TimeoutMode
has been added to describe the two possible behaviors.stream_ext::Timeout
was modified with a newtimeout_mode: TimeoutMode
field to share an implementation between the two behaviors.