Skip to content

Commit

Permalink
Fix SnapshotShardStatus Reporting for Failed Shard (#48556)
Browse files Browse the repository at this point in the history
Fixes the shard snapshot status reporting for failed shards
in the corner case of failing the shard because of an exception
thrown in `SnapshotShardsService` and not the repository.
We were missing the update on the `snapshotStatus` instance in
this case which made the transport APIs using this field report
back an incorrect status.
Fixed by moving the failure handling to the `SnapshotShardsService`
for all cases (which also simplifies the code, the ex. wrapping in
the repository was pointless as we only used the ex. trace upstream
anyway).
Also, added an assertion to another test that explicitly checks this
failure situation (ex. in the `SnapshotShardsService`) already.

Closes #48526
  • Loading branch information
original-brownbear authored Oct 29, 2019
1 parent e5646fe commit 752fa87
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
Expand Down Expand Up @@ -1042,10 +1041,6 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
ActionListener<String> listener) {
final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis();
final ActionListener<String> snapshotDoneListener = ActionListener.wrap(listener::onResponse, e -> {
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.stackTrace(e));
listener.onFailure(e instanceof IndexShardSnapshotFailedException ? e : new IndexShardSnapshotFailedException(shardId, e));
});
try {
final String generation = snapshotStatus.generation();
logger.debug("[{}] [{}] snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
Expand Down Expand Up @@ -1191,8 +1186,8 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
}
}
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), indexGeneration);
snapshotDoneListener.onResponse(indexGeneration);
}, snapshotDoneListener::onFailure);
listener.onResponse(indexGeneration);
}, listener::onFailure);
if (indexIncrementalFileCount == 0) {
allFilesUploadedListener.onResponse(Collections.emptyList());
return;
Expand Down Expand Up @@ -1222,7 +1217,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
}));
}
} catch (Exception e) {
snapshotDoneListener.onFailure(e);
listener.onFailure(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,10 @@ public void onResponse(String newGeneration) {

@Override
public void onFailure(Exception e) {
final String failure = ExceptionsHelper.stackTrace(e);
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.stackTrace(e));
notifyFailedSnapshotShard(snapshot, shardId, failure);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,12 @@ public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
disruption.startDisrupting();
logger.info("--> restarting data node, which should cause primary shards to be failed");
internalCluster().restartNode(dataNode, InternalTestCluster.EMPTY_CALLBACK);

logger.info("--> wait for shard snapshots to show as failed");
assertBusy(() -> assertThat(
client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get().getSnapshots()
.get(0).getShardsStats().getFailedShards(), greaterThanOrEqualTo(1)), 60L, TimeUnit.SECONDS);

unblockNode("test-repo", dataNode);
disruption.stopDisrupting();
// check that snapshot completes
Expand Down

0 comments on commit 752fa87

Please sign in to comment.