Skip to content

Commit

Permalink
Fix duration comparison bug in timeout filters and reduce code duplic…
Browse files Browse the repository at this point in the history
…ation (#1491)

Motivation:

`Duration#compareTo` is hard to understand, the result of comparison
depends on the order of arguments. This lead to a bug in
`TimeoutHttpRequesterFilter` and `TimeoutHttpServiceFilter` which always
fails payload body publisher with `TimeoutException`. Use a shared
utility to make `isPositive` check for `Duration` consistent across all modules.
Also, `TimeoutHttpRequesterFilter` and `TimeoutHttpServiceFilter` logic
is identical, we can reduce duplication and share the logic.

Modifications:

- Introduce a shared `DurationUtils` class for common operations with
`Duration`;
- Use this utility to reduce mistakes when we check that the passed
`Duration` must be positive;
- Introduce `AbstractTimeoutHttpFilterTest` to share timeout logic
between `TimeoutHttpRequesterFilter` and `TimeoutHttpServiceFilter`;
- Add tests to verify behavior correctness for both timeout filters;
- Adjust how the timeout filters are created in `ResponseTimeoutTest`
to avoid NPE;
- Minor improvements to make javadoc consistent between
`TimeoutHttpRequesterFilter` and `TimeoutHttpServiceFilter`;

Result:

1. `TimeoutHttpRequesterFilter` and `TimeoutHttpServiceFilter` do not
fail payload body publisher before the actual timeout expires.
2. No code duplication between timeout filters.
3. Shared utility to consistently validate `Duration` in all modules.
  • Loading branch information
idelpivnitskiy authored Apr 15, 2021
1 parent 6735dea commit 5b0100b
Show file tree
Hide file tree
Showing 15 changed files with 608 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.annotation.Nullable;

import static io.servicetalk.encoding.api.Identity.identity;
import static io.servicetalk.utils.internal.DurationUtils.ensurePositive;

/**
* Default implementation for {@link DefaultGrpcClientMetadata}.
Expand Down Expand Up @@ -149,8 +150,8 @@ protected DefaultGrpcClientMetadata(final String path,
super(path);
this.strategy = strategy;
this.requestEncoding = Objects.requireNonNull(requestEncoding, "requestEncoding");
if (null != timeout && Duration.ZERO.compareTo(timeout) >= 0) {
throw new IllegalArgumentException("timeout: " + timeout + " (expected > 0)");
if (null != timeout) {
ensurePositive(timeout, "timeout");
}
this.timeout = null != timeout && timeout.compareTo(GRPC_MAX_TIMEOUT) <= 0 ? timeout : null;
}
Expand Down
1 change: 1 addition & 0 deletions servicetalk-grpc-netty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
implementation project(":servicetalk-http-utils")
implementation project(":servicetalk-annotations")
implementation project(":servicetalk-transport-netty-internal")
implementation project(":servicetalk-utils-internal")
implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "com.google.code.findbugs:jsr305:$jsr305Version"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.net.SocketOption;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.LongFunction;
Expand All @@ -53,6 +52,7 @@
import static io.servicetalk.buffer.api.CharSequences.newAsciiString;
import static io.servicetalk.grpc.api.GrpcClientMetadata.GRPC_MAX_TIMEOUT;
import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default;
import static io.servicetalk.utils.internal.DurationUtils.ensurePositive;

final class DefaultGrpcClientBuilder<U, R> extends GrpcClientBuilder<U, R> {
/**
Expand Down Expand Up @@ -98,12 +98,7 @@ public GrpcClientBuilder<U, R> defaultTimeout(Duration defaultTimeout) {
if (invokedBuild) {
throw new IllegalStateException("default timeout cannot be modified after build, create a new builder");
}

if (Duration.ZERO.compareTo(Objects.requireNonNull(defaultTimeout, "defaultTimeout")) >= 0) {
throw new IllegalArgumentException("defaultTimeout: " + defaultTimeout + " (expected > 0)");
}

this.defaultTimeout = defaultTimeout;
this.defaultTimeout = ensurePositive(defaultTimeout, "defaultTimeout");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@
import java.net.SocketOption;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.grpc.api.GrpcClientMetadata.GRPC_MAX_TIMEOUT;
import static io.servicetalk.grpc.api.GrpcExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default;
import static io.servicetalk.utils.internal.DurationUtils.ensurePositive;

final class DefaultGrpcServerBuilder extends GrpcServerBuilder implements ServerBinder {

Expand Down Expand Up @@ -85,12 +85,7 @@ public GrpcServerBuilder defaultTimeout(Duration defaultTimeout) {
if (invokedBuild) {
throw new IllegalStateException("default timeout cannot be modified after build, create a new builder");
}

if (Duration.ZERO.compareTo(Objects.requireNonNull(defaultTimeout, "defaultTimeout")) >= 0) {
throw new IllegalArgumentException("defaultTimeout: " + defaultTimeout + " (expected > 0)");
}

this.defaultTimeout = defaultTimeout;
this.defaultTimeout = ensurePositive(defaultTimeout, "defaultTimeout");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ private void requestCancellationResetsStreamButNotParentConnection(StreamingHttp
// ignore
}
})
.expectError()
// .thenCancel()
// FIXME: use thenCancel() after await() instead of cancelling from inside then(...) + expectError()
// https://github.com/apple/servicetalk/issues/1492
.expectError(IllegalStateException.class) // should never happen
.verify();

assertThat("Unexpected responses", responses, is(empty()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@
import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.DelegatingConnectionFactory;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource.Processor;
import io.servicetalk.concurrent.SingleSource.Subscriber;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout;
Expand Down Expand Up @@ -59,7 +58,6 @@
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable;
import static io.servicetalk.concurrent.api.Processors.newSingleProcessor;
import static io.servicetalk.concurrent.api.Single.defer;
import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
Expand All @@ -84,7 +82,7 @@ public class ResponseTimeoutTest {

@Rule
public final Timeout timeout = new ServiceTalkTestTimeout();
private final BlockingQueue<Processor<HttpResponse, HttpResponse>> serverResponses = new LinkedBlockingQueue<>();
private final BlockingQueue<Single<HttpResponse>> serverResponses = new LinkedBlockingQueue<>();
private final BlockingQueue<Cancellable> delayedClientCancels = new LinkedBlockingQueue<>();
private final BlockingQueue<ClientTerminationSignal> delayedClientTermination = new LinkedBlockingQueue<>();
private final ServerContext ctx;
Expand All @@ -96,11 +94,11 @@ public ResponseTimeoutTest(Duration clientTimeout,
Duration serverTimeout,
Class<? extends Throwable> expectThrowableClazz) throws Exception {
ctx = forAddress(localAddress(0))
.appendServiceFilter(new TimeoutHttpServiceFilter(serverTimeout, true))
.appendServiceFilter(new TimeoutHttpServiceFilter(__ -> serverTimeout, true))
.listenAndAwait((__, ___, factory) -> {
Processor<HttpResponse, HttpResponse> resp = newSingleProcessor();
Single<HttpResponse> resp = Single.never();
serverResponses.add(resp);
return Single.never();
return resp;
});
client = forSingleAddress(serverHostAndPort(ctx))
.appendClientFilter(client -> new StreamingHttpClientFilter(client) {
Expand All @@ -109,7 +107,7 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
final HttpExecutionStrategy strategy,
final StreamingHttpRequest request) {
return Single.succeeded(null)
.afterOnSubscribe(cancellable -> delayedClientCancels.add(cancellable))
.afterOnSubscribe(delayedClientCancels::add)
.concat(delegate().request(strategy, request)
.liftSync(target -> new Subscriber<StreamingHttpResponse>() {
@Override
Expand Down Expand Up @@ -141,7 +139,7 @@ public void onError(final Throwable t) {
}
})
.appendConnectionFactoryFilter(original -> new CountingConnectionFactory(original, connectionCount))
.appendClientFilter(new TimeoutHttpRequesterFilter(clientTimeout, true))
.appendClientFilter(new TimeoutHttpRequesterFilter(__ -> clientTimeout, true))
.build();
this.expectThrowableClazz = expectThrowableClazz;
}
Expand Down
4 changes: 4 additions & 0 deletions servicetalk-http-utils/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@ dependencies {
implementation project(":servicetalk-annotations")
implementation project(":servicetalk-concurrent-api-internal")
implementation project(":servicetalk-concurrent-internal")
implementation project(":servicetalk-utils-internal")
implementation "com.google.code.findbugs:jsr305:$jsr305Version"
implementation "org.slf4j:slf4j-api:$slf4jVersion"

testImplementation testFixtures(project(":servicetalk-concurrent-api"))
testImplementation testFixtures(project(":servicetalk-concurrent-internal"))
testImplementation testFixtures(project(":servicetalk-http-api"))
testImplementation project(":servicetalk-buffer-netty")
testImplementation project(":servicetalk-concurrent-api-test")
testImplementation project(":servicetalk-test-resources")
testImplementation "junit:junit:$junitVersion"
testImplementation "org.junit.jupiter:junit-jupiter-api:$junit5Version"
testImplementation "org.junit.jupiter:junit-jupiter-params:$junit5Version"
testImplementation "org.hamcrest:hamcrest-library:$hamcrestVersion"
testImplementation "org.mockito:mockito-core:$mockitoCoreVersion"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.utils;

import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpExecutionStrategyInfluencer;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Publisher.defer;
import static io.servicetalk.concurrent.api.Publisher.failed;
import static io.servicetalk.utils.internal.DurationUtils.ensurePositive;
import static io.servicetalk.utils.internal.DurationUtils.isPositive;
import static java.time.Duration.ofNanos;
import static java.util.Objects.requireNonNull;

abstract class AbstractTimeoutHttpFilter implements HttpExecutionStrategyInfluencer {
/**
* Establishes the timeout for a given request.
*/
private final TimeoutFromRequest timeoutForRequest;

/**
* If {@code true} then timeout is for full request/response transaction otherwise only the response metadata must
* complete before the timeout.
*/
private final boolean fullRequestResponse;

@Nullable
private final Executor timeoutExecutor;

AbstractTimeoutHttpFilter(final TimeoutFromRequest timeoutForRequest, final boolean fullRequestResponse) {
this.timeoutForRequest = requireNonNull(timeoutForRequest, "timeoutForRequest");
this.fullRequestResponse = fullRequestResponse;
this.timeoutExecutor = null;
}

AbstractTimeoutHttpFilter(final TimeoutFromRequest timeoutForRequest, final boolean fullRequestResponse,
final Executor timeoutExecutor) {
this.timeoutForRequest = requireNonNull(timeoutForRequest, "timeoutForRequest");
this.fullRequestResponse = fullRequestResponse;
this.timeoutExecutor = requireNonNull(timeoutExecutor, "timeoutExecutor");
}

@Override
public final HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strategy) {
return timeoutForRequest.influenceStrategy(strategy);
}

