From 5b0100bc5d650bd9bbee8b6b9a405bcf0bd5ab29 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Thu, 15 Apr 2021 12:06:31 -0700 Subject: [PATCH] Fix duration comparison bug in timeout filters and reduce code duplication (#1491) Motivation: `Duration#compareTo` is hard to understand, the result of comparison depends on the order of arguments. This lead to a bug in `TimeoutHttpRequesterFilter` and `TimeoutHttpServiceFilter` which always fails payload body publisher with `TimeoutException`. Use a shared utility to make `isPositive` check for `Duration` consistent across all modules. Also, `TimeoutHttpRequesterFilter` and `TimeoutHttpServiceFilter` logic is identical, we can reduce duplication and share the logic. Modifications: - Introduce a shared `DurationUtils` class for common operations with `Duration`; - Use this utility to reduce mistakes when we check that the passed `Duration` must be positive; - Introduce `AbstractTimeoutHttpFilterTest` to share timeout logic between `TimeoutHttpRequesterFilter` and `TimeoutHttpServiceFilter`; - Add tests to verify behavior correctness for both timeout filters; - Adjust how the timeout filters are created in `ResponseTimeoutTest` to avoid NPE; - Minor improvements to make javadoc consistent between `TimeoutHttpRequesterFilter` and `TimeoutHttpServiceFilter`; Result: 1. `TimeoutHttpRequesterFilter` and `TimeoutHttpServiceFilter` do not fail payload body publisher before the actual timeout expires. 2. No code duplication between timeout filters. 3. Shared utility to consistently validate `Duration` in all modules. --- .../grpc/api/DefaultGrpcClientMetadata.java | 5 +- servicetalk-grpc-netty/build.gradle | 1 + .../grpc/netty/DefaultGrpcClientBuilder.java | 9 +- .../grpc/netty/DefaultGrpcServerBuilder.java | 9 +- .../http/netty/H2ResponseCancelTest.java | 5 +- .../http/netty/ResponseTimeoutTest.java | 16 +- servicetalk-http-utils/build.gradle | 4 + .../http/utils/AbstractTimeoutHttpFilter.java | 128 ++++++++++++ .../utils/TimeoutHttpRequesterFilter.java | 136 +++---------- .../http/utils/TimeoutHttpServiceFilter.java | 108 +++------- .../utils/AbstractTimeoutHttpFilterTest.java | 184 ++++++++++++++++++ .../utils/TimeoutHttpRequesterFilterTest.java | 58 ++++++ .../utils/TimeoutHttpServiceFilterTest.java | 60 ++++++ .../utils/internal/DurationUtils.java | 57 ++++++ .../utils/internal/DurationUtilsTest.java | 53 +++++ 15 files changed, 608 insertions(+), 225 deletions(-) create mode 100644 servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilter.java create mode 100644 servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilterTest.java create mode 100644 servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilterTest.java create mode 100644 servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/TimeoutHttpServiceFilterTest.java create mode 100644 servicetalk-utils-internal/src/main/java/io/servicetalk/utils/internal/DurationUtils.java create mode 100644 servicetalk-utils-internal/src/test/java/io/servicetalk/utils/internal/DurationUtilsTest.java diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcClientMetadata.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcClientMetadata.java index 14bd4034e8..3d5faa1539 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcClientMetadata.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcClientMetadata.java @@ -22,6 +22,7 @@ import javax.annotation.Nullable; import static io.servicetalk.encoding.api.Identity.identity; +import static io.servicetalk.utils.internal.DurationUtils.ensurePositive; /** * Default implementation for {@link DefaultGrpcClientMetadata}. @@ -149,8 +150,8 @@ protected DefaultGrpcClientMetadata(final String path, super(path); this.strategy = strategy; this.requestEncoding = Objects.requireNonNull(requestEncoding, "requestEncoding"); - if (null != timeout && Duration.ZERO.compareTo(timeout) >= 0) { - throw new IllegalArgumentException("timeout: " + timeout + " (expected > 0)"); + if (null != timeout) { + ensurePositive(timeout, "timeout"); } this.timeout = null != timeout && timeout.compareTo(GRPC_MAX_TIMEOUT) <= 0 ? timeout : null; } diff --git a/servicetalk-grpc-netty/build.gradle b/servicetalk-grpc-netty/build.gradle index e187dba6b6..86df489a4c 100644 --- a/servicetalk-grpc-netty/build.gradle +++ b/servicetalk-grpc-netty/build.gradle @@ -30,6 +30,7 @@ dependencies { implementation project(":servicetalk-http-utils") implementation project(":servicetalk-annotations") implementation project(":servicetalk-transport-netty-internal") + implementation project(":servicetalk-utils-internal") implementation "org.slf4j:slf4j-api:$slf4jVersion" implementation "com.google.code.findbugs:jsr305:$jsr305Version" diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcClientBuilder.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcClientBuilder.java index 0ccc93f351..e59ebccdac 100644 --- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcClientBuilder.java +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcClientBuilder.java @@ -43,7 +43,6 @@ import java.net.SocketOption; import java.time.Duration; import java.time.temporal.ChronoUnit; -import java.util.Objects; import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.function.LongFunction; @@ -53,6 +52,7 @@ import static io.servicetalk.buffer.api.CharSequences.newAsciiString; import static io.servicetalk.grpc.api.GrpcClientMetadata.GRPC_MAX_TIMEOUT; import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default; +import static io.servicetalk.utils.internal.DurationUtils.ensurePositive; final class DefaultGrpcClientBuilder extends GrpcClientBuilder { /** @@ -98,12 +98,7 @@ public GrpcClientBuilder defaultTimeout(Duration defaultTimeout) { if (invokedBuild) { throw new IllegalStateException("default timeout cannot be modified after build, create a new builder"); } - - if (Duration.ZERO.compareTo(Objects.requireNonNull(defaultTimeout, "defaultTimeout")) >= 0) { - throw new IllegalArgumentException("defaultTimeout: " + defaultTimeout + " (expected > 0)"); - } - - this.defaultTimeout = defaultTimeout; + this.defaultTimeout = ensurePositive(defaultTimeout, "defaultTimeout"); return this; } diff --git a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java index a32e1d5038..5ec4c8afe5 100644 --- a/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java +++ b/servicetalk-grpc-netty/src/main/java/io/servicetalk/grpc/netty/DefaultGrpcServerBuilder.java @@ -50,7 +50,6 @@ import java.net.SocketOption; import java.time.Duration; import java.util.Map; -import java.util.Objects; import java.util.function.BooleanSupplier; import java.util.function.Predicate; import javax.annotation.Nullable; @@ -58,6 +57,7 @@ import static io.servicetalk.grpc.api.GrpcClientMetadata.GRPC_MAX_TIMEOUT; import static io.servicetalk.grpc.api.GrpcExecutionStrategies.defaultStrategy; import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default; +import static io.servicetalk.utils.internal.DurationUtils.ensurePositive; final class DefaultGrpcServerBuilder extends GrpcServerBuilder implements ServerBinder { @@ -85,12 +85,7 @@ public GrpcServerBuilder defaultTimeout(Duration defaultTimeout) { if (invokedBuild) { throw new IllegalStateException("default timeout cannot be modified after build, create a new builder"); } - - if (Duration.ZERO.compareTo(Objects.requireNonNull(defaultTimeout, "defaultTimeout")) >= 0) { - throw new IllegalArgumentException("defaultTimeout: " + defaultTimeout + " (expected > 0)"); - } - - this.defaultTimeout = defaultTimeout; + this.defaultTimeout = ensurePositive(defaultTimeout, "defaultTimeout"); return this; } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2ResponseCancelTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2ResponseCancelTest.java index 78c986c042..2cca91ce6f 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2ResponseCancelTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/H2ResponseCancelTest.java @@ -168,8 +168,9 @@ private void requestCancellationResetsStreamButNotParentConnection(StreamingHttp // ignore } }) - .expectError() - // .thenCancel() + // FIXME: use thenCancel() after await() instead of cancelling from inside then(...) + expectError() + // https://github.com/apple/servicetalk/issues/1492 + .expectError(IllegalStateException.class) // should never happen .verify(); assertThat("Unexpected responses", responses, is(empty())); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ResponseTimeoutTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ResponseTimeoutTest.java index 13a30b04aa..6bf2f168d9 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ResponseTimeoutTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ResponseTimeoutTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ import io.servicetalk.client.api.ConnectionFactory; import io.servicetalk.client.api.DelegatingConnectionFactory; import io.servicetalk.concurrent.Cancellable; -import io.servicetalk.concurrent.SingleSource.Processor; import io.servicetalk.concurrent.SingleSource.Subscriber; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout; @@ -59,7 +58,6 @@ import javax.annotation.Nullable; import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable; -import static io.servicetalk.concurrent.api.Processors.newSingleProcessor; import static io.servicetalk.concurrent.api.Single.defer; import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.Single.succeeded; @@ -84,7 +82,7 @@ public class ResponseTimeoutTest { @Rule public final Timeout timeout = new ServiceTalkTestTimeout(); - private final BlockingQueue> serverResponses = new LinkedBlockingQueue<>(); + private final BlockingQueue> serverResponses = new LinkedBlockingQueue<>(); private final BlockingQueue delayedClientCancels = new LinkedBlockingQueue<>(); private final BlockingQueue delayedClientTermination = new LinkedBlockingQueue<>(); private final ServerContext ctx; @@ -96,11 +94,11 @@ public ResponseTimeoutTest(Duration clientTimeout, Duration serverTimeout, Class expectThrowableClazz) throws Exception { ctx = forAddress(localAddress(0)) - .appendServiceFilter(new TimeoutHttpServiceFilter(serverTimeout, true)) + .appendServiceFilter(new TimeoutHttpServiceFilter(__ -> serverTimeout, true)) .listenAndAwait((__, ___, factory) -> { - Processor resp = newSingleProcessor(); + Single resp = Single.never(); serverResponses.add(resp); - return Single.never(); + return resp; }); client = forSingleAddress(serverHostAndPort(ctx)) .appendClientFilter(client -> new StreamingHttpClientFilter(client) { @@ -109,7 +107,7 @@ protected Single request(final StreamingHttpRequester del final HttpExecutionStrategy strategy, final StreamingHttpRequest request) { return Single.succeeded(null) - .afterOnSubscribe(cancellable -> delayedClientCancels.add(cancellable)) + .afterOnSubscribe(delayedClientCancels::add) .concat(delegate().request(strategy, request) .liftSync(target -> new Subscriber() { @Override @@ -141,7 +139,7 @@ public void onError(final Throwable t) { } }) .appendConnectionFactoryFilter(original -> new CountingConnectionFactory(original, connectionCount)) - .appendClientFilter(new TimeoutHttpRequesterFilter(clientTimeout, true)) + .appendClientFilter(new TimeoutHttpRequesterFilter(__ -> clientTimeout, true)) .build(); this.expectThrowableClazz = expectThrowableClazz; } diff --git a/servicetalk-http-utils/build.gradle b/servicetalk-http-utils/build.gradle index a4fa7f17d8..aa8ec0a429 100644 --- a/servicetalk-http-utils/build.gradle +++ b/servicetalk-http-utils/build.gradle @@ -23,6 +23,7 @@ dependencies { implementation project(":servicetalk-annotations") implementation project(":servicetalk-concurrent-api-internal") implementation project(":servicetalk-concurrent-internal") + implementation project(":servicetalk-utils-internal") implementation "com.google.code.findbugs:jsr305:$jsr305Version" implementation "org.slf4j:slf4j-api:$slf4jVersion" @@ -30,8 +31,11 @@ dependencies { testImplementation testFixtures(project(":servicetalk-concurrent-internal")) testImplementation testFixtures(project(":servicetalk-http-api")) testImplementation project(":servicetalk-buffer-netty") + testImplementation project(":servicetalk-concurrent-api-test") testImplementation project(":servicetalk-test-resources") testImplementation "junit:junit:$junitVersion" + testImplementation "org.junit.jupiter:junit-jupiter-api:$junit5Version" + testImplementation "org.junit.jupiter:junit-jupiter-params:$junit5Version" testImplementation "org.hamcrest:hamcrest-library:$hamcrestVersion" testImplementation "org.mockito:mockito-core:$mockitoCoreVersion" } diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilter.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilter.java new file mode 100644 index 0000000000..841b2e4051 --- /dev/null +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilter.java @@ -0,0 +1,128 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.utils; + +import io.servicetalk.concurrent.api.Executor; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.http.api.HttpExecutionStrategy; +import io.servicetalk.http.api.HttpExecutionStrategyInfluencer; +import io.servicetalk.http.api.HttpRequestMetaData; +import io.servicetalk.http.api.StreamingHttpRequest; +import io.servicetalk.http.api.StreamingHttpResponse; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import javax.annotation.Nullable; + +import static io.servicetalk.concurrent.api.Publisher.defer; +import static io.servicetalk.concurrent.api.Publisher.failed; +import static io.servicetalk.utils.internal.DurationUtils.ensurePositive; +import static io.servicetalk.utils.internal.DurationUtils.isPositive; +import static java.time.Duration.ofNanos; +import static java.util.Objects.requireNonNull; + +abstract class AbstractTimeoutHttpFilter implements HttpExecutionStrategyInfluencer { + /** + * Establishes the timeout for a given request. + */ + private final TimeoutFromRequest timeoutForRequest; + + /** + * If {@code true} then timeout is for full request/response transaction otherwise only the response metadata must + * complete before the timeout. + */ + private final boolean fullRequestResponse; + + @Nullable + private final Executor timeoutExecutor; + + AbstractTimeoutHttpFilter(final TimeoutFromRequest timeoutForRequest, final boolean fullRequestResponse) { + this.timeoutForRequest = requireNonNull(timeoutForRequest, "timeoutForRequest"); + this.fullRequestResponse = fullRequestResponse; + this.timeoutExecutor = null; + } + + AbstractTimeoutHttpFilter(final TimeoutFromRequest timeoutForRequest, final boolean fullRequestResponse, + final Executor timeoutExecutor) { + this.timeoutForRequest = requireNonNull(timeoutForRequest, "timeoutForRequest"); + this.fullRequestResponse = fullRequestResponse; + this.timeoutExecutor = requireNonNull(timeoutExecutor, "timeoutExecutor"); + } + + @Override + public final HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strategy) { + return timeoutForRequest.influenceStrategy(strategy); + } + + final Single withTimeout(final StreamingHttpRequest request, + final Function> responseFunction) { + + return Single.defer(() -> { + final Duration timeout = timeoutForRequest.apply(request); + if (null != timeout && !isPositive(timeout)) { + return Single.failed(new TimeoutException("non-positive timeout of " + timeout.toMillis() + "ms")); + } + + Single response = responseFunction.apply(request); + if (null != timeout) { + final Single timeoutResponse = timeoutExecutor == null ? + response.timeout(timeout) : response.timeout(timeout, timeoutExecutor); + + if (fullRequestResponse) { + final long deadline = System.nanoTime() + timeout.toNanos(); + response = timeoutResponse.map(resp -> resp.transformMessageBody(body -> defer(() -> { + final Duration remaining = ofNanos(deadline - System.nanoTime()); + if (isPositive(remaining)) { + return (timeoutExecutor == null ? + body.timeoutTerminal(remaining) : body.timeoutTerminal(remaining, timeoutExecutor)) + .subscribeShareContext(); + } + return failed(new TimeoutException("timeout after " + timeout.toMillis() + "ms")); + }))); + } else { + response = timeoutResponse; + } + } + + return response.subscribeShareContext(); + }); + } + + /** + * {@link TimeoutFromRequest} implementation which returns the provided default duration as the timeout duration to + * be used for any request. + */ + static final class FixedDuration implements TimeoutFromRequest { + + private final Duration duration; + + FixedDuration(final Duration duration) { + this.duration = ensurePositive(duration, "duration"); + } + + @Override + public Duration apply(final HttpRequestMetaData request) { + return duration; + } + + @Override + public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strategy) { + // No influence since we do not block. + return strategy; + } + } +} diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilter.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilter.java index 1819fb785b..19c0b47d2a 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilter.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilter.java @@ -1,5 +1,5 @@ /* - * Copyright © 2019 Apple Inc. and the ServiceTalk project authors + * Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,13 +16,10 @@ package io.servicetalk.http.utils; import io.servicetalk.concurrent.api.Executor; -import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; import io.servicetalk.http.api.FilterableStreamingHttpClient; import io.servicetalk.http.api.FilterableStreamingHttpConnection; import io.servicetalk.http.api.HttpExecutionStrategy; -import io.servicetalk.http.api.HttpExecutionStrategyInfluencer; -import io.servicetalk.http.api.HttpRequestMetaData; import io.servicetalk.http.api.StreamingHttpClientFilter; import io.servicetalk.http.api.StreamingHttpClientFilterFactory; import io.servicetalk.http.api.StreamingHttpConnectionFilter; @@ -32,33 +29,19 @@ import io.servicetalk.http.api.StreamingHttpResponse; import java.time.Duration; -import java.util.Objects; -import java.util.concurrent.TimeoutException; -import javax.annotation.Nullable; /** - * A filter to enable timeouts for HTTP requests. The timeout applies either the response metadata (headers) completion - * or the complete reception of the response payload body and optional trailers. + * A filter to enable timeouts for HTTP requests on the client-side. + * + *

The timeout applies either the response metadata (headers) completion or the complete reception of the response + * payload body and optional trailers. * *

The order with which this filter is applied may be highly significant. For example, appending it before a retry * filter would have different results than applying it after the retry filter; timeout would apply for all retries vs * timeout per retry. */ -public final class TimeoutHttpRequesterFilter implements StreamingHttpClientFilterFactory, - StreamingHttpConnectionFilterFactory, - HttpExecutionStrategyInfluencer { - - /** - * Establishes the timeout for a given request - */ - private final TimeoutFromRequest timeoutForRequest; - /** - * If true then timeout is for full request/response transaction otherwise only the response metadata must complete - * before the timeout. - */ - private final boolean fullRequestResponse; - @Nullable - private final Executor timeoutExecutor; +public final class TimeoutHttpRequesterFilter extends AbstractTimeoutHttpFilter + implements StreamingHttpClientFilterFactory, StreamingHttpConnectionFilterFactory { /** * Creates a new instance which requires only that the response metadata be received before the timeout. @@ -66,7 +49,7 @@ public final class TimeoutHttpRequesterFilter implements StreamingHttpClientFilt * @param duration the timeout {@link Duration} */ public TimeoutHttpRequesterFilter(final Duration duration) { - this(simpleDurationTimeout(duration), false); + this(new FixedDuration(duration), false); } /** @@ -76,63 +59,59 @@ public TimeoutHttpRequesterFilter(final Duration duration) { * @param timeoutExecutor the {@link Executor} to use for managing the timer notifications */ public TimeoutHttpRequesterFilter(final Duration duration, final Executor timeoutExecutor) { - this(simpleDurationTimeout(duration), false, timeoutExecutor); + this(new FixedDuration(duration), false, timeoutExecutor); } /** * Creates a new instance. * * @param duration the timeout {@link Duration} - * @param fullRequestResponse if true then timeout is for full request/response transaction otherwise only the - * response metadata must arrive before the timeout. + * @param fullRequestResponse if {@code true} then timeout is for full request/response transaction otherwise only + * the response metadata must arrive before the timeout */ public TimeoutHttpRequesterFilter(final Duration duration, final boolean fullRequestResponse) { - this(simpleDurationTimeout(duration), fullRequestResponse); + this(new FixedDuration(duration), fullRequestResponse); } /** * Creates a new instance. * * @param duration the timeout {@link Duration} - * @param fullRequestResponse if true then timeout is for full request/response transaction otherwise only the - * response metadata must arrive before the timeout. + * @param fullRequestResponse if {@code true} then timeout is for full request/response transaction otherwise only + * the response metadata must arrive before the timeout * @param timeoutExecutor the {@link Executor} to use for managing the timer notifications */ public TimeoutHttpRequesterFilter(final Duration duration, final boolean fullRequestResponse, final Executor timeoutExecutor) { - this(simpleDurationTimeout(duration), fullRequestResponse, timeoutExecutor); + this(new FixedDuration(duration), fullRequestResponse, timeoutExecutor); } /** * Creates a new instance. * * @param timeoutForRequest function for extracting timeout from request which may also determine the timeout using - * other sources. If no timeout is to be applied then the function should return null. - * @param fullRequestResponse if true then timeout is for full request/response transaction otherwise only the - * response metadata must arrive before the timeout. + * other sources. If no timeout is to be applied then the function should return {@code null} + * @param fullRequestResponse if {@code true} then timeout is for full request/response transaction otherwise only + * the response metadata must arrive before the timeout */ public TimeoutHttpRequesterFilter(final TimeoutFromRequest timeoutForRequest, final boolean fullRequestResponse) { - this.timeoutForRequest = Objects.requireNonNull(timeoutForRequest, "timeoutForRequest"); - this.fullRequestResponse = fullRequestResponse; - this.timeoutExecutor = null; + super(timeoutForRequest, fullRequestResponse); } /** * Creates a new instance. * * @param timeoutForRequest function for extracting timeout from request which may also determine the timeout using - * other sources. If no timeout is to be applied then the function should return null. + * other sources. If no timeout is to be applied then the function should return {@code null} * @param fullRequestResponse if {@code true} then timeout is for full request/response transaction otherwise only - * the response metadata must arrive before the timeout. + * the response metadata must arrive before the timeout * @param timeoutExecutor the {@link Executor} to use for managing the timer notifications */ public TimeoutHttpRequesterFilter(final TimeoutFromRequest timeoutForRequest, final boolean fullRequestResponse, final Executor timeoutExecutor) { - this.timeoutForRequest = Objects.requireNonNull(timeoutForRequest, "timeoutForRequest"); - this.fullRequestResponse = fullRequestResponse; - this.timeoutExecutor = Objects.requireNonNull(timeoutExecutor, "timeoutExecutor"); + super(timeoutForRequest, fullRequestResponse, timeoutExecutor); } @Override @@ -142,7 +121,7 @@ public StreamingHttpClientFilter create(final FilterableStreamingHttpClient clie protected Single request(final StreamingHttpRequester delegate, final HttpExecutionStrategy strategy, final StreamingHttpRequest request) { - return TimeoutHttpRequesterFilter.this.request(delegate, strategy, request); + return TimeoutHttpRequesterFilter.this.withTimeout(request, r -> delegate.request(strategy, r)); } }; } @@ -153,74 +132,7 @@ public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnect @Override public Single request(final HttpExecutionStrategy strategy, final StreamingHttpRequest request) { - return TimeoutHttpRequesterFilter.this.request(delegate(), strategy, request); - } - }; - } - - @Override - public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strategy) { - return timeoutForRequest.influenceStrategy(strategy); - } - - private Single request(final StreamingHttpRequester delegate, - final HttpExecutionStrategy strategy, - final StreamingHttpRequest request) { - return Single.defer(() -> { - Duration timeout = timeoutForRequest.apply(request); - Single response; - if (null != timeout && Duration.ZERO.compareTo(timeout) >= 0) { - response = Single.failed(new TimeoutException("negative timeout of " + timeout.toMillis() + "ms")); - } else { - response = delegate.request(strategy, request); - - if (null != timeout) { - Single timeoutResponse = timeoutExecutor == null ? - response.timeout(timeout) : response.timeout(timeout, timeoutExecutor); - - if (fullRequestResponse) { - long deadline = System.nanoTime() + timeout.toNanos(); - response = timeoutResponse.map(resp -> resp.transformMessageBody(body -> - Publisher.defer(() -> { - Duration remaining = Duration.ofNanos(deadline - System.nanoTime()); - return (Duration.ZERO.compareTo(remaining) <= 0 ? - Publisher.failed( - new TimeoutException("timeout after " + timeout.toMillis() + "ms")) - : (null != timeoutExecutor ? - body.timeoutTerminal(remaining, timeoutExecutor) - : body.timeoutTerminal(remaining)) - ).subscribeShareContext(); - }))); - } else { - response = timeoutResponse; - } - } - } - - return response.subscribeShareContext(); - }); - } - - /** - * Returns a function which returns the provided default duration as the timeout duration to be used for any - * request. - * - * @param duration timeout duration or null for no timeout - * @return a function to produce a timeout using specified duration - */ - static TimeoutFromRequest simpleDurationTimeout(@Nullable Duration duration) { - return new TimeoutFromRequest() { - @Nullable - @Override - public Duration apply(final HttpRequestMetaData request) { - // the request is not considered - return duration; - } - - @Override - public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strategy) { - // No influence since we do not block. - return strategy; + return TimeoutHttpRequesterFilter.this.withTimeout(request, r -> delegate().request(strategy, r)); } }; } diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpServiceFilter.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpServiceFilter.java index 8ce227f312..b1b417bbfe 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpServiceFilter.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/TimeoutHttpServiceFilter.java @@ -16,10 +16,7 @@ package io.servicetalk.http.utils; import io.servicetalk.concurrent.api.Executor; -import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; -import io.servicetalk.http.api.HttpExecutionStrategy; -import io.servicetalk.http.api.HttpExecutionStrategyInfluencer; import io.servicetalk.http.api.HttpServiceContext; import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpResponse; @@ -29,33 +26,19 @@ import io.servicetalk.http.api.StreamingHttpServiceFilterFactory; import java.time.Duration; -import java.util.Objects; -import java.util.concurrent.TimeoutException; -import javax.annotation.Nullable; - -import static io.servicetalk.http.utils.TimeoutHttpRequesterFilter.simpleDurationTimeout; /** - * A {@link StreamingHttpServiceFilter} that adds support for request/response timeouts. + * A filter to enable timeouts for HTTP requests on the server-side. + * + *

The timeout applies either the response metadata (headers) completion or the complete reception of the response + * payload body and optional trailers. * *

The order with which this filter is applied may be highly significant. For example, appending it before a retry * filter would have different results than applying it after the retry filter; timeout would apply for all retries vs * timeout per retry. */ -public final class TimeoutHttpServiceFilter - implements StreamingHttpServiceFilterFactory, HttpExecutionStrategyInfluencer { - - /** - * Establishes the timeout for a given request - */ - private final TimeoutFromRequest timeoutForRequest; - /** - * If true then timeout is for full request/response transaction otherwise only the response metadata must complete - * before the timeout. - */ - private final boolean fullRequestResponse; - @Nullable - private final Executor timeoutExecutor; +public final class TimeoutHttpServiceFilter extends AbstractTimeoutHttpFilter + implements StreamingHttpServiceFilterFactory { /** * Creates a new instance which requires only that the response metadata be received before the timeout. @@ -63,7 +46,7 @@ public final class TimeoutHttpServiceFilter * @param duration the timeout {@link Duration} */ public TimeoutHttpServiceFilter(Duration duration) { - this(simpleDurationTimeout(duration), false); + this(new FixedDuration(duration), false); } /** @@ -73,7 +56,7 @@ public TimeoutHttpServiceFilter(Duration duration) { * @param timeoutExecutor the {@link Executor} to use for managing the timer notifications */ public TimeoutHttpServiceFilter(Duration duration, Executor timeoutExecutor) { - this(simpleDurationTimeout(duration), false, timeoutExecutor); + this(new FixedDuration(duration), false, timeoutExecutor); } /** @@ -81,10 +64,10 @@ public TimeoutHttpServiceFilter(Duration duration, Executor timeoutExecutor) { * * @param duration the timeout {@link Duration} * @param fullRequestResponse if {@code true} then timeout is for full request/response transaction otherwise only - * the response metadata must complete before the timeout. + * the response metadata must arrive before the timeout */ public TimeoutHttpServiceFilter(Duration duration, boolean fullRequestResponse) { - this(simpleDurationTimeout(duration), fullRequestResponse); + this(new FixedDuration(duration), fullRequestResponse); } /** @@ -92,42 +75,38 @@ public TimeoutHttpServiceFilter(Duration duration, boolean fullRequestResponse) * * @param duration the timeout {@link Duration} * @param fullRequestResponse if {@code true} then timeout is for full request/response transaction otherwise only - * the response metadata must complete before the timeout. + * the response metadata must arrive before the timeout * @param timeoutExecutor the {@link Executor} to use for managing the timer notifications */ public TimeoutHttpServiceFilter(Duration duration, boolean fullRequestResponse, Executor timeoutExecutor) { - this(simpleDurationTimeout(duration), fullRequestResponse, timeoutExecutor); + this(new FixedDuration(duration), fullRequestResponse, timeoutExecutor); } /** - * Construct a new instance. + * Creates a new instance. * * @param timeoutForRequest function for extracting timeout from request which may also determine the timeout using - * other sources. If no timeout is to be applied then the function should return null. + * other sources. If no timeout is to be applied then the function should return {@code null} * @param fullRequestResponse if {@code true} then timeout is for full request/response transaction otherwise only - * the response metadata must complete before the timeout. + * the response metadata must arrive before the timeout */ public TimeoutHttpServiceFilter(TimeoutFromRequest timeoutForRequest, boolean fullRequestResponse) { - this.timeoutForRequest = Objects.requireNonNull(timeoutForRequest, "timeoutForRequest"); - this.fullRequestResponse = fullRequestResponse; - this.timeoutExecutor = null; + super(timeoutForRequest, fullRequestResponse); } /** - * Construct a new instance. + * Creates a new instance. * * @param timeoutForRequest function for extracting timeout from request which may also determine the timeout using - * other sources. If no timeout is to be applied then the function should return null. + * other sources. If no timeout is to be applied then the function should return {@code null} * @param fullRequestResponse if {@code true} then timeout is for full request/response transaction otherwise only - * the response metadata must complete before the timeout. + * the response metadata must arrive before the timeout * @param timeoutExecutor the {@link Executor} to use for managing the timer notifications */ public TimeoutHttpServiceFilter(TimeoutFromRequest timeoutForRequest, boolean fullRequestResponse, Executor timeoutExecutor) { - this.timeoutForRequest = Objects.requireNonNull(timeoutForRequest, "timeoutForRequest"); - this.fullRequestResponse = fullRequestResponse; - this.timeoutExecutor = Objects.requireNonNull(timeoutExecutor, "timeoutExecutor"); + super(timeoutForRequest, fullRequestResponse, timeoutExecutor); } @Override @@ -137,52 +116,9 @@ public StreamingHttpServiceFilter create(final StreamingHttpService service) { public Single handle(final HttpServiceContext ctx, final StreamingHttpRequest request, final StreamingHttpResponseFactory responseFactory) { - return TimeoutHttpServiceFilter.this.handle(delegate(), ctx, request, responseFactory); + return TimeoutHttpServiceFilter.this.withTimeout(request, + r -> delegate().handle(ctx, r, responseFactory)); } }; } - - @Override - public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strategy) { - return timeoutForRequest.influenceStrategy(strategy); - } - - private Single handle(final StreamingHttpService delegate, - final HttpServiceContext ctx, - final StreamingHttpRequest request, - final StreamingHttpResponseFactory responseFactory) { - return Single.defer(() -> { - Duration timeout = timeoutForRequest.apply(request); - Single response; - if (null != timeout && Duration.ZERO.compareTo(timeout) >= 0) { - response = Single.failed(new TimeoutException("negative timeout of " + timeout.toMillis() + "ms")); - } else { - response = delegate.handle(ctx, request, responseFactory); - - if (null != timeout) { - Single timeoutResponse = timeoutExecutor == null ? - response.timeout(timeout) : response.timeout(timeout, timeoutExecutor); - - if (fullRequestResponse) { - long deadline = System.nanoTime() + timeout.toNanos(); - response = timeoutResponse.map(resp -> resp.transformMessageBody(body -> - Publisher.defer(() -> { - Duration remaining = Duration.ofNanos(deadline - System.nanoTime()); - return (Duration.ZERO.compareTo(remaining) <= 0 ? - Publisher.failed( - new TimeoutException("timeout after " + timeout.toMillis() + "ms")) - : (null == timeoutExecutor ? - body.timeoutTerminal(remaining) - : body.timeoutTerminal(remaining, timeoutExecutor)) - ).subscribeShareContext(); - }))); - } else { - response = timeoutResponse; - } - } - } - - return response.subscribeShareContext(); - }); - } } diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilterTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilterTest.java new file mode 100644 index 0000000000..fdfe9cb27f --- /dev/null +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/AbstractTimeoutHttpFilterTest.java @@ -0,0 +1,184 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.utils; + +import io.servicetalk.buffer.api.Buffer; +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.api.TestPublisher; +import io.servicetalk.concurrent.api.TestSingle; +import io.servicetalk.concurrent.api.test.StepVerifiers; +import io.servicetalk.http.api.DefaultHttpHeadersFactory; +import io.servicetalk.http.api.EmptyHttpHeaders; +import io.servicetalk.http.api.StreamingHttpResponse; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; +import static io.servicetalk.concurrent.api.Executors.immediate; +import static io.servicetalk.concurrent.api.Single.succeeded; +import static io.servicetalk.concurrent.internal.TimeoutTracingInfoExtension.DEFAULT_TIMEOUT_SECONDS; +import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1; +import static io.servicetalk.http.api.HttpResponseStatus.OK; +import static io.servicetalk.http.api.StreamingHttpResponses.newResponse; +import static java.lang.Long.MAX_VALUE; +import static java.time.Duration.ZERO; +import static java.time.Duration.ofMillis; +import static java.time.Duration.ofNanos; +import static java.time.Duration.ofSeconds; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; + +abstract class AbstractTimeoutHttpFilterTest { + + abstract void newFilter(Duration duration); + + abstract Single applyFilter(Duration duration, boolean fullRequestResponse, + Single responseSingle); + + abstract Single applyFilter(TimeoutFromRequest timeoutForRequest, + boolean fullRequestResponse, + Single responseSingle); + + @Test + public void constructorValidatesDuration() { + //noinspection ConstantConditions + assertThrows(NullPointerException.class, () -> newFilter(null)); + assertThrows(IllegalArgumentException.class, () -> newFilter(Duration.ZERO)); + assertThrows(IllegalArgumentException.class, () -> newFilter(ofNanos(1L).negated())); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void responseTimeout(boolean fullRequestResponse) { + TestSingle responseSingle = new TestSingle<>(); + StepVerifiers.create(applyFilter(ofNanos(1L), fullRequestResponse, responseSingle)) + .expectError(TimeoutException.class) + .verify(); + assertThat("No subscribe for response single", responseSingle.isSubscribed(), is(true)); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void responseWithZeroTimeout(boolean fullRequestResponse) { + responseWithNonPositiveTimeout(ZERO, fullRequestResponse); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void responseWithNegativeTimeout(boolean fullRequestResponse) { + responseWithNonPositiveTimeout(ofNanos(1L).negated(), fullRequestResponse); + } + + private void responseWithNonPositiveTimeout(Duration timeout, boolean fullRequestResponse) { + TestSingle responseSingle = new TestSingle<>(); + StepVerifiers.create(applyFilter(__ -> timeout, fullRequestResponse, responseSingle)) + .expectError(TimeoutException.class) + .verify(); + assertThat("Unexpected subscribe for response single", responseSingle.isSubscribed(), is(false)); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void responseCompletesBeforeTimeout(boolean fullRequestResponse) { + TestSingle responseSingle = new TestSingle<>(); + StepVerifiers.create(applyFilter(ofSeconds(DEFAULT_TIMEOUT_SECONDS / 2), fullRequestResponse, responseSingle)) + .then(() -> immediate().schedule(() -> responseSingle.onSuccess(mock(StreamingHttpResponse.class)), + ofMillis(50L))) + .expectSuccess() + .verify(); + assertThat("No subscribe for response single", responseSingle.isSubscribed(), is(true)); + } + + @Test + public void payloadBodyTimeout() { + TestPublisher payloadBody = new TestPublisher<>(); + AtomicBoolean responseSucceeded = new AtomicBoolean(); + StepVerifiers.create(applyFilter(ofMillis(100L), true, responseWith(payloadBody)) + .whenOnSuccess(__ -> responseSucceeded.set(true)) + .flatMapPublisher(StreamingHttpResponse::payloadBody)) + .thenRequest(MAX_VALUE) + .expectError(TimeoutException.class) + .verify(); + assertThat("Response did not succeeded", responseSucceeded.get(), is(true)); + assertThat("No subscribe for payload body", payloadBody.isSubscribed(), is(true)); + } + + @Test + public void payloadBodyDoesNotTimeoutWhenIgnored() { + Duration timeout = ofMillis(100L); + TestPublisher payloadBody = new TestPublisher<>(); + AtomicBoolean responseSucceeded = new AtomicBoolean(); + StepVerifiers.create(applyFilter(timeout, false, responseWith(payloadBody)) + .whenOnSuccess(__ -> responseSucceeded.set(true)) + .flatMapPublisher(StreamingHttpResponse::payloadBody)) + .expectSubscriptionConsumed(subscription -> + immediate().schedule(subscription::cancel, timeout.plusMillis(10L))) + .thenRequest(MAX_VALUE) + .expectNoSignals(timeout.plusMillis(5L)) + // FIXME: use thenCancel() instead of expectSubscriptionConsumed(...) + expectError() + // https://github.com/apple/servicetalk/issues/1492 + .expectError(IllegalStateException.class) // should never happen + .verify(); + assertThat("Response did not succeeded", responseSucceeded.get(), is(true)); + assertThat("No subscribe for payload body", payloadBody.isSubscribed(), is(true)); + } + + @Test + public void subscribeToPayloadBodyAfterTimeout() { + Duration timeout = ofMillis(100L); + TestPublisher payloadBody = new TestPublisher<>(); + AtomicReference response = new AtomicReference<>(); + StepVerifiers.create(applyFilter(timeout, true, responseWith(payloadBody))) + .expectSuccessConsumed(response::set) + .verify(); + + // Subscribe to payload body after timeout + StepVerifiers.create(immediate().timer(timeout.plusMillis(5L)).concat(response.get().payloadBody())) + .expectError(TimeoutException.class) + .verify(); + assertThat("Unexpected subscribe for payload body", payloadBody.isSubscribed(), is(false)); + } + + @Test + public void payloadBodyCompletesBeforeTimeout() { + TestPublisher payloadBody = new TestPublisher<>(); + AtomicReference response = new AtomicReference<>(); + StepVerifiers.create(applyFilter(ofSeconds(DEFAULT_TIMEOUT_SECONDS / 2), true, responseWith(payloadBody))) + .expectSuccessConsumed(response::set) + .verify(); + + StepVerifiers.create(response.get().payloadBody()) + .then(() -> immediate().schedule(payloadBody::onComplete, ofMillis(50L))) + .expectComplete() + .verify(); + assertThat("No subscribe for payload body", payloadBody.isSubscribed(), is(true)); + } + + private static Single responseWith(Publisher payloadBody) { + return succeeded(newResponse(OK, HTTP_1_1, EmptyHttpHeaders.INSTANCE, DEFAULT_ALLOCATOR, + DefaultHttpHeadersFactory.INSTANCE).payloadBody(payloadBody)); + } +} diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilterTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilterTest.java new file mode 100644 index 0000000000..eac268266a --- /dev/null +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/TimeoutHttpRequesterFilterTest.java @@ -0,0 +1,58 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.utils; + +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.http.api.FilterableStreamingHttpConnection; +import io.servicetalk.http.api.HttpExecutionStrategy; +import io.servicetalk.http.api.StreamingHttpRequest; +import io.servicetalk.http.api.StreamingHttpRequester; +import io.servicetalk.http.api.StreamingHttpResponse; + +import java.time.Duration; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TimeoutHttpRequesterFilterTest extends AbstractTimeoutHttpFilterTest { + + @Override + void newFilter(Duration duration) { + new TimeoutHttpRequesterFilter(duration); + } + + @Override + Single applyFilter(Duration duration, boolean fullRequestResponse, + Single responseSingle) { + return applyFilter(new TimeoutHttpRequesterFilter(duration, fullRequestResponse), responseSingle); + } + + @Override + Single applyFilter(TimeoutFromRequest timeoutForRequest, boolean fullRequestResponse, + Single responseSingle) { + return applyFilter(new TimeoutHttpRequesterFilter(timeoutForRequest, fullRequestResponse), responseSingle); + } + + private static Single applyFilter(TimeoutHttpRequesterFilter filterFactory, + Single responseSingle) { + FilterableStreamingHttpConnection connection = mock(FilterableStreamingHttpConnection.class); + when(connection.request(any(), any())).thenReturn(responseSingle); + + StreamingHttpRequester requester = filterFactory.create(connection); + return requester.request(mock(HttpExecutionStrategy.class), mock(StreamingHttpRequest.class)); + } +} diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/TimeoutHttpServiceFilterTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/TimeoutHttpServiceFilterTest.java new file mode 100644 index 0000000000..7a4c8c87ed --- /dev/null +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/TimeoutHttpServiceFilterTest.java @@ -0,0 +1,60 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.utils; + +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.http.api.HttpServiceContext; +import io.servicetalk.http.api.StreamingHttpRequest; +import io.servicetalk.http.api.StreamingHttpResponse; +import io.servicetalk.http.api.StreamingHttpResponseFactory; +import io.servicetalk.http.api.StreamingHttpService; +import io.servicetalk.http.api.StreamingHttpServiceFilter; + +import java.time.Duration; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TimeoutHttpServiceFilterTest extends AbstractTimeoutHttpFilterTest { + + @Override + void newFilter(Duration duration) { + new TimeoutHttpServiceFilter(duration); + } + + @Override + Single applyFilter(Duration duration, boolean fullRequestResponse, + Single responseSingle) { + return applyFilter(new TimeoutHttpServiceFilter(duration, fullRequestResponse), responseSingle); + } + + @Override + Single applyFilter(TimeoutFromRequest timeoutForRequest, boolean fullRequestResponse, + Single responseSingle) { + return applyFilter(new TimeoutHttpServiceFilter(timeoutForRequest, fullRequestResponse), responseSingle); + } + + private static Single applyFilter(TimeoutHttpServiceFilter filterFactory, + Single responseSingle) { + StreamingHttpService service = mock(StreamingHttpService.class); + when(service.handle(any(), any(), any())).thenReturn(responseSingle); + + StreamingHttpServiceFilter filter = filterFactory.create(service); + return filter.handle(mock(HttpServiceContext.class), mock(StreamingHttpRequest.class), + mock(StreamingHttpResponseFactory.class)); + } +} diff --git a/servicetalk-utils-internal/src/main/java/io/servicetalk/utils/internal/DurationUtils.java b/servicetalk-utils-internal/src/main/java/io/servicetalk/utils/internal/DurationUtils.java new file mode 100644 index 0000000000..0feff9152c --- /dev/null +++ b/servicetalk-utils-internal/src/main/java/io/servicetalk/utils/internal/DurationUtils.java @@ -0,0 +1,57 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.utils.internal; + +import java.time.Duration; + +import static java.time.Duration.ZERO; +import static java.util.Objects.requireNonNull; + +/** + * Helper utilities for {@link Duration}. + */ +public final class DurationUtils { + + private DurationUtils() { + // No instances + } + + /** + * Checks if the duration is positive, excluding zero. + * + * @param duration the {@link Duration} to validate + * @return {@code true} if the passed duration is greater than {@link Duration#ZERO}, {@code false} otherwise + */ + public static boolean isPositive(final Duration duration) { + return ZERO.compareTo(duration) < 0; + } + + /** + * Ensures the duration is positive, excluding zero. + * + * @param duration the {@link Duration} to validate + * @param name name of the {@link Duration} variable + * @return the passed duration if all checks pass + * @throws NullPointerException if the passed duration is {@code null} + * @throws IllegalArgumentException if the passed duration is not greater than {@link Duration#ZERO} + */ + public static Duration ensurePositive(final Duration duration, final String name) { + if (!isPositive(requireNonNull(duration, name))) { + throw new IllegalArgumentException(name + ": " + duration + " (expected > 0)"); + } + return duration; + } +} diff --git a/servicetalk-utils-internal/src/test/java/io/servicetalk/utils/internal/DurationUtilsTest.java b/servicetalk-utils-internal/src/test/java/io/servicetalk/utils/internal/DurationUtilsTest.java new file mode 100644 index 0000000000..2fb8648f21 --- /dev/null +++ b/servicetalk-utils-internal/src/test/java/io/servicetalk/utils/internal/DurationUtilsTest.java @@ -0,0 +1,53 @@ +/* + * Copyright © 2021 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.utils.internal; + +import org.junit.Test; + +import java.time.Duration; + +import static io.servicetalk.utils.internal.DurationUtils.ensurePositive; +import static io.servicetalk.utils.internal.DurationUtils.isPositive; +import static java.time.Duration.ofNanos; +import static java.time.Duration.ofSeconds; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class DurationUtilsTest { + + @Test + public void testIsPositiveSeconds() { + assertThat(isPositive(ofSeconds(1L)), is(true)); + assertThat(isPositive(ofSeconds(0L)), is(false)); + assertThat(isPositive(ofSeconds(1L).negated()), is(false)); + } + + @Test + public void testIsPositiveNanos() { + assertThat(isPositive(ofNanos(1L)), is(true)); + assertThat(isPositive(ofNanos(0L)), is(false)); + assertThat(isPositive(ofNanos(1L).negated()), is(false)); + } + + @Test + public void testEnsurePositive() { + assertThrows(NullPointerException.class, () -> ensurePositive(null, "duration")); + assertThrows(IllegalArgumentException.class, () -> ensurePositive(Duration.ZERO, "duration")); + assertThrows(IllegalArgumentException.class, () -> ensurePositive(ofNanos(1L).negated(), "duration")); + assertThrows(IllegalArgumentException.class, () -> ensurePositive(ofSeconds(1L).negated(), "duration")); + } +}