[CCR] Improve shard follow task's retryable error handling
Improve failure handling of retryable errors by retrying remote calls with
an exponential-backoff-like delay. The delay between retries never exceeds
the configured max retry delay, and retryable errors are now retried
indefinitely.

Relates to elastic#30086
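To illustrate the new behavior (not part of this change set): the retry delay grows roughly as round(10 * e^(0.8 * retry) - 1) milliseconds and is clamped to the configured max retry delay. A minimal sketch, assuming the default 5-minute maximum; it mirrors the computeDelay method added in this commit:

```java
public class BackoffSketch {

    // Mirrors ShardFollowNodeTask.computeDelay(int, long) as added in this commit.
    static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
        long expectedBackOff = Math.round(10 * Math.exp(0.8d * currentRetry) - 1);
        return Math.min(expectedBackOff, maxRetryDelayInMillis);
    }

    public static void main(String[] args) {
        long maxRetryDelayMillis = 5 * 60 * 1000; // DEFAULT_MAX_RETRY_DELAY = 5 minutes
        for (int retry = 1; retry <= 14; retry++) {
            // Prints roughly 21, 49, 109, 244, 545, ... ms, capped at 300000 ms.
            System.out.println("retry " + retry + " -> " + computeDelay(retry, maxRetryDelayMillis) + " ms");
        }
    }
}
```

With these defaults the delay reaches the 5-minute cap around the 13th retry, after which every further retry waits the full max retry delay.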
martijnvg committed Sep 4, 2018
1 parent 7f7e8fd commit a717f44
Showing 4 changed files with 61 additions and 152 deletions.
@@ -98,8 +98,8 @@ public static class Request extends ActionRequest implements ToXContentObject {
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
- (p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.RETRY_TIMEOUT.getPreferredName()),
- ShardFollowTask.RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
+ (p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.MAX_RETRY_DELAY.getPreferredName()),
+ ShardFollowTask.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName()),
ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
@@ -126,7 +126,7 @@ public static Request fromXContent(XContentParser parser, String followerIndex)
private long maxOperationSizeInBytes;
private int maxConcurrentWriteBatches;
private int maxWriteBufferSize;
- private TimeValue retryTimeout;
+ private TimeValue maxRetryDelay;
private TimeValue idleShardRetryDelay;

public Request(
@@ -137,7 +137,7 @@ public Request(
Long maxOperationSizeInBytes,
Integer maxConcurrentWriteBatches,
Integer maxWriteBufferSize,
- TimeValue retryTimeout,
+ TimeValue maxRetryDelay,
TimeValue idleShardRetryDelay) {

if (leaderIndex == null) {
@@ -161,8 +161,8 @@ public Request(
if (maxWriteBufferSize == null) {
maxWriteBufferSize = ShardFollowNodeTask.DEFAULT_MAX_WRITE_BUFFER_SIZE;
}
- if (retryTimeout == null) {
- retryTimeout = ShardFollowNodeTask.DEFAULT_RETRY_TIMEOUT;
+ if (maxRetryDelay == null) {
+ maxRetryDelay = ShardFollowNodeTask.DEFAULT_MAX_RETRY_DELAY;
}
if (idleShardRetryDelay == null) {
idleShardRetryDelay = ShardFollowNodeTask.DEFAULT_IDLE_SHARD_RETRY_DELAY;
@@ -191,7 +191,7 @@ public Request(
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferSize = maxWriteBufferSize;
- this.retryTimeout = retryTimeout;
+ this.maxRetryDelay = maxRetryDelay;
this.idleShardRetryDelay = idleShardRetryDelay;
}

@@ -225,7 +225,7 @@ public void readFrom(StreamInput in) throws IOException {
maxOperationSizeInBytes = in.readVLong();
maxConcurrentWriteBatches = in.readVInt();
maxWriteBufferSize = in.readVInt();
- retryTimeout = in.readOptionalTimeValue();
+ maxRetryDelay = in.readOptionalTimeValue();
idleShardRetryDelay = in.readOptionalTimeValue();
}

@@ -239,7 +239,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(maxOperationSizeInBytes);
out.writeVInt(maxConcurrentWriteBatches);
out.writeVInt(maxWriteBufferSize);
- out.writeOptionalTimeValue(retryTimeout);
+ out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(idleShardRetryDelay);
}

@@ -254,7 +254,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(ShardFollowTask.MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
- builder.field(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
+ builder.field(ShardFollowTask.MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
}
builder.endObject();
@@ -271,7 +271,7 @@ public boolean equals(Object o) {
maxOperationSizeInBytes == request.maxOperationSizeInBytes &&
maxConcurrentWriteBatches == request.maxConcurrentWriteBatches &&
maxWriteBufferSize == request.maxWriteBufferSize &&
- Objects.equals(retryTimeout, request.retryTimeout) &&
+ Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) &&
Objects.equals(leaderIndex, request.leaderIndex) &&
Objects.equals(followerIndex, request.followerIndex);
@@ -287,7 +287,7 @@ public int hashCode() {
maxOperationSizeInBytes,
maxConcurrentWriteBatches,
maxWriteBufferSize,
- retryTimeout,
+ maxRetryDelay,
idleShardRetryDelay
);
}
@@ -412,7 +412,7 @@ void start(
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.maxBatchOperationCount, request.maxConcurrentReadBatches, request.maxOperationSizeInBytes,
- request.maxConcurrentWriteBatches, request.maxWriteBufferSize, request.retryTimeout,
+ request.maxConcurrentWriteBatches, request.maxWriteBufferSize, request.maxRetryDelay,
request.idleShardRetryDelay, filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@@ -61,15 +61,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
public static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1;
public static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240;
public static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE;
- private static final int RETRY_LIMIT = 10;
- public static final TimeValue DEFAULT_RETRY_TIMEOUT = new TimeValue(500);
+ public static final TimeValue DEFAULT_MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5);
public static final TimeValue DEFAULT_IDLE_SHARD_RETRY_DELAY = TimeValue.timeValueSeconds(10);

private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class);

private final String leaderIndex;
private final ShardFollowTask params;
- private final TimeValue retryTimeout;
+ private final TimeValue maxRetryDelay;
private final TimeValue idleShardChangesRequestDelay;
private final BiConsumer<TimeValue, Runnable> scheduler;
private final LongSupplier relativeTimeProvider;
@@ -101,7 +100,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.params = params;
this.scheduler = scheduler;
this.relativeTimeProvider = relativeTimeProvider;
- this.retryTimeout = params.getRetryTimeout();
+ this.maxRetryDelay = params.getMaxRetryDelay();
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
/*
* We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
@@ -379,20 +378,22 @@ private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) {

private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
assert e != null;
- if (shouldRetry(e)) {
- if (isStopped() == false && retryCounter.incrementAndGet() <= RETRY_LIMIT) {
- LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying...", params.getFollowShardId()), e);
- scheduler.accept(retryTimeout, task);
- } else {
- markAsFailed(new ElasticsearchException("retrying failed [" + retryCounter.get() +
- "] times, aborting...", e));
- }
+ if (shouldRetry(e) && isStopped() == false) {
+ LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying...", params.getFollowShardId()), e);
+ int currentRetry = retryCounter.incrementAndGet();
+ long delay = computeDelay(currentRetry, maxRetryDelay.getMillis());
+ scheduler.accept(TimeValue.timeValueMillis(delay), task);
} else {
markAsFailed(e);
}
}

- private boolean shouldRetry(Exception e) {
+ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
+ long expectedBackOff = Math.round(10 * Math.exp(0.8d * currentRetry) - 1);
+ return Math.min(expectedBackOff, maxRetryDelayInMillis);
+ }
+
+ private static boolean shouldRetry(Exception e) {
return NetworkExceptionHelper.isConnectException(e) ||
NetworkExceptionHelper.isCloseConnectionException(e) ||
TransportActions.isShardNotAvailableException(e);
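As a sanity check on the new delay computation, a hypothetical test sketch (not part of this commit; it assumes package-level access to ShardFollowNodeTask.computeDelay and Elasticsearch's TimeValue) could assert that the backoff never shrinks and never exceeds the configured maximum:

```java
// Hypothetical JUnit-style check; names and placement are assumptions.
public void testComputeDelayIsCappedByMaxRetryDelay() {
    long maxRetryDelayMillis = TimeValue.timeValueMinutes(5).getMillis();
    long previous = 0;
    for (int retry = 1; retry <= 100; retry++) {
        long delay = ShardFollowNodeTask.computeDelay(retry, maxRetryDelayMillis);
        assert delay >= previous;            // backoff is non-decreasing
        assert delay <= maxRetryDelayMillis; // and never exceeds the configured max
        previous = delay;
    }
}
```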
@@ -48,7 +48,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
public static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes");
public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
- public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout");
+ public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");

@SuppressWarnings("unchecked")
@@ -71,8 +71,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
- (p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()),
- RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
+ (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
+ MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
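For context, the renamed max_retry_delay field is declared as a time-value string, so it accepts the usual Elasticsearch duration syntax. A small illustration using TimeValue directly (a standalone sketch, not part of this commit):

```java
import org.elasticsearch.common.unit.TimeValue;

public class MaxRetryDelayParsingSketch {
    public static void main(String[] args) {
        // Same parsing path the PARSER uses for the "max_retry_delay" field.
        TimeValue thirtySeconds = TimeValue.parseTimeValue("30s", "max_retry_delay");
        TimeValue fiveMinutes = TimeValue.parseTimeValue("5m", "max_retry_delay");
        System.out.println(thirtySeconds.getMillis()); // 30000
        System.out.println(fiveMinutes.getMillis());   // 300000
    }
}
```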
@@ -87,13 +87,13 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
private final long maxBatchSizeInBytes;
private final int maxConcurrentWriteBatches;
private final int maxWriteBufferSize;
- private final TimeValue retryTimeout;
+ private final TimeValue maxRetryDelay;
private final TimeValue idleShardRetryDelay;
private final Map<String, String> headers;

ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxBatchOperationCount,
int maxConcurrentReadBatches, long maxBatchSizeInBytes, int maxConcurrentWriteBatches,
- int maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay, Map<String, String> headers) {
+ int maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue idleShardRetryDelay, Map<String, String> headers) {
this.leaderClusterAlias = leaderClusterAlias;
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
@@ -102,7 +102,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferSize = maxWriteBufferSize;
- this.retryTimeout = retryTimeout;
+ this.maxRetryDelay = maxRetryDelay;
this.idleShardRetryDelay = idleShardRetryDelay;
this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
}
@@ -116,7 +116,7 @@ public ShardFollowTask(StreamInput in) throws IOException {
this.maxBatchSizeInBytes = in.readVLong();
this.maxConcurrentWriteBatches = in.readVInt();
this.maxWriteBufferSize = in.readVInt();
- this.retryTimeout = in.readTimeValue();
+ this.maxRetryDelay = in.readTimeValue();
this.idleShardRetryDelay = in.readTimeValue();
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
}
Expand Down Expand Up @@ -153,8 +153,8 @@ public long getMaxBatchSizeInBytes() {
return maxBatchSizeInBytes;
}

- public TimeValue getRetryTimeout() {
- return retryTimeout;
+ public TimeValue getMaxRetryDelay() {
+ return maxRetryDelay;
}

public TimeValue getIdleShardRetryDelay() {
@@ -184,7 +184,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(maxBatchSizeInBytes);
out.writeVInt(maxConcurrentWriteBatches);
out.writeVInt(maxWriteBufferSize);
- out.writeTimeValue(retryTimeout);
+ out.writeTimeValue(maxRetryDelay);
out.writeTimeValue(idleShardRetryDelay);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}
@@ -210,7 +210,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxBatchSizeInBytes);
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
- builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
+ builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
builder.field(HEADERS.getPreferredName(), headers);
return builder.endObject();
@@ -229,15 +229,15 @@ public boolean equals(Object o) {
maxConcurrentWriteBatches == that.maxConcurrentWriteBatches &&
maxBatchSizeInBytes == that.maxBatchSizeInBytes &&
maxWriteBufferSize == that.maxWriteBufferSize &&
- Objects.equals(retryTimeout, that.retryTimeout) &&
+ Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) &&
Objects.equals(headers, that.headers);
}

@Override
public int hashCode() {
return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches,
- maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, retryTimeout, idleShardRetryDelay, headers);
+ maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, maxRetryDelay, idleShardRetryDelay, headers);
}

public String toString() {