Skip to content

Commit

Permalink
HttpTimeoutFilter include original timeout on message body timeout (#…
Browse files Browse the repository at this point in the history
…1494)

Motivation:
AbstractTimeoutHttpFilter can now apply a timeout on the metadata and
the message body level. If the message body level timeout triggers the
interval is taken as the user will see a value which is different than
the originally supplied timeout duration.

Modifications:
- Add a MappedTimeoutException that can wrap a TimeoutException and
  allows for specifying the original timeout value
- Remove pre-check conditions on timeout duration which is enforced by
  the timeout operators.

Result:
HttpTimeoutFilter exception includes the original timeout duration.
  • Loading branch information
Scottmitch authored Apr 15, 2021
1 parent 925142b commit fb8af44
Showing 1 changed file with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Publisher.defer;
import static io.servicetalk.concurrent.api.Publisher.failed;
import static io.servicetalk.utils.internal.DurationUtils.ensurePositive;
import static io.servicetalk.utils.internal.DurationUtils.isPositive;
import static java.time.Duration.ofNanos;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -73,10 +71,6 @@ final Single<StreamingHttpResponse> withTimeout(final StreamingHttpRequest reque

return Single.defer(() -> {
final Duration timeout = timeoutForRequest.apply(request);
if (null != timeout && !isPositive(timeout)) {
return Single.failed(new TimeoutException("non-positive timeout of " + timeout.toMillis() + "ms"));
}

Single<StreamingHttpResponse> response = responseFunction.apply(request);
if (null != timeout) {
final Single<StreamingHttpResponse> timeoutResponse = timeoutExecutor == null ?
Expand All @@ -86,12 +80,12 @@ final Single<StreamingHttpResponse> withTimeout(final StreamingHttpRequest reque
final long deadline = System.nanoTime() + timeout.toNanos();
response = timeoutResponse.map(resp -> resp.transformMessageBody(body -> defer(() -> {
final Duration remaining = ofNanos(deadline - System.nanoTime());
if (isPositive(remaining)) {
return (timeoutExecutor == null ?
body.timeoutTerminal(remaining) : body.timeoutTerminal(remaining, timeoutExecutor))
.subscribeShareContext();
}
return failed(new TimeoutException("timeout after " + timeout.toMillis() + "ms"));
return (timeoutExecutor == null ?
body.timeoutTerminal(remaining) : body.timeoutTerminal(remaining, timeoutExecutor))
.onErrorMap(TimeoutException.class, t ->
new MappedTimeoutException("message body timeout after " + timeout.toMillis() +
"ms", t))
.subscribeShareContext();
})));
} else {
response = timeoutResponse;
Expand All @@ -102,6 +96,21 @@ final Single<StreamingHttpResponse> withTimeout(final StreamingHttpRequest reque
});
}

private static final class MappedTimeoutException extends TimeoutException {
private static final long serialVersionUID = -8230476062001221272L;

MappedTimeoutException(String message, Throwable cause) {
super(message);
initCause(cause);
}

@Override
public Throwable fillInStackTrace() {
// This is a wrapping exception class that always has an original cause and does not require stack trace.
return this;
}
}

/**
* {@link TimeoutFromRequest} implementation which returns the provided default duration as the timeout duration to
* be used for any request.
Expand Down

0 comments on commit fb8af44

Please sign in to comment.