Skip to content

Commit

Permalink
Smaller max delays and retry/repeat strategies with delta jitter (#1447)
Browse files Browse the repository at this point in the history
__Motivation__

When using `[retry,repeat]WithExponentialBackoffDeltaJitter()` if the `maxDelay` is closer to `initialDelay`, it may so happen that after a few retries the `baseDelayNanos` (delay before jitter) becomes greater than `maxDelayNanos`. While getting the random jitter we make sure to not exceed `maxDelayNanos` for the upper bound but lower bound may exceed `maxDelayNanos` hence making the call to `nextLong()` invalid.

__Modification__

Add a cap on `baseDelayNanos` such that it never exceeds `maxDelayNanos` and hence the call to `nextLong` always has the lower bound smaller than the upper bound.

__Result__

`[retry,repeat]WithExponentialBackoffDeltaJitter()` can work with smaller `maxDelay` values.
  • Loading branch information
Nitesh Kant authored Mar 19, 2021
1 parent 57c7e74 commit 35b4293
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.function.IntFunction;

import static io.servicetalk.concurrent.api.Completable.failed;
import static io.servicetalk.concurrent.api.RetryStrategies.baseDelayNanos;
import static io.servicetalk.concurrent.api.RetryStrategies.checkJitterDelta;
import static io.servicetalk.concurrent.api.RetryStrategies.checkMaxRetries;
import static io.servicetalk.concurrent.api.RetryStrategies.maxShift;
Expand Down Expand Up @@ -166,7 +167,7 @@ public static IntFunction<Completable> repeatWithExponentialBackoffFullJitter(fi
final long maxDelayNanos = maxDelay.toNanos();
final long maxInitialShift = maxShift(initialDelayNanos);
return repeatCount -> timerExecutor.timer(current().nextLong(0,
min(maxDelayNanos, initialDelayNanos << min(maxInitialShift, repeatCount - 1))), NANOSECONDS);
baseDelayNanos(initialDelayNanos, maxDelayNanos, maxInitialShift, repeatCount)), NANOSECONDS);
}

