Skip to content

Commit

Permalink
[CCR] Clear fetch exceptions if an empty but successful shard changes…
Browse files Browse the repository at this point in the history
… response returns.

Also fixed ShardFollowNodeTaskTests to not return ops when the responseSizes
queue is empty. Otherwise ops are returned when no ops are expected to be returned.
  • Loading branch information
martijnvg committed Oct 3, 2018
1 parent 7f5c2f1 commit 04c902f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,16 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
}
innerSendShardChangesRequest(from, maxOperationCount,
response -> {
if (response.getOperations().length > 0) {
// do not count polls against fetch stats
synchronized (ShardFollowNodeTask.this) {
synchronized (ShardFollowNodeTask.this) {
// Always clear fetch exceptions:
fetchExceptions.remove(from);
if (response.getOperations().length > 0) {
// do not count polls against fetch stats
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulFetches++;
fetchExceptions.remove(from);
operationsReceived += response.getOperations().length;
totalTransferredBytes +=
Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
}
}
handleReadResponse(from, maxRequiredSeqNo, response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public void testReceiveRetryableError() {
mappingVersions.add(1L);
leaderGlobalCheckpoints.add(63L);
maxSeqNos.add(63L);
responseSizes.add(64);
simulateResponse.set(true);
final AtomicLong retryCounter = new AtomicLong();
// before each retry, we assert the fetch failures; after the last retry, the fetch failure should clear
Expand Down Expand Up @@ -228,6 +229,35 @@ public void testReceiveRetryableError() {
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
}

/**
 * Checks that a fetch exception recorded for a failed read does not linger in the task status
 * after the read is retried and the retry succeeds, with the leader reporting no progress
 * (global checkpoint and max seq no both -1).
 *
 * NOTE(review): no entry is added to responseSizes in this test, so the stubbed shard changes
 * response is presumably empty (zero operations) - confirm against the test's
 * innerSendShardChangesRequest stub.
 */
public void testEmptyShardChangesResponseShouldClearFetchException() {
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, -1, -1);

// Queue exactly one read failure, followed by the values used for the simulated response.
readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
mappingVersions.add(1L);
leaderGlobalCheckpoints.add(-1L);
maxSeqNos.add(-1L);
simulateResponse.set(true);
task.coordinateReads();

// Number of requests is equal to initial request + retried attempts
assertThat(shardChangesRequests.size(), equalTo(2));
// Both recorded requests must carry the same two values (0 and 64), i.e. the retry repeats
// the initial request. Presumably [0] is the starting seq no and [1] the batch size - verify
// against the stub that records shardChangesRequests.
for (long[] shardChangesRequest : shardChangesRequests) {
assertThat(shardChangesRequest[0], equalTo(0L));
assertThat(shardChangesRequest[1], equalTo(64L));
}

assertFalse("task is not stopped", task.isStopped());
ShardFollowNodeTaskStatus status = task.getStatus();
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
// The single queued failure is still counted in the failed-fetch statistic...
assertThat(status.numberOfFailedFetches(), equalTo(1L));
// the fetch failure should have been cleared:
assertThat(status.fetchExceptions().entrySet(), hasSize(0));
// ...and the status still reports -1 for both the last requested seq no and the leader checkpoint.
assertThat(status.lastRequestedSeqNo(), equalTo(-1L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(-1L));
}

public void testReceiveTimeout() {
final ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 63, -1);
Expand Down Expand Up @@ -262,6 +292,7 @@ public void testReceiveTimeout() {
mappingVersions.add(1L);
leaderGlobalCheckpoints.add(63L);
maxSeqNos.add(63L);
responseSizes.add(64);
simulateResponse.set(true);

task.coordinateReads();
Expand Down Expand Up @@ -742,7 +773,7 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con
if (readFailure != null) {
errorHandler.accept(readFailure);
} else if (simulateResponse.get()) {
final int responseSize = responseSizes.size() == 0 ? requestBatchSize : responseSizes.poll();
final int responseSize = responseSizes.size() == 0 ? 0 : responseSizes.poll();
final Translog.Operation[] operations = new Translog.Operation[responseSize];
for (int i = 0; i < responseSize; i++) {
operations[i] = new Translog.NoOp(from + i, 0, "test");
Expand Down

0 comments on commit 04c902f

Please sign in to comment.