Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add exponential backoff strategy for retry #156

Merged
merged 7 commits into from
Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features
1. [#150](https://github.com/influxdata/influxdb-client-java/pull/150): flux-dsl: added support for an offset parameter to limit operator, aggregates accept only a 'column' parameter
1. [#156](https://github.com/influxdata/influxdb-client-java/pull/156): Added exponential backoff strategy for batching writes. Default value for `retryInterval` is 5_000 milliseconds.

### API
1. [#139](https://github.com/influxdata/influxdb-client-java/pull/148): Changed default port from 9999 to 8086
Expand Down
5 changes: 4 additions & 1 deletion client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ The writes are processed in batches which are configurable by `WriteOptions`:
| **batchSize** | the number of data point to collect in batch | 1000 |
| **flushInterval** | the number of milliseconds before the batch is written | 1000 |
| **jitterInterval** | the number of milliseconds to increase the batch flush interval by a random amount | 0 |
| **retryInterval** | the number of milliseconds to retry unsuccessful write. The retry interval is used when the InfluxDB server does not specify "Retry-After" header.| 1000 |
| **retryInterval** | the number of milliseconds to retry unsuccessful write. The retry interval is used when the InfluxDB server does not specify "Retry-After" header.| 5000 |
| **maxRetries** | the number of max retries when write fails | 3 |
| **maxRetryDelay** | the maximum delay between each retry attempt in milliseconds | 180_000 |
| **exponentialBase** | the base for the exponential retry delay, the next delay is computed as `retryInterval * exponentialBase^(attempts-1) + random(jitterInterval)` | 5 |
| **bufferLimit** | the maximum number of unwritten stored points | 10000 |
| **backpressureStrategy** | the strategy to deal with buffer overflow | DROP_OLDEST |

Expand Down
88 changes: 85 additions & 3 deletions client/src/main/java/com/influxdb/client/WriteOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* <ul>
* <li>batchSize = 1000</li>
* <li>flushInterval = 1000 ms</li>
* <li>retryInterval = 1000 ms</li>
* <li>retryInterval = 5000 ms</li>
* <li>jitterInterval = 0</li>
* <li>bufferLimit = 10_000</li>
* </ul>
Expand All @@ -55,7 +55,10 @@ public final class WriteOptions {
private static final int DEFAULT_BATCH_SIZE = 1000;
private static final int DEFAULT_FLUSH_INTERVAL = 1000;
private static final int DEFAULT_JITTER_INTERVAL = 0;
private static final int DEFAULT_RETRY_INTERVAL = 1000;
private static final int DEFAULT_RETRY_INTERVAL = 5000;
private static final int DEFAULT_MAX_RETRIES = 3;
private static final int DEFAULT_MAX_RETRY_DELAY = 180_000;
private static final int DEFAULT_EXPONENTIAL_BASE = 5;
private static final int DEFAULT_BUFFER_LIMIT = 10000;

/**
Expand All @@ -67,6 +70,9 @@ public final class WriteOptions {
private final int flushInterval;
private final int jitterInterval;
private final int retryInterval;
private final int maxRetries;
private final int maxRetryDelay;
private final int exponentialBase;
private final int bufferLimit;
private final Scheduler writeScheduler;
private final BackpressureOverflowStrategy backpressureStrategy;
Expand Down Expand Up @@ -95,7 +101,6 @@ public int getJitterInterval() {
return jitterInterval;
}


/**
* The retry interval is used when the InfluxDB server does not specify "Retry-After" header.
* <br>
Expand All @@ -108,6 +113,38 @@ public int getRetryInterval() {
return retryInterval;
}

/**
* The number of max retries when write fails.
*
* @return number of max retries
* @see WriteOptions.Builder#maxRetries(int)
*/
public int getMaxRetries() {
return maxRetries;
}

/**
* The maximum delay between each retry attempt in milliseconds.
*
* @return maximum delay
* @see WriteOptions.Builder#maxRetryDelay(int)
*/
public int getMaxRetryDelay() {
return maxRetryDelay;
}

/**
* The base for the exponential retry delay.
*
* The next delay is computed as: retryInterval * exponentialBase^(attempts-1) + random(jitterInterval)
*
* @return exponential base
* @see WriteOptions.Builder#exponentialBase(int)
*/
public int getExponentialBase() {
return exponentialBase;
}

/**
* @return Maximum number of points stored in the retry buffer.
* @see WriteOptions.Builder#bufferLimit(int)
Expand Down Expand Up @@ -142,6 +179,9 @@ private WriteOptions(@Nonnull final Builder builder) {
flushInterval = builder.flushInterval;
jitterInterval = builder.jitterInterval;
retryInterval = builder.retryInterval;
maxRetries = builder.maxRetries;
maxRetryDelay = builder.maxRetryDelay;
exponentialBase = builder.exponentialBase;
bufferLimit = builder.bufferLimit;
writeScheduler = builder.writeScheduler;
backpressureStrategy = builder.backpressureStrategy;
Expand All @@ -167,6 +207,9 @@ public static class Builder {
private int flushInterval = DEFAULT_FLUSH_INTERVAL;
private int jitterInterval = DEFAULT_JITTER_INTERVAL;
private int retryInterval = DEFAULT_RETRY_INTERVAL;
private int maxRetries = DEFAULT_MAX_RETRIES;
private int maxRetryDelay = DEFAULT_MAX_RETRY_DELAY;
private int exponentialBase = DEFAULT_EXPONENTIAL_BASE;
private int bufferLimit = DEFAULT_BUFFER_LIMIT;
private Scheduler writeScheduler = Schedulers.newThread();
private BackpressureOverflowStrategy backpressureStrategy = BackpressureOverflowStrategy.DROP_OLDEST;
Expand Down Expand Up @@ -229,6 +272,45 @@ public Builder retryInterval(final int retryInterval) {
return this;
}

/**
* The number of max retries when write fails.
*
* @param maxRetries number of max retries
* @return {@code this}
*/
@Nonnull
public Builder maxRetries(final int maxRetries) {
Arguments.checkPositiveNumber(maxRetries, "maxRetries");
this.maxRetries = maxRetries;
return this;
}

/**
* The maximum delay between each retry attempt in milliseconds.
*
* @param maxRetryDelay maximum delay
* @return {@code this}
*/
@Nonnull
public Builder maxRetryDelay(final int maxRetryDelay) {
Arguments.checkPositiveNumber(maxRetryDelay, "maxRetryDelay");
this.maxRetryDelay = maxRetryDelay;
return this;
}

/**
* The base for the exponential retry delay.
*
* @param exponentialBase exponential base
* @return {@code this}
*/
@Nonnull
public Builder exponentialBase(final int exponentialBase) {
Arguments.checkPositiveNumber(exponentialBase, "exponentialBase");
this.exponentialBase = exponentialBase;
return this;
}

/**
* The client maintains a buffer for failed writes so that the writes will be retried later on. This may
* help to overcome temporary network problems or InfluxDB load spikes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public AbstractInfluxDBClient(@Nonnull final InfluxDBClientOptions options, @Non
this.gzipInterceptor = new GzipInterceptor();

this.okHttpClient = options.getOkHttpClient()
// Connection errors are handled by RetryAttempt in AbstractWriteClient.
.retryOnConnectionFailure(false)
.addInterceptor(new UserAgentInterceptor(clientType))
.addInterceptor(this.loggingInterceptor)
.addInterceptor(this.authenticateInterceptor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
*/
package com.influxdb.client.internal;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -67,7 +65,6 @@
public abstract class AbstractWriteClient extends AbstractRestClient implements AutoCloseable {

private static final Logger LOG = Logger.getLogger(AbstractWriteClient.class.getName());
private static final List<Integer> ABLE_TO_RETRY_ERRORS = Arrays.asList(429, 503);
private static final String CLOSED_EXCEPTION = "WriteApi is closed. "
+ "Data should be written before calling InfluxDBClient.close or WriteApi.close.";
private static final int DEFAULT_WAIT = 30_000;
Expand Down Expand Up @@ -264,7 +261,7 @@ private FlowableTransformer<BatchWriteItem, BatchWriteItem> jitter(@Nonnull fina
//
return source.delay((Function<BatchWriteItem, Flowable<Long>>) pointFlowable -> {

int delay = jitterDelay();
int delay = RetryAttempt.jitterDelay(writeOptions.getJitterInterval());

LOG.log(Level.FINEST, "Generated Jitter dynamic delay: {0}", delay);

Expand All @@ -273,11 +270,6 @@ private FlowableTransformer<BatchWriteItem, BatchWriteItem> jitter(@Nonnull fina
};
}

private int jitterDelay() {

return (int) (Math.random() * writeOptions.getJitterInterval());
}

private <T extends AbstractWriteEvent> void publish(@Nonnull final T event) {

Arguments.checkNotNull(event, "event");
Expand Down Expand Up @@ -518,49 +510,26 @@ private Function<Flowable<Throwable>, Publisher<?>> retryHandler(@Nonnull final
Objects.requireNonNull(writeOptions, "WriteOptions are required");
Objects.requireNonNull(retryScheduler, "RetryScheduler is required");

return errors -> errors.flatMap(throwable -> {

if (throwable instanceof HttpException) {

HttpException ie = (HttpException) throwable;

//
// The type of error is not able to retry
//
if (!ABLE_TO_RETRY_ERRORS.contains(ie.code())) {

return Flowable.error(throwable);
}
return errors -> errors
.zipWith(Flowable.range(1, writeOptions.getMaxRetries() + 1),
(throwable, count) -> new RetryAttempt(throwable, count, writeOptions))
.flatMap(attempt -> {

//
// Retry request
//
long retryInterval;
Throwable throwable = attempt.getThrowable();
if (attempt.isRetry()) {

String retryAfter = ((HttpException) throwable).response().headers().get("Retry-After");
if (retryAfter != null) {
long retryInterval = attempt.getRetryInterval();

retryInterval = TimeUnit.MILLISECONDS.convert(Integer.parseInt(retryAfter), TimeUnit.SECONDS);
} else {
publish(new WriteRetriableErrorEvent(throwable, retryInterval));

retryInterval = writeOptions.getRetryInterval();

String msg = "The InfluxDB does not specify \"Retry-After\". Use the default retryInterval: {0}";
LOG.log(Level.FINEST, msg, retryInterval);
}

retryInterval = retryInterval + jitterDelay();

publish(new WriteRetriableErrorEvent(throwable, retryInterval));

return Flowable.just("notify").delay(retryInterval, TimeUnit.MILLISECONDS, retryScheduler);
}
return Flowable.just("notify").delay(retryInterval, TimeUnit.MILLISECONDS, retryScheduler);
}

//
// This type of throwable is not able to retry
//
return Flowable.error(throwable);
});
//
// This type of throwable is not able to retry
//
return Flowable.error(throwable);
});
}

@Nonnull
Expand Down
Loading