Skip to content

Commit

Permalink
Fix delay computation in Retry policies
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-sladecek committed Apr 15, 2021
1 parent c6aea9a commit f60b873
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,17 +64,12 @@ public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
}

private <T> Single<T> retrySingle(RetryContext<? extends CompletionStage<T>> context) {
long delay = 0;
int currentCallIndex = context.count.getAndIncrement();
if (currentCallIndex != 0) {
Optional<Long> maybeDelay = retryPolicy.nextDelayMillis(context.startedMillis,
context.lastDelay.get(),
currentCallIndex);
if (maybeDelay.isEmpty()) {
return Single.error(context.throwable());
}
delay = maybeDelay.get();
Optional<Long> maybeDelay = computeDelay(context, currentCallIndex);
if (maybeDelay.isEmpty()) {
return Single.error(context.throwable());
}
long delay = maybeDelay.get();

long nanos = System.nanoTime() - context.startedNanos;
if (nanos > maxTimeNanos) {
Expand All @@ -98,6 +93,7 @@ private <T> Single<T> retrySingle(RetryContext<? extends CompletionStage<T>> con
.onErrorResumeWithSingle(throwable -> {
Throwable cause = FaultTolerance.cause(throwable);
context.thrown.add(cause);
context.lastDelay.set(delay);
if (errorChecker.shouldSkip(cause)) {
return Single.error(context.throwable());
}
Expand All @@ -106,17 +102,12 @@ private <T> Single<T> retrySingle(RetryContext<? extends CompletionStage<T>> con
}

private <T> Multi<T> retryMulti(RetryContext<? extends Flow.Publisher<T>> context) {
long delay = 0;
int currentCallIndex = context.count.getAndIncrement();
if (currentCallIndex != 0) {
Optional<Long> maybeDelay = retryPolicy.nextDelayMillis(context.startedMillis,
context.lastDelay.get(),
currentCallIndex);
if (maybeDelay.isEmpty()) {
return Multi.error(context.throwable());
}
delay = maybeDelay.get();
Optional<Long> maybeDelay = computeDelay(context, currentCallIndex);
if (maybeDelay.isEmpty()) {
return Multi.error(context.throwable());
}
long delay = maybeDelay.get();

long nanos = System.nanoTime() - context.startedNanos;
if (nanos > maxTimeNanos) {
Expand All @@ -140,13 +131,23 @@ private <T> Multi<T> retryMulti(RetryContext<? extends Flow.Publisher<T>> contex
.onErrorResumeWith(throwable -> {
Throwable cause = FaultTolerance.cause(throwable);
context.thrown.add(cause);
context.lastDelay.set(delay);
if (task.hadData() || errorChecker.shouldSkip(cause)) {
return Multi.error(context.throwable());
}
return retryMulti(context);
});
}

private Optional<Long> computeDelay(RetryContext<?> context, int currentCallIndex) {
if (currentCallIndex != 0) {
return retryPolicy.nextDelayMillis(context.startedMillis,
context.lastDelay.get(),
currentCallIndex);
}
return Optional.of(0L);
}

@Override
public long retryCounter() {
return retryCounter.get();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,8 +17,10 @@
package io.helidon.faulttolerance;

import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
Expand Down Expand Up @@ -214,6 +216,55 @@ void testMultiRetriesRead() throws InterruptedException {

}

@Test
public void testLastDelay() {
List<Long> lastDelayCalls = new ArrayList<>();
Retry retry = Retry.builder()
.retryPolicy((firstCallMillis, lastDelay, call) -> {
lastDelayCalls.add(lastDelay);
return Optional.of(lastDelay + 1);
})
.build();


Request req = new Request(3, new RetryException(), new RetryException());
Single<Integer> result = retry.invoke(req::invoke);
result.await(1, TimeUnit.SECONDS);
assertThat(req.call.get(), is(4));

assertThat("Last delay should increase", lastDelayCalls, contains(0L, 1L, 2L));
}

@Test
public void testMultiLastDelay() throws InterruptedException {
List<Long> lastDelayCalls = new ArrayList<>();
Retry retry = Retry.builder()
.retryPolicy((firstCallMillis, lastDelay, call) -> {
lastDelayCalls.add(lastDelay);
return Optional.of(lastDelay + 1);
})
.build();

AtomicInteger count = new AtomicInteger();

TestSubscriber ts = new TestSubscriber();

Multi<Integer> multi = retry.invokeMulti(() -> {
if (count.getAndIncrement() < 3) {
return Multi.error(new RetryException());
} else {
return Multi.just(0, 1, 2);
}
});

multi.subscribe(ts);
ts.request(2);

ts.cdl.await(1, TimeUnit.SECONDS);

assertThat("Last delay should increase", lastDelayCalls, contains(0L, 1L, 2L));
}

private static class PartialPublisher implements Flow.Publisher<Integer> {
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
Expand Down

0 comments on commit f60b873

Please sign in to comment.