-
Notifications
You must be signed in to change notification settings - Fork 184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Publisher.timeoutTerminal(Duration) operator #1445
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor suggestions and a couple questions to discuss. Otherwise, LGTM
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java
Outdated
Show resolved
Hide resolved
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java
Outdated
Show resolved
Hide resolved
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TimeoutPublisher.java
Outdated
Show resolved
Hide resolved
...ncurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/TimeoutPublisherTest.java
Outdated
Show resolved
Hide resolved
...ncurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/TimeoutPublisherTest.java
Outdated
Show resolved
Hide resolved
...k-concurrent-internal/src/main/java/io/servicetalk/concurrent/internal/SignalOffloaders.java
Show resolved
Hide resolved
d2c2c7f
to
39d1d49
Compare
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TimeoutPublisher.java
Outdated
Show resolved
Hide resolved
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TimeoutPublisher.java
Outdated
Show resolved
Hide resolved
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TimeoutPublisher.java
Show resolved
Hide resolved
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TimeoutPublisher.java
Outdated
Show resolved
Hide resolved
...ncurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/TimeoutPublisherTest.java
Show resolved
Hide resolved
...ncurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/TimeoutPublisherTest.java
Outdated
Show resolved
Hide resolved
Motivation: The existing Publisher.idleTimeout(Duration) operator resets the timer for each received item. In some cases the entire operation must be completed with a specific amount of time. Modifications: Add a new Publisher operator, withTimeout(Duration, that terminates the Publisher with a TimeoutException if it fails to otherwise terminate before a specified time duration. Result: A new timeout option is available.
74ad853
to
0e41826
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also add PublisherTimeoutTerminalTckTest
, similar to PublisherTimeoutTckTest
.
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TimeoutPublisher.java
Outdated
Show resolved
Hide resolved
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TimeoutPublisher.java
Outdated
Show resolved
Hide resolved
...ncurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/TimeoutPublisherTest.java
Outdated
Show resolved
Hide resolved
* @param duration The duration to convert | ||
* @return The converted nanoseconds value. | ||
*/ | ||
private long toNanos(Duration duration) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: time utility state/methods can be static and likely moved into another class (publisher has many methods so we try to keep it focused on operators).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I made this change because I notice the Java 11 has it built-in and looking at the implementation realized that overflow handling was needed.
Motivation:
The existing
Publisher.timeout(Duration)
operator restarts the timer for each received item and will only timeout if no items are emitted during the timeout duration. In some cases the total time for all items to be emitted is of more interest. An operator to timeout based on the entire elapsed time since subscribe is needed.Modifications:
Add a new Publisher operator,
timeoutTerminal(Duration)
, that terminates the Publisher with a TimeoutException if the Publisher does not terminate before the specified time duration.Result:
A new timeout option is available.