diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java index d5431fae35..3a82dcb06f 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java @@ -46,6 +46,10 @@ import io.servicetalk.transport.api.ExecutionContext; import io.servicetalk.transport.api.ExecutionStrategyInfluencer; import io.servicetalk.transport.api.RetryableException; +import io.servicetalk.utils.internal.ThrowableUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.Duration; @@ -89,17 +93,21 @@ */ public final class RetryingHttpRequesterFilter implements StreamingHttpClientFilterFactory, ExecutionStrategyInfluencer { + + private static final Logger LOGGER = LoggerFactory.getLogger(RetryingHttpRequesterFilter.class); + static final int DEFAULT_MAX_TOTAL_RETRIES = 4; private static final RetryingHttpRequesterFilter DISABLE_AUTO_RETRIES = - new RetryingHttpRequesterFilter(true, false, false, 1, null, + new RetryingHttpRequesterFilter(true, false, false, false, 1, null, (__, ___) -> NO_RETRIES, null); private static final RetryingHttpRequesterFilter DISABLE_ALL_RETRIES = - new RetryingHttpRequesterFilter(false, true, false, 0, null, + new RetryingHttpRequesterFilter(false, true, false, false, 0, null, (__, ___) -> NO_RETRIES, null); private final boolean waitForLb; private final boolean ignoreSdErrors; private final boolean mayReplayRequestPayload; + private final boolean returnOriginalResponses; private final int maxTotalRetries; @Nullable private final Function responseMapper; @@ -109,13 +117,14 @@ public final class RetryingHttpRequesterFilter RetryingHttpRequesterFilter( final boolean waitForLb, final boolean ignoreSdErrors, final boolean mayReplayRequestPayload, - final int maxTotalRetries, + final boolean returnOriginalResponses, final int maxTotalRetries, @Nullable final Function responseMapper, final BiFunction retryFor, @Nullable final RetryCallbacks onRequestRetry) { this.waitForLb = waitForLb; this.ignoreSdErrors = ignoreSdErrors; this.mayReplayRequestPayload = mayReplayRequestPayload; + this.returnOriginalResponses = returnOriginalResponses; this.maxTotalRetries = maxTotalRetries; this.responseMapper = responseMapper; this.retryFor = retryFor; @@ -194,24 +203,53 @@ public Completable apply(final int count, final Throwable t) { sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t); } - final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t); - if (backOffPolicy != NO_RETRIES) { - final int offsetCount = count - lbNotReadyCount; - Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t); - if (t instanceof DelayedRetryException) { - final Duration constant = ((DelayedRetryException) t).delay(); - retryWhen = retryWhen.concat(executor.timer(constant)); - } + try { + BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t); + if (backOffPolicy != NO_RETRIES) { + final int offsetCount = count - lbNotReadyCount; + Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t); + if (t instanceof DelayedRetryException) { + final Duration constant = ((DelayedRetryException) t).delay(); + retryWhen = retryWhen.concat(executor.timer(constant)); + } - return applyRetryCallbacks(retryWhen, count, t); + return applyRetryCallbacks(retryWhen, count, t); + } + } catch (Throwable tt) { + LOGGER.error("Unexpected exception when computing and applying backoff policy for {}({}). " + + "User-defined functions should not throw.", + RetryingHttpRequesterFilter.class.getName(), t.getMessage(), tt); + Completable result = failed(ThrowableUtils.addSuppressed(tt, t)); + if (returnOriginalResponses) { + StreamingHttpResponse response = extractStreamingResponse(t); + if (response != null) { + result = drain(response).concat(result); + } + } + return result; } return failed(t); } Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) { - return retryCallbacks == null ? completable : - completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t)); + Completable result = (retryCallbacks == null ? completable : + completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t))); + if (returnOriginalResponses) { + final StreamingHttpResponse response = extractStreamingResponse(t); + if (response != null) { + // If we succeed, we need to drain the response body before we continue. The retry completable + // fails we want to surface the original exception and don't worry about draining since the + // response will be returned to the user. + result = result.onErrorMap(backoffError -> ThrowableUtils.addSuppressed(t, backoffError)) + // If we get cancelled we also need to drain the message body as there is no guarantee + // we'll ever receive a completion event, error or success. This is okay to do since + // the subscriber has signaled they're no longer interested in the response. + .beforeCancel(() -> drain(response).subscribe()) + .concat(drain(response)); + } + } + return result; } } @@ -257,20 +295,39 @@ protected Single request(final StreamingHttpRequester del if (responseMapper != null) { single = single.flatMap(resp -> { - final HttpResponseException exception = responseMapper.apply(resp); - return (exception != null ? - // Drain response payload body before discarding it: - resp.payloadBody().ignoreElements().onErrorComplete() - .concat(Single.failed(exception)) : - Single.succeeded(resp)) - .shareContextOnSubscribe(); + final HttpResponseException exception; + try { + exception = responseMapper.apply(resp); + } catch (Throwable t) { + LOGGER.error("Unexpected exception when mapping response ({}) to an exception. User-defined " + + "functions should not throw.", resp.status(), t); + return drain(resp).concat(Single.failed(t)); + } + Single response; + if (exception == null) { + response = Single.succeeded(resp); + } else { + response = Single.failed(exception); + if (!returnOriginalResponses) { + response = drain(resp).concat(response); + } + } + return response.shareContextOnSubscribe(); }); } // 1. Metadata is shared across retries // 2. Publisher state is restored to original state for each retry // duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2). - return single.retryWhen(retryStrategy(request, executionContext(), true)); + single = single.retryWhen(retryStrategy(request, executionContext(), true)); + if (returnOriginalResponses) { + single = single.onErrorResume(HttpResponseException.class, t -> { + HttpResponseMetaData metaData = t.metaData(); + return (metaData instanceof StreamingHttpResponse ? + Single.succeeded((StreamingHttpResponse) metaData) : Single.failed(t)); + }); + } + return single; } } @@ -719,6 +776,7 @@ public static final class Builder { private int maxTotalRetries = DEFAULT_MAX_TOTAL_RETRIES; private boolean retryExpectationFailed; + private boolean returnOriginalResponses; private BiFunction retryRetryableExceptions = (requestMetaData, e) -> BackOffPolicy.ofImmediateBounded(); @@ -796,8 +854,13 @@ public Builder maxTotalRetries(final int maxRetries) { * retry behaviour through {@link #retryResponses(BiFunction)}. * * @param mapper a {@link Function} that maps a {@link HttpResponseMetaData} to an - * {@link HttpResponseException} or returns {@code null} if there is no mapping for response meta-data. The - * mapper should return {@code null} if no retry is needed or if it cannot be determined that a retry is needed. + * {@link HttpResponseException} or returns {@code null} if there is no mapping for response meta-data. + * In the case that the request cannot be retried, the {@link HttpResponseException} will be returned via the + * error pathway. + *

+ * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link Function} doesn't throw exceptions. + * * @return {@code this} */ public Builder responseMapper(final Function mapper) { @@ -810,7 +873,8 @@ public Builder responseMapper(final Function * To disable retries you can return {@link BackOffPolicy#ofNoRetries()} from the {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper The mapper to map the {@link HttpRequestMetaData} and the * {@link RetryableException} to a {@link BackOffPolicy}. @@ -833,7 +897,8 @@ public Builder retryRetryableExceptions( *

* To disable retries you can return {@link BackOffPolicy#ofNoRetries()} from the {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper The mapper to map the {@link HttpRequestMetaData} and the * {@link IOException} to a {@link BackOffPolicy}. @@ -871,7 +936,8 @@ public Builder retryExpectationFailed(boolean retryExpectationFailed) { * To disable retries and proceed evaluating other retry functions you can return, * {@link BackOffPolicy#ofNoRetries()} from the passed {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper The mapper to map the {@link HttpRequestMetaData} and the * {@link DelayedRetryException delayed-exception} to a {@link BackOffPolicy}. @@ -893,7 +959,8 @@ public Builder retryDelayedRetryExceptions( *

* To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper The mapper to map the {@link HttpRequestMetaData} and the * {@link DelayedRetry delayed-exception} to a {@link BackOffPolicy}. @@ -914,7 +981,8 @@ public Builder retryDelayedRetries(// FIXME: 0.43 - remove deprecated method *

* To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper The mapper to map the {@link HttpRequestMetaData} and the * {@link DelayedRetry delayed-exception} to a {@link BackOffPolicy}. @@ -922,7 +990,30 @@ public Builder retryDelayedRetries(// FIXME: 0.43 - remove deprecated method */ public Builder retryResponses( final BiFunction mapper) { + return retryResponses(mapper, false); + } + + /** + * The retrying-filter will evaluate {@link HttpResponseException} that resulted from the + * {@link #responseMapper(Function)}, and support different retry behaviour according to the + * {@link HttpRequestMetaData request} and the {@link HttpResponseMetaData response}. + *

+ * To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}. + *

+ * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. + * + * @param mapper The mapper to map the {@link HttpRequestMetaData} and the + * {@link DelayedRetry delayed-exception} to a {@link BackOffPolicy}. + * @param returnOriginalResponses whether to unwrap the response defined by the {@link HttpResponseException} + * meta-data in the case that the request is not retried. + * @return {@code this}. + */ + public Builder retryResponses( + final BiFunction mapper, + final boolean returnOriginalResponses) { this.retryResponses = requireNonNull(mapper); + this.returnOriginalResponses = returnOriginalResponses; return this; } @@ -932,7 +1023,8 @@ public Builder retryResponses( *

* To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}. *

- * It's important that this {@link Function} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't block to avoid performance impacts. + * It's important that this {@link BiFunction} doesn't throw exceptions. * * @param mapper {@link BiFunction} that checks whether a given combination of * {@link HttpRequestMetaData meta-data} and {@link Throwable cause} should be retried, producing a @@ -1041,7 +1133,7 @@ public RetryingHttpRequesterFilter build() { if (retryResponses != null && throwable instanceof HttpResponseException) { final BackOffPolicy backOffPolicy = - retryResponses.apply(requestMetaData, (HttpResponseException) throwable); + retryResponses.apply(requestMetaData, (HttpResponseException) throwable); if (backOffPolicy != NO_RETRIES) { return backOffPolicy; } @@ -1054,7 +1146,26 @@ public RetryingHttpRequesterFilter build() { return NO_RETRIES; }; return new RetryingHttpRequesterFilter(waitForLb, ignoreSdErrors, mayReplayRequestPayload, - maxTotalRetries, responseMapper, allPredicate, onRequestRetry); + returnOriginalResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry); + } + } + + private static Completable drain(StreamingHttpResponse response) { + return response.payloadBody().ignoreElements().onErrorComplete(); + } + + @Nullable + private static StreamingHttpResponse extractStreamingResponse(Throwable t) { + if (t instanceof HttpResponseException) { + HttpResponseException responseException = (HttpResponseException) t; + if (responseException.metaData() instanceof StreamingHttpResponse) { + return (StreamingHttpResponse) responseException.metaData(); + } else { + LOGGER.warn("Couldn't unpack response due to unexpected dynamic types. Required " + + "meta-data of type StreamingHttpResponse, found {}", + responseException.metaData().getClass()); + } } + return null; } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java index d7c5136bd3..4e270d0391 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java @@ -62,12 +62,16 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; +import java.util.stream.Stream; import javax.annotation.Nullable; import static io.servicetalk.concurrent.api.Single.defer; @@ -104,6 +108,7 @@ class RetryingHttpRequesterFilterTest { private static final String RETRYABLE_HEADER = "RETRYABLE"; + private static final String RESPONSE_BODY = "ok"; private final ServerContext svcCtx; private final SingleAddressHttpClientBuilder normalClientBuilder; @@ -119,7 +124,8 @@ class RetryingHttpRequesterFilterTest { RetryingHttpRequesterFilterTest() throws Exception { svcCtx = forAddress(localAddress(0)) .listenBlockingAndAwait((ctx, request, responseFactory) -> responseFactory.ok() - .addHeader(RETRYABLE_HEADER, "yes")); + .addHeader(RETRYABLE_HEADER, "yes") + .payloadBody(ctx.executionContext().bufferAllocator().fromAscii(RESPONSE_BODY))); failingConnClientBuilder = forSingleAddress(serverHostAndPort(svcCtx)) .loadBalancerFactory(new DefaultHttpLoadBalancerFactory<>(new InspectingLoadBalancerFactory<>())) .appendConnectionFactoryFilter(ClosingConnectionFactory::new); @@ -251,21 +257,24 @@ private void assertRequestRetryingPred(final BlockingHttpClient client) { assertThat("Unexpected calls to select.", (double) lbSelectInvoked.get(), closeTo(5.0, 1.0)); } - @Test - void testResponseMapper() { + @ParameterizedTest(name = "{displayName} [{index}]: returnOriginalResponses={0}") + @ValueSource(booleans = {true, false}) + void testResponseMapper(final boolean returnOriginalResponses) throws Exception { AtomicInteger newConnectionCreated = new AtomicInteger(); AtomicInteger responseDrained = new AtomicInteger(); AtomicInteger onRequestRetryCounter = new AtomicInteger(); final int maxTotalRetries = 4; + final String retryMessage = "Retryable header"; normalClient = normalClientBuilder .appendClientFilter(new Builder() .maxTotalRetries(maxTotalRetries) .responseMapper(metaData -> metaData.headers().contains(RETRYABLE_HEADER) ? - new HttpResponseException("Retryable header", metaData) : null) + new HttpResponseException(retryMessage, metaData) : null) // Disable request retrying .retryRetryableExceptions((requestMetaData, e) -> ofNoRetries()) // Retry only responses marked so - .retryResponses((requestMetaData, throwable) -> ofImmediate(maxTotalRetries - 1)) + .retryResponses((requestMetaData, throwable) -> ofImmediate(maxTotalRetries - 1), + returnOriginalResponses) .onRequestRetry((count, req, t) -> assertThat(onRequestRetryCounter.incrementAndGet(), is(count))) .build()) @@ -281,9 +290,15 @@ public Single request(final StreamingHttpRequest request) }; }) .buildBlocking(); - HttpResponseException e = assertThrows(HttpResponseException.class, - () -> normalClient.request(normalClient.get("/"))); - assertThat("Unexpected exception.", e, instanceOf(HttpResponseException.class)); + if (returnOriginalResponses) { + HttpResponse response = normalClient.request(normalClient.get("/")); + assertThat(response.status(), is(HttpResponseStatus.OK)); + assertThat(response.payloadBody().toString(StandardCharsets.US_ASCII), equalTo(RESPONSE_BODY)); + } else { + HttpResponseException e = assertThrows(HttpResponseException.class, + () -> normalClient.request(normalClient.get("/"))); + assertThat("Unexpected exception.", e, instanceOf(HttpResponseException.class)); + } // The load balancer is allowed to be not ready one time, which is counted against total retry attempts but not // against actual requests being issued. assertThat("Unexpected calls to select.", lbSelectInvoked.get(), allOf(greaterThanOrEqualTo(maxTotalRetries), @@ -295,6 +310,66 @@ public Single request(final StreamingHttpRequest request) assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1)); } + private enum ExceptionSource { + RESPONSE_MAPPER, + RETRY_RESPONSES + } + + private static Stream lambdaExceptions() { + return Stream.of(true, false).flatMap(returnOriginalResponses -> + Stream.of(ExceptionSource.values()) + .map(lambda -> Arguments.of(returnOriginalResponses, lambda))); + } + + @ParameterizedTest(name = "{displayName} [{index}]: returnOriginalResponses={0}, thrower={1}") + @MethodSource("lambdaExceptions") + void lambdaExceptions(final boolean returnOriginalResponses, final ExceptionSource thrower) { + final AtomicInteger newConnectionCreated = new AtomicInteger(); + final AtomicInteger requestsInitiated = new AtomicInteger(); + final AtomicInteger responseDrained = new AtomicInteger(); + final AtomicInteger onRequestRetryCounter = new AtomicInteger(); + final String retryMessage = "Retryable header"; + normalClient = normalClientBuilder + .appendClientFilter(new Builder() + .maxTotalRetries(4) + .responseMapper(metaData -> { + if (thrower == ExceptionSource.RESPONSE_MAPPER) { + throw new RuntimeException("responseMapper"); + } + return metaData.headers().contains(RETRYABLE_HEADER) ? + new HttpResponseException(retryMessage, metaData) : null; + }) + // Retry only responses marked so + .retryResponses((requestMetaData, throwable) -> { + if (thrower == ExceptionSource.RETRY_RESPONSES) { + throw new RuntimeException("retryResponses"); + } + return ofImmediate(3); + }, returnOriginalResponses) + .onRequestRetry((count, req, t) -> + assertThat(onRequestRetryCounter.incrementAndGet(), is(count))) + .build()) + .appendConnectionFilter(c -> { + newConnectionCreated.incrementAndGet(); + return new StreamingHttpConnectionFilter(c) { + @Override + public Single request(final StreamingHttpRequest request) { + return Single.defer(() -> { + requestsInitiated.incrementAndGet(); + return delegate().request(request) + .map(response -> response.transformPayloadBody(payload -> payload + .whenFinally(responseDrained::incrementAndGet))); + }); + } + }; + }) + .buildBlocking(); + assertThrows(Exception.class, () -> normalClient.request(normalClient.get("/"))); + assertThat("Response payload body was not drained on every mapping", responseDrained.get(), is(1)); + assertThat("Multiple requests initiated", requestsInitiated.get(), is(1)); + assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1)); + } + @Test void singleInstanceFilter() { Assertions.assertThrows(IllegalStateException.class, () -> forResolvedAddress(localAddress(8888))