Skip to content

Commit

Permalink
[CCR] Improve shard follow task's retryable error handling (#33371)
Browse files Browse the repository at this point in the history
Improve failure handling of retryable errors by retrying remote calls in
an exponential-backoff-like manner. The delay between retries will not be
longer than the configured max retry delay. Also, retryable errors will be
retried indefinitely.

Relates to #30086
  • Loading branch information
martijnvg committed Sep 12, 2018
1 parent 5fa2acc commit 1579916
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ public static class Request extends AcknowledgedRequest<Request> implements ToXC
PARSER.declareLong(Request::setMaxOperationSizeInBytes, AutoFollowPattern.MAX_BATCH_SIZE_IN_BYTES);
PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(Request::setMaxWriteBufferSize, AutoFollowPattern.MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(Request::setRetryTimeout,
PARSER.declareField(Request::setMaxRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.RETRY_TIMEOUT.getPreferredName()),
ShardFollowTask.RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
ShardFollowTask.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareField(Request::setIdleShardRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.IDLE_SHARD_RETRY_DELAY.getPreferredName()),
ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
Expand Down Expand Up @@ -95,7 +95,7 @@ public static Request fromXContent(XContentParser parser, String remoteClusterAl
private Long maxOperationSizeInBytes;
private Integer maxConcurrentWriteBatches;
private Integer maxWriteBufferSize;
private TimeValue retryTimeout;
private TimeValue maxRetryDelay;
private TimeValue idleShardRetryDelay;

@Override
Expand Down Expand Up @@ -174,12 +174,12 @@ public void setMaxWriteBufferSize(Integer maxWriteBufferSize) {
this.maxWriteBufferSize = maxWriteBufferSize;
}

public TimeValue getRetryTimeout() {
return retryTimeout;
public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}

public void setRetryTimeout(TimeValue retryTimeout) {
this.retryTimeout = retryTimeout;
public void setMaxRetryDelay(TimeValue maxRetryDelay) {
this.maxRetryDelay = maxRetryDelay;
}

public TimeValue getIdleShardRetryDelay() {
Expand All @@ -201,7 +201,7 @@ public void readFrom(StreamInput in) throws IOException {
maxOperationSizeInBytes = in.readOptionalLong();
maxConcurrentWriteBatches = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalVInt();
retryTimeout = in.readOptionalTimeValue();
maxRetryDelay = in.readOptionalTimeValue();
idleShardRetryDelay = in.readOptionalTimeValue();
}

Expand All @@ -216,7 +216,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalLong(maxOperationSizeInBytes);
out.writeOptionalVInt(maxConcurrentWriteBatches);
out.writeOptionalVInt(maxWriteBufferSize);
out.writeOptionalTimeValue(retryTimeout);
out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(idleShardRetryDelay);
}

Expand Down Expand Up @@ -244,8 +244,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (maxConcurrentWriteBatches != null) {
builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
}
if (retryTimeout != null) {
builder.field(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
if (maxRetryDelay != null) {
builder.field(ShardFollowTask.MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
}
if (idleShardRetryDelay != null) {
builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
Expand All @@ -268,7 +268,7 @@ public boolean equals(Object o) {
Objects.equals(maxOperationSizeInBytes, request.maxOperationSizeInBytes) &&
Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) &&
Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) &&
Objects.equals(retryTimeout, request.retryTimeout) &&
Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay);
}

Expand All @@ -283,7 +283,7 @@ public int hashCode() {
maxOperationSizeInBytes,
maxConcurrentWriteBatches,
maxWriteBufferSize,
retryTimeout,
maxRetryDelay,
idleShardRetryDelay
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -18,7 +19,6 @@
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;

import java.util.ArrayList;
Expand All @@ -43,11 +43,12 @@
*/
public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {

private static final int DELAY_MILLIS = 50;
private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class);

private final String leaderIndex;
private final ShardFollowTask params;
private final TimeValue retryTimeout;
private final TimeValue maxRetryDelay;
private final TimeValue idleShardChangesRequestDelay;
private final BiConsumer<TimeValue, Runnable> scheduler;
private final LongSupplier relativeTimeProvider;
Expand Down Expand Up @@ -79,7 +80,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.params = params;
this.scheduler = scheduler;
this.relativeTimeProvider = relativeTimeProvider;
this.retryTimeout = params.getRetryTimeout();
this.maxRetryDelay = params.getMaxRetryDelay();
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
/*
* We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
Expand Down Expand Up @@ -357,20 +358,28 @@ private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) {

private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
assert e != null;
if (shouldRetry(e)) {
if (isStopped() == false && retryCounter.incrementAndGet() <= FollowIndexAction.RETRY_LIMIT) {
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying...", params.getFollowShardId()), e);
scheduler.accept(retryTimeout, task);
} else {
markAsFailed(new ElasticsearchException("retrying failed [" + retryCounter.get() +
"] times, aborting...", e));
}
if (shouldRetry(e) && isStopped() == false) {
int currentRetry = retryCounter.incrementAndGet();
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]",
params.getFollowShardId(), currentRetry), e);
long delay = computeDelay(currentRetry, maxRetryDelay.getMillis());
scheduler.accept(TimeValue.timeValueMillis(delay), task);
} else {
markAsFailed(e);
}
}

private boolean shouldRetry(Exception e) {
/**
 * Picks the delay to wait before the next retry attempt.
 *
 * The number of candidate slots doubles with each retry (2^(retry - 1), with the retry count
 * clamped at 24), a slot is drawn at random, and the slot is scaled by {@code DELAY_MILLIS}.
 * The result is never larger than {@code maxRetryDelayInMillis}.
 *
 * @param currentRetry          the number of the retry attempt that is about to be scheduled
 * @param maxRetryDelayInMillis upper limit for the returned delay, in milliseconds
 * @return the delay in milliseconds to wait before retrying
 */
static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
    // Clamp the retry count so the slot count below stays small enough to be converted to an int.
    final int cappedRetry = Math.min(currentRetry, 24);
    final long slotCount = Math.round(Math.pow(2, cappedRetry - 1));
    // nextInt(...) excludes its bound, hence the + 1; without it the first delay would always be zero.
    final int exclusiveBound = Math.toIntExact(slotCount + 1);
    final int chosenSlot = Randomness.get().nextInt(exclusiveBound);
    final int candidateDelay = chosenSlot * DELAY_MILLIS;
    // Never wait longer than the configured maximum.
    return Math.min(candidateDelay, maxRetryDelayInMillis);
}

private static boolean shouldRetry(Exception e) {
return NetworkExceptionHelper.isConnectException(e) ||
NetworkExceptionHelper.isCloseConnectionException(e) ||
TransportActions.isShardNotAvailableException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
public static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes");
public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout");
public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");

@SuppressWarnings("unchecked")
Expand All @@ -71,8 +71,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()),
RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
Expand All @@ -87,13 +87,13 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
private final long maxBatchSizeInBytes;
private final int maxConcurrentWriteBatches;
private final int maxWriteBufferSize;
private final TimeValue retryTimeout;
private final TimeValue maxRetryDelay;
private final TimeValue idleShardRetryDelay;
private final Map<String, String> headers;

ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxBatchOperationCount,
int maxConcurrentReadBatches, long maxBatchSizeInBytes, int maxConcurrentWriteBatches,
int maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay, Map<String, String> headers) {
int maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue idleShardRetryDelay, Map<String, String> headers) {
this.leaderClusterAlias = leaderClusterAlias;
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
Expand All @@ -102,7 +102,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferSize = maxWriteBufferSize;
this.retryTimeout = retryTimeout;
this.maxRetryDelay = maxRetryDelay;
this.idleShardRetryDelay = idleShardRetryDelay;
this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
}
Expand All @@ -116,7 +116,7 @@ public ShardFollowTask(StreamInput in) throws IOException {
this.maxBatchSizeInBytes = in.readVLong();
this.maxConcurrentWriteBatches = in.readVInt();
this.maxWriteBufferSize = in.readVInt();
this.retryTimeout = in.readTimeValue();
this.maxRetryDelay = in.readTimeValue();
this.idleShardRetryDelay = in.readTimeValue();
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
}
Expand Down Expand Up @@ -153,8 +153,8 @@ public long getMaxBatchSizeInBytes() {
return maxBatchSizeInBytes;
}

public TimeValue getRetryTimeout() {
return retryTimeout;
public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}

public TimeValue getIdleShardRetryDelay() {
Expand Down Expand Up @@ -184,7 +184,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(maxBatchSizeInBytes);
out.writeVInt(maxConcurrentWriteBatches);
out.writeVInt(maxWriteBufferSize);
out.writeTimeValue(retryTimeout);
out.writeTimeValue(maxRetryDelay);
out.writeTimeValue(idleShardRetryDelay);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}
Expand All @@ -210,7 +210,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxBatchSizeInBytes);
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
builder.field(HEADERS.getPreferredName(), headers);
return builder.endObject();
Expand All @@ -229,15 +229,15 @@ public boolean equals(Object o) {
maxConcurrentWriteBatches == that.maxConcurrentWriteBatches &&
maxBatchSizeInBytes == that.maxBatchSizeInBytes &&
maxWriteBufferSize == that.maxWriteBufferSize &&
Objects.equals(retryTimeout, that.retryTimeout) &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) &&
Objects.equals(headers, that.headers);
}

@Override
public int hashCode() {
return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches,
maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, retryTimeout, idleShardRetryDelay, headers);
maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, maxRetryDelay, idleShardRetryDelay, headers);
}