final Single<StreamingHttpResponse> withTimeout(final StreamingHttpRequest request,
final Function<StreamingHttpRequest, Single<StreamingHttpResponse>> responseFunction) {

return Single.defer(() -> {
final Duration timeout = timeoutForRequest.apply(request);
if (null != timeout && !isPositive(timeout)) {
return Single.failed(new TimeoutException("non-positive timeout of " + timeout.toMillis() + "ms"));
}

Single<StreamingHttpResponse> response = responseFunction.apply(request);
if (null != timeout) {
final Single<StreamingHttpResponse> timeoutResponse = timeoutExecutor == null ?
response.timeout(timeout) : response.timeout(timeout, timeoutExecutor);

if (fullRequestResponse) {
final long deadline = System.nanoTime() + timeout.toNanos();
response = timeoutResponse.map(resp -> resp.transformMessageBody(body -> defer(() -> {
final Duration remaining = ofNanos(deadline - System.nanoTime());
if (isPositive(remaining)) {
return (timeoutExecutor == null ?
body.timeoutTerminal(remaining) : body.timeoutTerminal(remaining, timeoutExecutor))
.subscribeShareContext();
}
return failed(new TimeoutException("timeout after " + timeout.toMillis() + "ms"));
})));
} else {
response = timeoutResponse;
}
}

return response.subscribeShareContext();
});
}

/**
* {@link TimeoutFromRequest} implementation which returns the provided default duration as the timeout duration to
* be used for any request.
*/
static final class FixedDuration implements TimeoutFromRequest {

private final Duration duration;

FixedDuration(final Duration duration) {
this.duration = ensurePositive(duration, "duration");
}

@Override
public Duration apply(final HttpRequestMetaData request) {
return duration;
}

@Override
public HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy strategy) {
// No influence since we do not block.
return strategy;
}
}
}
Loading

0 comments on commit 5b0100b

Please sign in to comment.