/**
Expand Down Expand Up @@ -196,7 +197,7 @@ public static IntFunction<Completable> repeatWithExponentialBackoffFullJitter(fi
final long maxInitialShift = maxShift(initialDelayNanos);
return repeatCount -> repeatCount <= maxRepeats ?
timerExecutor.timer(current().nextLong(0,
min(maxDelayNanos, initialDelayNanos << min(maxInitialShift, repeatCount - 1))), NANOSECONDS) :
baseDelayNanos(initialDelayNanos, maxDelayNanos, maxInitialShift, repeatCount)), NANOSECONDS) :
terminateRepeat();
}

Expand All @@ -223,7 +224,7 @@ public static IntFunction<Completable> repeatWithExponentialBackoffDeltaJitter(f
final long maxDelayNanos = maxDelay.toNanos();
final long maxInitialShift = maxShift(initialDelayNanos);
return repeatCount -> {
final long baseDelayNanos = initialDelayNanos << min(maxInitialShift, repeatCount - 1);
final long baseDelayNanos = baseDelayNanos(initialDelayNanos, maxDelayNanos, maxInitialShift, repeatCount);
return timerExecutor.timer(
current().nextLong(max(0, baseDelayNanos - jitterNanos),
min(maxDelayNanos, addWithOverflowProtection(baseDelayNanos, jitterNanos))),
Expand Down Expand Up @@ -261,7 +262,7 @@ public static IntFunction<Completable> repeatWithExponentialBackoffDeltaJitter(f
if (repeatCount > maxRepeats) {
return terminateRepeat();
}
final long baseDelayNanos = initialDelayNanos << min(maxInitialShift, repeatCount - 1);
final long baseDelayNanos = baseDelayNanos(initialDelayNanos, maxDelayNanos, maxInitialShift, repeatCount);
return timerExecutor.timer(
current().nextLong(max(0, baseDelayNanos - jitterNanos),
min(maxDelayNanos, addWithOverflowProtection(baseDelayNanos, jitterNanos))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public static BiIntFunction<Throwable, Completable> retryWithExponentialBackoffF
final long maxInitialShift = maxShift(initialDelayNanos);
return (retryCount, cause) -> causeFilter.test(cause) ?
timerExecutor.timer(current().nextLong(0,
min(maxDelayNanos, initialDelayNanos << min(maxInitialShift, retryCount - 1))), NANOSECONDS) :
baseDelayNanos(initialDelayNanos, maxDelayNanos, maxInitialShift, retryCount)), NANOSECONDS) :
failed(cause);
}

Expand Down Expand Up @@ -204,7 +204,7 @@ public static BiIntFunction<Throwable, Completable> retryWithExponentialBackoffF
final long maxInitialShift = maxShift(initialDelayNanos);
return (retryCount, cause) -> retryCount <= maxRetries && causeFilter.test(cause) ?
timerExecutor.timer(current().nextLong(0,
min(maxDelayNanos, initialDelayNanos << min(maxInitialShift, retryCount - 1))), NANOSECONDS) :
baseDelayNanos(initialDelayNanos, maxDelayNanos, maxInitialShift, retryCount)), NANOSECONDS) :
failed(cause);
}

Expand Down Expand Up @@ -237,7 +237,7 @@ public static BiIntFunction<Throwable, Completable> retryWithExponentialBackoffD
if (!causeFilter.test(cause)) {
return failed(cause);
}
final long baseDelayNanos = initialDelayNanos << min(maxInitialShift, retryCount - 1);
final long baseDelayNanos = baseDelayNanos(initialDelayNanos, maxDelayNanos, maxInitialShift, retryCount);
return timerExecutor.timer(
current().nextLong(max(0, baseDelayNanos - jitterNanos),
min(maxDelayNanos, addWithOverflowProtection(baseDelayNanos, jitterNanos))),
Expand Down Expand Up @@ -278,14 +278,19 @@ public static BiIntFunction<Throwable, Completable> retryWithExponentialBackoffD
if (retryCount > maxRetries || !causeFilter.test(cause)) {
return failed(cause);
}
final long baseDelayNanos = initialDelayNanos << min(maxInitialShift, retryCount - 1);
final long baseDelayNanos = baseDelayNanos(initialDelayNanos, maxDelayNanos, maxInitialShift, retryCount);
return timerExecutor.timer(
current().nextLong(max(0, baseDelayNanos - jitterNanos),
min(maxDelayNanos, addWithOverflowProtection(baseDelayNanos, jitterNanos))),
NANOSECONDS);
};
}

static long baseDelayNanos(final long initialDelayNanos, final long maxDelayNanos, final long maxInitialShift,
final int count) {
return min(maxDelayNanos, initialDelayNanos << min(maxInitialShift, count - 1));
}

static void checkMaxRetries(final int maxRetries) {
if (maxRetries <= 0) {
throw new IllegalArgumentException("maxRetries: " + maxRetries + " (expected: >0)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import java.time.Duration;
import java.util.function.IntFunction;
import java.util.function.UnaryOperator;

import static io.servicetalk.concurrent.api.RepeatStrategies.TerminateRepeatException;
import static io.servicetalk.concurrent.api.RepeatStrategies.repeatWithConstantBackoffDeltaJitter;
import static io.servicetalk.concurrent.api.RepeatStrategies.repeatWithConstantBackoffFullJitter;
import static io.servicetalk.concurrent.api.RepeatStrategies.repeatWithExponentialBackoffDeltaJitter;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static java.lang.Integer.MAX_VALUE;
import static java.time.Duration.ofDays;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofNanos;
Expand Down Expand Up @@ -78,24 +80,23 @@ public void testExpBackoffMaxRepeats() throws Exception {
}

@Test
public void testExpBackoffWithJitter() throws Exception {
Duration initialDelay = ofSeconds(1);
Duration jitter = ofMillis(500);
RepeatStrategy strategy = new RepeatStrategy(repeatWithExponentialBackoffDeltaJitter(2, initialDelay, jitter,
ofDays(10), timerExecutor));
io.servicetalk.concurrent.test.internal.TestCompletableSubscriber subscriber = strategy.invokeAndListen();
verifyDelayWithDeltaJitter(initialDelay.toNanos(), jitter.toNanos(), 1);
public void testExpBackoffWithJitterLargeMaxDelayAndMaxRetries() throws Exception {
testExpBackoffWithJitter(2, ofSeconds(1), duration -> duration.plus(ofDays(10)));
}

timers.take().verifyListenCalled().onComplete();
subscriber.awaitOnComplete();
verifyNoMoreInteractions(timerExecutor);
@Test
public void testExpBackoffWithJitterLargeMaxDelayAndNoMaxRetries() throws Exception {
testExpBackoffWithJitter(MAX_VALUE, ofSeconds(1), duration -> duration.plus(ofDays(10)));
}

subscriber = strategy.invokeAndListen();
long nextDelay = initialDelay.toNanos() << 1;
verifyDelayWithDeltaJitter(nextDelay, jitter.toNanos(), 2);
timers.take().verifyListenCalled().onComplete();
subscriber.awaitOnComplete();
verifyNoMoreInteractions(timerExecutor);
@Test
public void testExpBackoffWithJitterSmallMaxDelayAndMaxRetries() throws Exception {
testExpBackoffWithJitter(2, ofSeconds(1), duration -> duration.plus(ofMillis(10)));
}

@Test
public void testExpBackoffWithJitterSmallMaxDelayAndNoMaxRetries() throws Exception {
testExpBackoffWithJitter(MAX_VALUE, ofSeconds(1), duration -> duration.plus(ofMillis(10)));
}

@Test
Expand Down Expand Up @@ -124,6 +125,30 @@ private void testMaxRepeats(IntFunction<Completable> actualStrategy, Runnable ve
assertThat(subscriber.awaitOnError(), instanceOf(TerminateRepeatException.class));
}

private void testExpBackoffWithJitter(final int maxRepeats, final Duration initialDelay,
final UnaryOperator<Duration> maxDelayFunc) throws Exception {
Duration jitter = ofMillis(500);
final IntFunction<Completable> strategyFunction = maxRepeats < MAX_VALUE ?
repeatWithExponentialBackoffDeltaJitter(maxRepeats, initialDelay, jitter,
maxDelayFunc.apply(initialDelay), timerExecutor) :
repeatWithExponentialBackoffDeltaJitter(initialDelay, jitter,
maxDelayFunc.apply(initialDelay), timerExecutor);
RepeatStrategy strategy = new RepeatStrategy(strategyFunction);
io.servicetalk.concurrent.test.internal.TestCompletableSubscriber subscriber = strategy.invokeAndListen();
verifyDelayWithDeltaJitter(initialDelay.toNanos(), jitter.toNanos(), 1);

timers.take().verifyListenCalled().onComplete();
subscriber.awaitOnComplete();
verifyNoMoreInteractions(timerExecutor);

subscriber = strategy.invokeAndListen();
long nextDelay = initialDelay.toNanos() << 1;
verifyDelayWithDeltaJitter(nextDelay, jitter.toNanos(), 2);
timers.take().verifyListenCalled().onComplete();
subscriber.awaitOnComplete();
verifyNoMoreInteractions(timerExecutor);
}

private static final class RepeatStrategy {

private int count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.function.UnaryOperator;

import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffDeltaJitter;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static java.lang.Integer.MAX_VALUE;
import static java.time.Duration.ofDays;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofNanos;
Expand Down Expand Up @@ -109,25 +111,23 @@ public void testExpBackoffCauseFilter() {
}

@Test
public void testExpBackoffWithJitter() throws Exception {
Duration initialDelay = ofSeconds(1);
Duration jitter = ofMillis(10);
RetryStrategy strategy = new RetryStrategy(retryWithExponentialBackoffDeltaJitter(2, cause -> true,
initialDelay, jitter, ofDays(10), timerExecutor));
io.servicetalk.concurrent.test.internal.TestCompletableSubscriber subscriber =
strategy.invokeAndListen(DELIBERATE_EXCEPTION);
verifyDelayWithDeltaJitter(initialDelay.toNanos(), jitter.toNanos(), 1);
public void testExpBackoffWithJitterLargeMaxDelayAndMaxRetries() throws Exception {
testExpBackoffWithJitter(2, ofSeconds(1), duration -> duration.plus(ofDays(10)));
}

timers.take().verifyListenCalled().onComplete();
subscriber.awaitOnComplete();
verifyNoMoreInteractions(timerExecutor);
@Test
public void testExpBackoffWithJitterLargeMaxDelayAndNoMaxRetries() throws Exception {
testExpBackoffWithJitter(MAX_VALUE, ofSeconds(1), duration -> duration.plus(ofDays(10)));
}

subscriber = strategy.invokeAndListen(DELIBERATE_EXCEPTION);
long nextDelay = initialDelay.toNanos() << 1;
verifyDelayWithDeltaJitter(nextDelay, jitter.toNanos(), 2);
timers.take().verifyListenCalled().onComplete();
subscriber.awaitOnComplete();
verifyNoMoreInteractions(timerExecutor);
@Test
public void testExpBackoffWithJitterSmallMaxDelayAndMaxRetries() throws Exception {
testExpBackoffWithJitter(2, ofSeconds(1), duration -> duration.plus(ofMillis(10)));
}

@Test
public void testExpBackoffWithJitterSmallMaxDelayAndNoMaxRetries() throws Exception {
testExpBackoffWithJitter(MAX_VALUE, ofSeconds(1), duration -> duration.plus(ofMillis(10)));
}

@Test
Expand Down Expand Up @@ -173,6 +173,32 @@ private void testMaxRetries(BiIntFunction<Throwable, Completable> actualStrategy
assertThat(subscriber.awaitOnError(), is(de));
}

private void testExpBackoffWithJitter(final int maxRetries, final Duration initialDelay,
final UnaryOperator<Duration> maxDelayFunc)
throws Exception {
Duration jitter = ofMillis(10);
final BiIntFunction<Throwable, Completable> strategyFunction = maxRetries < MAX_VALUE ?
retryWithExponentialBackoffDeltaJitter(maxRetries, cause -> true,
initialDelay, jitter, maxDelayFunc.apply(initialDelay), timerExecutor) :
retryWithExponentialBackoffDeltaJitter(cause -> true,
initialDelay, jitter, maxDelayFunc.apply(initialDelay), timerExecutor);
RetryStrategy strategy = new RetryStrategy(strategyFunction);
io.servicetalk.concurrent.test.internal.TestCompletableSubscriber subscriber =
strategy.invokeAndListen(DELIBERATE_EXCEPTION);
verifyDelayWithDeltaJitter(initialDelay.toNanos(), jitter.toNanos(), 1);

timers.take().verifyListenCalled().onComplete();
subscriber.awaitOnComplete();
verifyNoMoreInteractions(timerExecutor);

subscriber = strategy.invokeAndListen(DELIBERATE_EXCEPTION);
long nextDelay = initialDelay.toNanos() << 1;
verifyDelayWithDeltaJitter(nextDelay, jitter.toNanos(), 2);
timers.take().verifyListenCalled().onComplete();
subscriber.awaitOnComplete();
verifyNoMoreInteractions(timerExecutor);
}

private static final class RetryStrategy {

private int count;
Expand Down

0 comments on commit 35b4293

Please sign in to comment.