Skip to content

Commit

Permalink
adding handling for 429 and Retry-After
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed May 22, 2023
1 parent c984cff commit 125e838
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public interface HttpHeaders {

Expand All @@ -36,4 +37,20 @@ public interface HttpHeaders {
*/
Map<String, List<String>> headers();

/**
* Return the header as a single string value
*
* @return will be null if the header is unset
*/
default String header(String key) {
List<String> headers = headers(key);
if (headers.size() == 1) {
return headers.get(0);
}
if (headers.isEmpty()) {
return null;
}
return headers.stream().collect(Collectors.joining(","));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;

public abstract class StandardHttpClient<C extends HttpClient, F extends HttpClient.Factory, T extends StandardHttpClientBuilder<C, F, ?>>
implements HttpClient, RequestTags {
Expand Down Expand Up @@ -85,7 +88,7 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest reque
standardHttpRequest,
() -> consumeBytesOnce(standardHttpRequest, consumer),
r -> r.body().cancel(),
HttpResponse::code);
r -> r);
}

private CompletableFuture<HttpResponse<AsyncBody>> consumeBytesOnce(StandardHttpRequest standardHttpRequest,
Expand Down Expand Up @@ -146,7 +149,7 @@ private CompletableFuture<HttpResponse<AsyncBody>> consumeBytesOnce(StandardHttp
*/
private <V> CompletableFuture<V> retryWithExponentialBackoff(
StandardHttpRequest request, Supplier<CompletableFuture<V>> action, java.util.function.Consumer<V> onCancel,
ToIntFunction<V> codeExtractor) {
Function<V, HttpResponse<?>> responseExtractor) {
final URI uri = request.uri();
final RequestConfig requestConfig = getTag(RequestConfig.class);
final ExponentialBackoffIntervalCalculator retryIntervalCalculator = ExponentialBackoffIntervalCalculator
Expand All @@ -160,18 +163,23 @@ private <V> CompletableFuture<V> retryWithExponentialBackoff(
return AsyncUtils.retryWithExponentialBackoff(action, onCancel, timeout, retryIntervalCalculator,
(response, throwable, retryInterval) -> {
if (response != null) {
final int code = codeExtractor.applyAsInt(response);
if (code >= 500) {
LOG.debug(
"HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis",
uri, code, retryInterval);
return true;
HttpResponse<?> httpResponse = responseExtractor.apply(response);
if (httpResponse != null) {
final int code = httpResponse.code();
if (code == 429 || code >= 500) {
retryInterval = Math.max(retryAfterMillis(httpResponse), retryInterval);
LOG.debug(
"HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis",
uri, code, retryInterval);
return true;
}
}
} else {
if (throwable instanceof CompletionException) {
throwable = throwable.getCause();
}
if (throwable instanceof IOException) {
// TODO: may not be specific enough - incorrect ssl settings for example will get caught here
LOG.debug(
String.format("HTTP operation on url: %s should be retried after %d millis because of IOException",
uri, retryInterval),
Expand All @@ -183,6 +191,25 @@ private <V> CompletableFuture<V> retryWithExponentialBackoff(
});
}

private long retryAfterMillis(HttpResponse<?> httpResponse) {
String retryAfter = httpResponse.header("Retry-After");
if (retryAfter != null) {
try {
return Integer.parseInt(retryAfter) * 1000L;
} catch (NumberFormatException e) {
// not a simple number
}
// Kubernetes does not seem to currently use this, but just in case
try {
ZonedDateTime after = ZonedDateTime.parse(retryAfter, DateTimeFormatter.RFC_1123_DATE_TIME);
return after.toEpochSecond() * 1000 - System.currentTimeMillis();
} catch (DateTimeParseException e1) {
// not a recognized http date
}
}
return 0; // we'll just use the default
}

@Override
public io.fabric8.kubernetes.client.http.WebSocket.Builder newWebSocketBuilder() {
return new StandardWebSocketBuilder(this);
Expand All @@ -201,7 +228,7 @@ final CompletableFuture<WebSocket> buildWebSocket(StandardWebSocketBuilder stand
standardWebSocketBuilder.asHttpRequest(),
() -> buildWebSocketOnce(standardWebSocketBuilder, listener),
r -> Optional.ofNullable(r.webSocket).ifPresent(w -> w.sendClose(1000, null)),
r -> Optional.of(r.webSocketUpgradeResponse).map(HttpResponse::code).orElse(null));
r -> r.webSocketUpgradeResponse);

CompletableFuture<WebSocket> result = new CompletableFuture<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void testHttpRetryWithMoreFailuresThanRetries() throws Exception {
.build();

IntStream.range(0, 3).forEach(i -> client.expect(".*", new IOException("Unreachable!")));
client.expect(".*", new TestHttpResponse<AsyncBody>().withCode(500));
client.expect(".*", new TestHttpResponse<AsyncBody>().withCode(403));

CompletableFuture<HttpResponse<AsyncBody>> consumeFuture = client.consumeBytes(
client.newHttpRequestBuilder().uri("http://localhost").build(),
Expand All @@ -132,7 +132,7 @@ void testHttpRetryWithMoreFailuresThanRetries() throws Exception {
long start = System.currentTimeMillis();

// should ultimately error with the final 500
assertEquals(500, consumeFuture.get().code());
assertEquals(403, consumeFuture.get().code());
long stop = System.currentTimeMillis();

// should take longer than the delay
Expand Down Expand Up @@ -172,7 +172,7 @@ void testWebSocketWithLessFailuresThanRetries() throws Exception {
.withRequestRetryBackoffLimit(3)
.withRequestRetryBackoffInterval(50).build())
.build();
final WebSocketResponse error = new WebSocketResponse(new WebSocketUpgradeResponse(null, 500, null), new IOException());
final WebSocketResponse error = new WebSocketResponse(new WebSocketUpgradeResponse(null, 500), new IOException());
IntStream.range(0, 2).forEach(i -> client.wsExpect(".*", error));
client.wsExpect(".*", new WebSocketResponse(new WebSocketUpgradeResponse(null), mock(WebSocket.class)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,11 @@ void testEvict() {
.andReturn(200, new PodBuilder().build())
.once();
server.expect()
.withPath("/api/v1/namespaces/ns1/pods/pod3/eviction")
.withPath("/api/v1/namespaces/ns1/pods/pod-429/eviction")
.andReturn(PodOperationsImpl.HTTP_TOO_MANY_REQUESTS, new PodBuilder().build())
.once();
server.expect()
.withPath("/api/v1/namespaces/ns1/pods/pod3/eviction")
.withPath("/api/v1/namespaces/ns1/pods/pod-429/eviction")
.andReturn(200, new PodBuilder().build())
.once();
server.expect()
Expand All @@ -296,11 +296,8 @@ void testEvict() {
deleted = client.pods().inNamespace("ns1").withName("pod2").evict();
assertTrue(deleted);

// too many requests
deleted = client.pods().inNamespace("ns1").withName("pod3").evict();
assertFalse(deleted);

deleted = client.pods().inNamespace("ns1").withName("pod3").evict();
// too many requests - automatically retries
deleted = client.pods().inNamespace("ns1").withName("pod-429").evict();
assertTrue(deleted);

// unhandled error
Expand Down

0 comments on commit 125e838

Please sign in to comment.