Skip to content

Commit

Permalink
Add Publisher.timeoutTerminal(Duration) operator (#1445)
Browse files Browse the repository at this point in the history
Motivation:
The existing `Publisher.timeout(Duration)` operator restarts the timer for each received item and will only timeout if no items are emitted during the timeout duration. In some cases the total time for all items to be emitted is of more interest. An operator to timeout based on the entire elapsed time since subscribe is needed.

Modifications:
Add a new Publisher operator, `timeoutTerminal(Duration)`, that terminates the Publisher with a `TimeoutException` if the Publisher does not terminate before the specified time duration.

Result:
A new timeout option is available.
  • Loading branch information
bondolo authored Mar 23, 2021
1 parent 35b4293 commit c63d265
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@
public abstract class Publisher<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(Publisher.class);

/**
* Maximum positive duration which can be expressed as a signed 64-bit number of nanoseconds.
*/
private static final Duration LONG_MAX_NANOS = Duration.ofNanos(Long.MAX_VALUE);
/**
* Maximum negative duration which can be expressed as a signed 64-bit number of nanoseconds.
*/
private static final Duration LONG_MIN_NANOS = Duration.ofNanos(Long.MIN_VALUE);

private final Executor executor;
private final boolean shareContextOnSubscribe;

Expand Down Expand Up @@ -1513,7 +1522,7 @@ public final Publisher<T> whenCancel(Runnable onCancel) {
*/
@Deprecated
public final Publisher<T> idleTimeout(long duration, TimeUnit unit) {
return new TimeoutPublisher<>(this, executor, duration, unit);
return timeout(duration, unit);
}

/**
Expand All @@ -1533,7 +1542,7 @@ public final Publisher<T> idleTimeout(long duration, TimeUnit unit) {
*/
@Deprecated
public final Publisher<T> idleTimeout(Duration duration) {
return new TimeoutPublisher<>(this, executor, duration);
return timeout(duration);
}

/**
Expand All @@ -1555,7 +1564,7 @@ public final Publisher<T> idleTimeout(Duration duration) {
@Deprecated
public final Publisher<T> idleTimeout(long duration, TimeUnit unit,
io.servicetalk.concurrent.Executor timeoutExecutor) {
return new TimeoutPublisher<>(this, executor, duration, unit, timeoutExecutor);
return timeout(duration, unit, timeoutExecutor);
}

/**
Expand All @@ -1575,7 +1584,7 @@ public final Publisher<T> idleTimeout(long duration, TimeUnit unit,
*/
@Deprecated
public final Publisher<T> idleTimeout(Duration duration, io.servicetalk.concurrent.Executor timeoutExecutor) {
return new TimeoutPublisher<>(this, executor, duration, timeoutExecutor);
return timeout(duration, timeoutExecutor);
}

/**
Expand All @@ -1594,7 +1603,7 @@ public final Publisher<T> idleTimeout(Duration duration, io.servicetalk.concurre
* @see #timeout(long, TimeUnit, io.servicetalk.concurrent.Executor)
*/
public final Publisher<T> timeout(long duration, TimeUnit unit) {
return new TimeoutPublisher<>(this, executor, duration, unit);
return timeout(duration, unit, executor);
}

/**
Expand All @@ -1612,7 +1621,7 @@ public final Publisher<T> timeout(long duration, TimeUnit unit) {
* @see #timeout(long, TimeUnit, io.servicetalk.concurrent.Executor)
*/
public final Publisher<T> timeout(Duration duration) {
return new TimeoutPublisher<>(this, executor, duration);
return timeout(duration, executor);
}

/**
Expand All @@ -1632,7 +1641,20 @@ public final Publisher<T> timeout(Duration duration) {
*/
public final Publisher<T> timeout(long duration, TimeUnit unit,
io.servicetalk.concurrent.Executor timeoutExecutor) {
return new TimeoutPublisher<>(this, executor, duration, unit, timeoutExecutor);
return new TimeoutPublisher<>(this, executor, duration, unit, true, timeoutExecutor);
}

/**
* Converts a {@code Duration} to nanoseconds or if the resulting value would overflow a 64-bit signed integer then
* either {@code Long.MIN_VALUE} or {@code Long.MAX_VALUE} as appropriate.
*
* @param duration The duration to convert
* @return The converted nanoseconds value.
*/
private long toNanos(Duration duration) {
return duration.compareTo(LONG_MAX_NANOS) < 0 ?
duration.compareTo(LONG_MIN_NANOS) > 0 ? duration.toNanos() : Long.MIN_VALUE
: Long.MAX_VALUE;
}

/**
Expand All @@ -1650,7 +1672,80 @@ public final Publisher<T> timeout(long duration, TimeUnit unit,
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX timeout operator.</a>
*/
public final Publisher<T> timeout(Duration duration, io.servicetalk.concurrent.Executor timeoutExecutor) {
return new TimeoutPublisher<>(this, executor, duration, timeoutExecutor);
return timeout(toNanos(duration), TimeUnit.NANOSECONDS, timeoutExecutor);
}

/**
* Creates a new {@link Publisher} that will mimic the signals of this {@link Publisher} but will terminate with a
* {@link TimeoutException} if time {@code duration} elapses between subscribe and termination. The timer starts
* when the returned {@link Publisher} is subscribed.
* <p>
* In the event of timeout any {@link Subscription} from
* {@link Subscriber#onSubscribe(PublisherSource.Subscription)} will be {@link Subscription#cancel() cancelled} and
* the associated {@link Subscriber} will be {@link Subscriber#onError(Throwable) terminated}.
* @param duration The time duration during which the Publisher must complete.
* @return a new {@link Publisher} that will mimic the signals of this {@link Publisher} but will terminate with a
* {@link TimeoutException} if time {@code duration} elapses between subscribe and termination.
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX timeout operator.</a>
*/
public final Publisher<T> timeoutTerminal(Duration duration) {
return timeoutTerminal(duration, executor);
}

/**
* Creates a new {@link Publisher} that will mimic the signals of this {@link Publisher} but will terminate with a
* {@link TimeoutException} if time {@code duration} elapses between subscribe and termination. The timer starts
* when the returned {@link Publisher} is subscribed.
* <p>
* In the event of timeout any {@link Subscription} from
* {@link Subscriber#onSubscribe(PublisherSource.Subscription)} will be {@link Subscription#cancel() cancelled} and
* the associated {@link Subscriber} will be {@link Subscriber#onError(Throwable) terminated}.
* @param duration The time duration during which the Publisher must complete.
* @param timeoutExecutor The {@link Executor} to use for managing the timer notifications.
* @return a new {@link Publisher} that will mimic the signals of this {@link Publisher} but will terminate with a
* {@link TimeoutException} if time {@code duration} elapses between subscribe and termination.
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX timeout operator.</a>
*/
public final Publisher<T> timeoutTerminal(Duration duration, io.servicetalk.concurrent.Executor timeoutExecutor) {
return timeoutTerminal(toNanos(duration), TimeUnit.NANOSECONDS, timeoutExecutor);
}

/**
* Creates a new {@link Publisher} that will mimic the signals of this {@link Publisher} but will terminate with a
* {@link TimeoutException} if time {@code duration} elapses between subscribe and termination. The timer starts
* when the returned {@link Publisher} is subscribed.
* <p>
* In the event of timeout any {@link Subscription} from
* {@link Subscriber#onSubscribe(PublisherSource.Subscription)} will be {@link Subscription#cancel() cancelled} and
* the associated {@link Subscriber} will be {@link Subscriber#onError(Throwable) terminated}.
* @param duration The time duration during which the Publisher must complete.
* @param unit The units for {@code duration}.
* @return a new {@link Publisher} that will mimic the signals of this {@link Publisher} but will terminate with a
* {@link TimeoutException} if time {@code duration} elapses between subscribe and termination.
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX timeout operator.</a>
*/
public final Publisher<T> timeoutTerminal(long duration, TimeUnit unit) {
return timeoutTerminal(duration, unit, executor);
}

/**
* Creates a new {@link Publisher} that will mimic the signals of this {@link Publisher} but will terminate with a
* {@link TimeoutException} if time {@code duration} elapses between subscribe and termination. The timer starts
* when the returned {@link Publisher} is subscribed.
* <p>
* In the event of timeout any {@link Subscription} from
* {@link Subscriber#onSubscribe(PublisherSource.Subscription)} will be {@link Subscription#cancel() cancelled} and
* the associated {@link Subscriber} will be {@link Subscriber#onError(Throwable) terminated}.
* @param duration The time duration during which the Publisher must complete.
* @param unit The units for {@code duration}.
* @param timeoutExecutor The {@link Executor} to use for managing the timer notifications.
* @return a new {@link Publisher} that will mimic the signals of this {@link Publisher} but will terminate with a
* {@link TimeoutException} if time {@code duration} elapses between subscribe and termination.
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX timeout operator.</a>
*/
public final Publisher<T> timeoutTerminal(long duration, TimeUnit unit,
io.servicetalk.concurrent.Executor timeoutExecutor) {
return new TimeoutPublisher<>(this, executor, duration, unit, false, timeoutExecutor);
}

/**
Expand Down
Loading

0 comments on commit c63d265

Please sign in to comment.