public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void start(
request.getMaxOperationSizeInBytes(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferSize(),
request.getRetryTimeout(),
request.getMaxRetryDelay(),
request.getIdleShardRetryDelay(),
filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
request.getMaxOperationSizeInBytes(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferSize(),
request.getRetryTimeout(),
request.getMaxRetryDelay(),
request.getIdleShardRetryDelay()
);
patterns.put(request.getLeaderClusterAlias(), autoFollowPattern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void testAutoFollowParameterAreDelegated() throws Exception {
request.setMaxOperationSizeInBytes(randomNonNegativeLong());
}
if (randomBoolean()) {
request.setRetryTimeout(TimeValue.timeValueMillis(500));
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));
}
if (randomBoolean()) {
request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500));
Expand Down Expand Up @@ -162,8 +162,8 @@ public void testAutoFollowParameterAreDelegated() throws Exception {
if (request.getMaxOperationSizeInBytes() != null) {
assertThat(shardFollowTask.getMaxBatchSizeInBytes(), equalTo(request.getMaxOperationSizeInBytes()));
}
if (request.getRetryTimeout() != null) {
assertThat(shardFollowTask.getRetryTimeout(), equalTo(request.getRetryTimeout()));
if (request.getMaxRetryDelay() != null) {
assertThat(shardFollowTask.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay()));
}
if (request.getIdleShardRetryDelay() != null) {
assertThat(shardFollowTask.getIdleShardRetryDelay(), equalTo(request.getIdleShardRetryDelay()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected PutAutoFollowPatternAction.Request createTestInstance() {
request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500));
}
if (randomBoolean()) {
request.setRetryTimeout(TimeValue.timeValueMillis(500));
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));
}
if (randomBoolean()) {
request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
Expand Down
Loading

0 comments on commit 1579916

Please sign in to comment.