diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml index 687146b70..46852417b 100644 --- a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml @@ -18,3 +18,4 @@ spec: functions: statefun.smoke.e2e/command-interpreter-fn urlPathTemplate: https://remote-function-host maxNumBatchRequests: 10000 + maxRetries: 5 diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml index 6836ae58a..553c6fa92 100644 --- a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml +++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml @@ -18,9 +18,10 @@ spec: functions: statefun.smoke.e2e/command-interpreter-fn urlPathTemplate: https://remote-function-host:8000 maxNumBatchRequests: 10000 + maxRetries: 5 transport: type: io.statefun.transports.v1/async trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem client_cert: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.crt client_key: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.key.p8 - client_key_password: /opt/statefun/modules/statefun-smoke-e2e/certs/key_password.txt \ No newline at end of file + client_key_password: /opt/statefun/modules/statefun-smoke-e2e/certs/key_password.txt diff --git a/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/resources/remote-module/module.yaml index ab747d865..bd31e89c6 100644 --- a/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/resources/remote-module/module.yaml +++ b/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/resources/remote-module/module.yaml @@ -17,4 +17,5 @@ kind: io.statefun.endpoints.v2/http spec: functions: statefun.smoke.e2e/command-interpreter-fn urlPathTemplate: http://remote-function-host:8000 - maxNumBatchRequests: 10000 \ No newline at end of file + maxNumBatchRequests: 10000 + maxRetries: 5 diff --git a/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/resources/module.yaml b/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/resources/module.yaml index 01e2eca6a..84511f23f 100644 --- a/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/resources/module.yaml +++ b/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/resources/module.yaml @@ -17,4 +17,5 @@ kind: io.statefun.endpoints.v2/http spec: functions: statefun.smoke.e2e/command-interpreter-fn urlPathTemplate: http://localhost:8000 - maxNumBatchRequests: 10000 \ No newline at end of file + maxNumBatchRequests: 10000 + maxRetries: 5 diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java index f2cf7951a..431aeb113 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java @@ -56,7 +56,8 @@ final class DefaultHttpRequestReplyClient implements RequestReplyClient { public CompletableFuture call( ToFunctionRequestSummary requestSummary, RemoteInvocationMetrics metrics, - ToFunction toFunction) { + ToFunction toFunction, + int maxRetries) { Request request = new Request.Builder() .url(url) @@ -65,13 +66,17 @@ public CompletableFuture call( Call newCall = client.newCall(request); RetryingCallback callback = - new RetryingCallback(requestSummary, metrics, newCall.timeout(), isShutdown); + new RetryingCallback(requestSummary, metrics, newCall.timeout(), isShutdown, maxRetries); callback.attachToCall(newCall); return callback.future().thenApply(DefaultHttpRequestReplyClient::parseResponse); } private static FromFunction parseResponse(Response response) { - final InputStream httpResponseBody = responseBody(response); + if (response == null) { + return null; + } + +final InputStream httpResponseBody = responseBody(response); try { return parseProtobufOrThrow(FromFunction.parser(), httpResponseBody); } finally { diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java index 587a6af94..cd57d9321 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java @@ -35,6 +35,9 @@ public final class HttpFunctionEndpointSpec implements Serializable { private static final long serialVersionUID = 1; private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000; + + private static final Integer DEFAULT_MAX_RETRIES = -1; + private static final TransportClientSpec DEFAULT_TRANSPORT_CLIENT_SPEC = new TransportClientSpec( TransportClientConstants.ASYNC_CLIENT_FACTORY_TYPE, @@ -47,6 +50,7 @@ public final class HttpFunctionEndpointSpec implements Serializable { private final TargetFunctions targetFunctions; private final UrlPathTemplate urlPathTemplate; private final int maxNumBatchRequests; + private final int maxRetries; // ============================================================ // HTTP transport related properties @@ -63,11 +67,13 @@ private HttpFunctionEndpointSpec( TargetFunctions targetFunctions, UrlPathTemplate urlPathTemplate, int maxNumBatchRequests, + int maxRetries, TypeName transportClientFactoryType, ObjectNode transportClientProps) { this.targetFunctions = targetFunctions; this.urlPathTemplate = urlPathTemplate; this.maxNumBatchRequests = maxNumBatchRequests; + this.maxRetries = maxRetries; this.transportClientFactoryType = transportClientFactoryType; this.transportClientProps = transportClientProps; } @@ -84,6 +90,10 @@ public int maxNumBatchRequests() { return maxNumBatchRequests; } + public int maxRetries() { + return maxRetries; + } + public TypeName transportClientFactoryType() { return transportClientFactoryType; } @@ -99,6 +109,7 @@ public static final class Builder { private final UrlPathTemplate urlPathTemplate; private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS; + private int maxRetries = DEFAULT_MAX_RETRIES; private TransportClientSpec transportClientSpec = DEFAULT_TRANSPORT_CLIENT_SPEC; @JsonCreator @@ -118,6 +129,12 @@ public Builder withMaxNumBatchRequests(int maxNumBatchRequests) { return this; } + @JsonProperty("maxRetries") + public Builder withMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + /** * This is marked with @JsonProperty specifically to tell Jackson to use this method when * deserializing from Json. @@ -138,6 +155,7 @@ public HttpFunctionEndpointSpec build() { targetFunctions, urlPathTemplate, maxNumBatchRequests, + maxRetries, transportClientSpec.factoryKind(), transportClientSpec.specNode()); } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java index 34a72fe49..27173c509 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java @@ -46,6 +46,7 @@ public StatefulFunction functionOfType(FunctionType functionType) { return new RequestReplyFunction( functionType, endpointSpec.maxNumBatchRequests(), + endpointSpec.maxRetries(), requestReplyClientFactory.createTransportClient( endpointSpec.transportClientProperties(), endpointUrl)); } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java index ffb7f7bb7..1ff049c70 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java @@ -30,6 +30,7 @@ import okhttp3.Call; import okhttp3.Callback; import okhttp3.Response; +import okhttp3.ResponseBody; import okio.Timeout; import org.apache.flink.statefun.flink.core.backpressure.BoundedExponentialBackoff; import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics; @@ -42,8 +43,7 @@ final class RetryingCallback implements Callback { private static final Duration INITIAL_BACKOFF_DURATION = Duration.ofMillis(10); - private static final Set RETRYABLE_HTTP_CODES = - new HashSet<>(Arrays.asList(409, 420, 408, 429, 499, 500)); + private static final Set RETRYABLE_HTTP_CODES = new HashSet<>(Arrays.asList(409, 420, 408, 429, 499, 500)); private static final Logger LOG = LoggerFactory.getLogger(RetryingCallback.class); @@ -52,19 +52,25 @@ final class RetryingCallback implements Callback { private final ToFunctionRequestSummary requestSummary; private final RemoteInvocationMetrics metrics; private final BooleanSupplier isShutdown; + private final int maxRetries; private long requestStarted; + private int retryAttempts; + RetryingCallback( ToFunctionRequestSummary requestSummary, RemoteInvocationMetrics metrics, Timeout timeout, - BooleanSupplier isShutdown) { + BooleanSupplier isShutdown, + int maxRetries) { this.resultFuture = new CompletableFuture<>(); this.backoff = new BoundedExponentialBackoff(INITIAL_BACKOFF_DURATION, duration(timeout)); this.requestSummary = requestSummary; this.metrics = metrics; this.isShutdown = Objects.requireNonNull(isShutdown); + this.maxRetries = maxRetries; + this.retryAttempts = 0; } CompletableFuture future() { @@ -105,6 +111,18 @@ private void onResponseUnsafe(Call call, Response response) { resultFuture.complete(response); return; } + + if ((maxRetries >= 0) && (response.code() == 500)) { + if (retryAttempts < maxRetries) { + LOG.warn("Failed attempt " + retryAttempts + " of " + maxRetries + ". Retrying."); + retryAttempts++; + } else { + LOG.warn("Maximum number of attempts (" + maxRetries + ") exceeded. Dropping message."); + resultFuture.complete(null); + return; + } + } + if (!RETRYABLE_HTTP_CODES.contains(response.code()) && response.code() < 500) { throw new IllegalStateException("Non successful HTTP response code " + response.code()); } @@ -130,7 +148,8 @@ private boolean retryAfterApplyingBackoff(Call call) { } /** - * Executes the runnable, and completes {@link #resultFuture} with any exceptions thrown, during + * Executes the runnable, and completes {@link #resultFuture} with any + * exceptions thrown, during * its execution. */ private void tryWithFuture(RunnableWithException runnable) { diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/binders/v1/HttpEndpointBinderV1.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/binders/v1/HttpEndpointBinderV1.java index 59b373e06..13bedc206 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/binders/v1/HttpEndpointBinderV1.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/binders/v1/HttpEndpointBinderV1.java @@ -48,6 +48,7 @@ * functions: com.foo.bar/* (typename) * urlPathTemplate: https://bar.foo.com:8080/{function.name} (string) * maxNumBatchRequests: 10000 (int, optional) + * maxRetries: 5 (int, optional) * timeouts: (object, optional) * call: 1minute (duration, optional) * connect: 20seconds (duration, optional) @@ -68,6 +69,7 @@ public final class HttpEndpointBinderV1 implements ComponentBinder { private static final JsonPointer URL_PATH_TEMPLATE = JsonPointer.compile("/urlPathTemplate"); private static final JsonPointer MAX_NUM_BATCH_REQUESTS = JsonPointer.compile("/maxNumBatchRequests"); + private static final JsonPointer MAX_RETRIES = JsonPointer.compile("/maxRetries"); private HttpEndpointBinderV1() {} @@ -105,6 +107,8 @@ private static HttpFunctionEndpointSpec parseSpec(ComponentJsonObject component) optionalMaxNumBatchRequests(httpEndpointSpecNode) .ifPresent(specBuilder::withMaxNumBatchRequests); + optionalMaxRetries(httpEndpointSpecNode).ifPresent(specBuilder::withMaxRetries); + final TransportClientSpec transportClientSpec = new TransportClientSpec( TransportClientConstants.OKHTTP_CLIENT_FACTORY_TYPE, (ObjectNode) httpEndpointSpecNode); @@ -126,4 +130,8 @@ private static UrlPathTemplate urlPathTemplate(JsonNode functionEndpointSpecNode private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) { return Selectors.optionalIntegerAt(functionNode, MAX_NUM_BATCH_REQUESTS); } + + private static OptionalInt optionalMaxRetries(JsonNode functionNode) { + return Selectors.optionalIntegerAt(functionNode, MAX_RETRIES); + } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java index b25b45f17..3200582d3 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java @@ -127,7 +127,8 @@ private NettyClient( public CompletableFuture call( ToFunctionRequestSummary requestSummary, RemoteInvocationMetrics metrics, - ToFunction toFunction) { + ToFunction toFunction, + int maxRetries) { NettyRequest request = new NettyRequest(this, metrics, requestSummary, toFunction); return request.start(); } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/ClassLoaderSafeRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/ClassLoaderSafeRequestReplyClient.java index cee0338bf..e6849b716 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/ClassLoaderSafeRequestReplyClient.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/ClassLoaderSafeRequestReplyClient.java @@ -42,12 +42,13 @@ public ClassLoaderSafeRequestReplyClient(RequestReplyClient delegate) { public CompletableFuture call( ToFunctionRequestSummary requestSummary, RemoteInvocationMetrics metrics, - ToFunction toFunction) { + ToFunction toFunction, + int maxRetries) { final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(delegateClassLoader); - return delegate.call(requestSummary, metrics, toFunction); + return delegate.call(requestSummary, metrics, toFunction, maxRetries); } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java index 7a423fff8..fcdb35938 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java @@ -30,5 +30,6 @@ public interface RequestReplyClient { CompletableFuture call( ToFunctionRequestSummary requestSummary, RemoteInvocationMetrics metrics, - ToFunction toFunction); + ToFunction toFunction, + int maxRetries); } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java index 21ddd4f5f..756e1b58e 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java @@ -55,6 +55,7 @@ public final class RequestReplyFunction implements StatefulFunction { private final FunctionType functionType; private final RequestReplyClient client; private final int maxNumBatchRequests; + private final int maxRetries; /** * This flag indicates whether or not at least one request has already been sent to the remote @@ -93,8 +94,17 @@ public final class RequestReplyFunction implements StatefulFunction { @Persisted private final PersistedRemoteFunctionValues managedStates; public RequestReplyFunction( - FunctionType functionType, int maxNumBatchRequests, RequestReplyClient client) { - this(functionType, new PersistedRemoteFunctionValues(), maxNumBatchRequests, client, false); + FunctionType functionType, + int maxNumBatchRequests, + int maxRetries, + RequestReplyClient client) { + this( + functionType, + new PersistedRemoteFunctionValues(), + maxNumBatchRequests, + maxRetries, + client, + false); } @VisibleForTesting @@ -102,11 +112,13 @@ public RequestReplyFunction( FunctionType functionType, PersistedRemoteFunctionValues states, int maxNumBatchRequests, + int maxRetries, RequestReplyClient client, boolean isFirstRequestSent) { this.functionType = Objects.requireNonNull(functionType); this.managedStates = Objects.requireNonNull(states); this.maxNumBatchRequests = maxNumBatchRequests; + this.maxRetries = maxRetries; this.client = Objects.requireNonNull(client); this.isFirstRequestSent = isFirstRequestSent; } @@ -181,6 +193,9 @@ private void onAsyncResult( private static Either unpackResponse( FromFunction fromFunction) { + if (fromFunction == null) { + return Either.Left(InvocationResponse.getDefaultInstance()); + } if (fromFunction.hasIncompleteInvocationContext()) { return Either.Right(fromFunction.getIncompleteInvocationContext()); } @@ -339,7 +354,7 @@ private void sendToFunction(InternalContext context, ToFunction toFunction) { toFunction.getInvocation().getInvocationsCount()); RemoteInvocationMetrics metrics = context.functionTypeMetrics(); CompletableFuture responseFuture = - client.call(requestSummary, metrics, toFunction); + client.call(requestSummary, metrics, toFunction, maxRetries); if (isFirstRequestSent) { context.registerAsyncOperation(toFunction, responseFuture); diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java index 9d6440a52..e64dc1a3e 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java @@ -70,7 +70,12 @@ public class RequestReplyFunctionTest { private final RequestReplyFunction functionUnderTest = new RequestReplyFunction( - FN_TYPE, testInitialRegisteredState("session", "com.foo.bar/myType"), 10, client, true); + FN_TYPE, + testInitialRegisteredState("session", "com.foo.bar/myType"), + 10, + 5, + client, + true); @Test public void example() { @@ -121,7 +126,7 @@ public void batchIsAccumulatedWhileARequestIsInFlight() { @Test public void reachingABatchLimitTriggersBackpressure() { - RequestReplyFunction functionUnderTest = new RequestReplyFunction(FN_TYPE, 2, client); + RequestReplyFunction functionUnderTest = new RequestReplyFunction(FN_TYPE, 2, 5, client); // send one message functionUnderTest.invoke(context, TypedValue.getDefaultInstance()); @@ -137,7 +142,7 @@ public void reachingABatchLimitTriggersBackpressure() { @Test public void returnedMessageReleaseBackpressure() { - RequestReplyFunction functionUnderTest = new RequestReplyFunction(FN_TYPE, 2, client); + RequestReplyFunction functionUnderTest = new RequestReplyFunction(FN_TYPE, 2, 5, client); // the following invocations should cause backpressure functionUnderTest.invoke(context, TypedValue.getDefaultInstance()); @@ -342,7 +347,7 @@ public void retryBatchOnUnkownAsyncResponseAfterRestore() { ToFunction originalRequest = client.wasSentToFunction; RequestReplyFunction restoredFunction = - new RequestReplyFunction(FN_TYPE, new PersistedRemoteFunctionValues(), 2, client, true); + new RequestReplyFunction(FN_TYPE, new PersistedRemoteFunctionValues(), 2, 5, client, true); restoredFunction.invoke(context, unknownAsyncOperation(originalRequest)); // retry batch after a restore on an unknown async operation should start with empty state specs @@ -393,7 +398,8 @@ private static final class FakeClient implements RequestReplyClient { public CompletableFuture call( ToFunctionRequestSummary requestSummary, RemoteInvocationMetrics metrics, - ToFunction toFunction) { + ToFunction toFunction, + int maxRetries) { this.wasSentToFunction = toFunction; try { return CompletableFuture.completedFuture(this.fromFunction.get()); diff --git a/statefun-flink/statefun-flink-core/src/test/resources/http-endpoint-binders/v1.yaml b/statefun-flink/statefun-flink-core/src/test/resources/http-endpoint-binders/v1.yaml index a888ac0ba..665ade34d 100644 --- a/statefun-flink/statefun-flink-core/src/test/resources/http-endpoint-binders/v1.yaml +++ b/statefun-flink/statefun-flink-core/src/test/resources/http-endpoint-binders/v1.yaml @@ -18,8 +18,9 @@ spec: functions: com.foo.bar/* urlPathTemplate: http://bar.foo.com:8080/functions/{function.name} maxNumBatchRequests: 10000 + maxRetries: 5 timeouts: call: 1minute connect: 30seconds read: 20seconds - write: 10seconds \ No newline at end of file + write: 10seconds diff --git a/statefun-flink/statefun-flink-core/src/test/resources/http-endpoint-binders/v2.yaml b/statefun-flink/statefun-flink-core/src/test/resources/http-endpoint-binders/v2.yaml index 54c2531f3..8bb2717fa 100644 --- a/statefun-flink/statefun-flink-core/src/test/resources/http-endpoint-binders/v2.yaml +++ b/statefun-flink/statefun-flink-core/src/test/resources/http-endpoint-binders/v2.yaml @@ -18,10 +18,11 @@ spec: functions: com.foo.bar/* urlPathTemplate: http://bar.foo.com:8080/functions/{function.name} maxNumBatchRequests: 10000 + maxRetries: 5 transport: type: io.statefun.transports.v1/okhttp timeouts: call: 1minute connect: 30seconds read: 20seconds - write: 10seconds \ No newline at end of file + write: 10seconds diff --git a/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml index e39fd0773..e26c808f1 100644 --- a/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml +++ b/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml @@ -32,6 +32,7 @@ module: read: 10second write: 10seconds maxNumBatchRequests: 10000 + maxRetries: 5 ingresses: - ingress: meta: @@ -56,4 +57,4 @@ module: type: exactly-once transactionTimeoutMillis: 100000 properties: - - foo.config: bar \ No newline at end of file + - foo.config: bar diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/AsyncRequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/AsyncRequestReplyFunctionBuilder.java index ae56d832e..b9d006480 100644 --- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/AsyncRequestReplyFunctionBuilder.java +++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/AsyncRequestReplyFunctionBuilder.java @@ -115,6 +115,17 @@ public AsyncRequestReplyFunctionBuilder withMaxNumBatchRequests(int maxNumBatchR return this; } + /** + * Sets the max retries number of attempts in order to deliver a message + * + * @param maxRetries the maximum number of attempts for delivering a message + * @return this builder. + */ + public AsyncRequestReplyFunctionBuilder withMaxRetries(int maxRetries) { + builder.withMaxRetries(maxRetries); + return this; + } + /** * Create the endpoint spec for the function. * diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java index 7468eea2e..87b5afa2f 100644 --- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java +++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java @@ -110,6 +110,17 @@ public RequestReplyFunctionBuilder withMaxNumBatchRequests(int maxNumBatchReques return this; } + /** + * Sets the max retries attempts in order to deliver a message + * + * @param maxRetries the maximum number of attempts for delivering a message + * @return this builder. + */ + public RequestReplyFunctionBuilder withMaxRetries(int maxRetries) { + builder.withMaxRetries(maxRetries); + return this; + } + /** * Create the endpoint spec for the function. * diff --git a/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/AsyncRequestReplyFunctionBuilderTest.java b/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/AsyncRequestReplyFunctionBuilderTest.java index 0c52e1491..31682673b 100644 --- a/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/AsyncRequestReplyFunctionBuilderTest.java +++ b/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/AsyncRequestReplyFunctionBuilderTest.java @@ -44,6 +44,7 @@ public void asyncClientSpecCanBeCreatedWithAllValues() final URI uri = new URI("foobar"); final int maxNumBatchRequests = 100; + final int maxRetries = 5; final Duration connectTimeout = Duration.ofSeconds(1); final Duration callTimeout = Duration.ofSeconds(2); final Duration pooledConnectionTTL = Duration.ofSeconds(3); @@ -53,6 +54,7 @@ public void asyncClientSpecCanBeCreatedWithAllValues() final AsyncRequestReplyFunctionBuilder builder = StatefulFunctionBuilder.asyncRequestReplyFunctionBuilder(functionType, uri) .withMaxNumBatchRequests(maxNumBatchRequests) + .withMaxRetries(maxRetries) .withMaxRequestDuration(callTimeout) .withConnectTimeout(connectTimeout) .withPooledConnectionTTL(pooledConnectionTTL) @@ -62,6 +64,7 @@ public void asyncClientSpecCanBeCreatedWithAllValues() HttpFunctionEndpointSpec spec = builder.spec(); assertThat(spec, notNullValue()); assertEquals(maxNumBatchRequests, spec.maxNumBatchRequests()); + assertEquals(maxRetries, spec.maxRetries()); assertEquals(functionType, spec.targetFunctions().asSpecificFunctionType()); assertEquals( spec.transportClientFactoryType(), TransportClientConstants.ASYNC_CLIENT_FACTORY_TYPE); @@ -91,6 +94,7 @@ public void asyncClientSpecCanBeCreatedWithSomeValues() final URI uri = new URI("foobar"); final int maxNumBatchRequests = 100; + final int maxRetries = 5; final Duration callTimeout = Duration.ofSeconds(2); final Duration pooledConnectionTTL = Duration.ofSeconds(3); final int maxRequestOrResponseSizeInBytes = 10000; @@ -98,6 +102,7 @@ public void asyncClientSpecCanBeCreatedWithSomeValues() final AsyncRequestReplyFunctionBuilder builder = StatefulFunctionBuilder.asyncRequestReplyFunctionBuilder(functionType, uri) .withMaxNumBatchRequests(maxNumBatchRequests) + .withMaxRetries(maxRetries) .withMaxRequestDuration(callTimeout) .withPooledConnectionTTL(pooledConnectionTTL) .withMaxRequestOrResponseSizeInBytes(maxRequestOrResponseSizeInBytes); @@ -105,6 +110,7 @@ public void asyncClientSpecCanBeCreatedWithSomeValues() HttpFunctionEndpointSpec spec = builder.spec(); assertThat(spec, notNullValue()); assertEquals(maxNumBatchRequests, spec.maxNumBatchRequests()); + assertEquals(maxRetries, spec.maxRetries()); assertEquals(functionType, spec.targetFunctions().asSpecificFunctionType()); assertEquals( spec.transportClientFactoryType(), TransportClientConstants.ASYNC_CLIENT_FACTORY_TYPE); diff --git a/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java b/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java index 47548d855..3fe7d1b2e 100644 --- a/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java +++ b/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java @@ -43,6 +43,7 @@ public void clientSpecCanBeCreatedWithAllValues() final URI uri = new URI("foobar"); final int maxNumBatchRequests = 100; + final int maxRetries = 5; final Duration connectTimeout = Duration.ofSeconds(16); final Duration callTimeout = Duration.ofSeconds(21); final Duration readTimeout = Duration.ofSeconds(11); @@ -51,6 +52,7 @@ public void clientSpecCanBeCreatedWithAllValues() final RequestReplyFunctionBuilder builder = StatefulFunctionBuilder.requestReplyFunctionBuilder(functionType, uri) .withMaxNumBatchRequests(maxNumBatchRequests) + .withMaxRetries(maxRetries) .withMaxRequestDuration(callTimeout) .withConnectTimeout(connectTimeout) .withReadTimeout(readTimeout) @@ -59,6 +61,7 @@ public void clientSpecCanBeCreatedWithAllValues() HttpFunctionEndpointSpec spec = builder.spec(); assertThat(spec, notNullValue()); assertEquals(maxNumBatchRequests, spec.maxNumBatchRequests()); + assertEquals(maxRetries, spec.maxRetries()); assertEquals(functionType, spec.targetFunctions().asSpecificFunctionType()); assertEquals( spec.transportClientFactoryType(), TransportClientConstants.OKHTTP_CLIENT_FACTORY_TYPE); @@ -87,18 +90,21 @@ public void clientSpecCanBeCreatedWithSomeValues() final URI uri = new URI("foobar"); final int maxNumBatchRequests = 100; + final int maxRetries = 5; final Duration connectTimeout = Duration.ofSeconds(16); final Duration callTimeout = Duration.ofSeconds(21); final RequestReplyFunctionBuilder builder = StatefulFunctionBuilder.requestReplyFunctionBuilder(functionType, uri) .withMaxNumBatchRequests(maxNumBatchRequests) + .withMaxRetries(maxRetries) .withMaxRequestDuration(callTimeout) .withConnectTimeout(connectTimeout); HttpFunctionEndpointSpec spec = builder.spec(); assertThat(spec, notNullValue()); assertEquals(maxNumBatchRequests, spec.maxNumBatchRequests()); + assertEquals(maxRetries, spec.maxRetries()); assertEquals(functionType, spec.targetFunctions().asSpecificFunctionType()); assertEquals( spec.transportClientFactoryType(), TransportClientConstants.OKHTTP_CLIENT_FACTORY_TYPE);