Skip to content

Commit

Permalink
Fix edge case where flush failures would not get reported as corruption.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Oct 5, 2023
1 parent 9dfc85d commit 2b80149
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
Expand Down Expand Up @@ -1839,15 +1838,15 @@ public void testSendCorruptBytesToReplica() throws Exception {
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
final long originalRecoveryTime = getRecoveryStopTime();
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);
assertNotEquals(originalRecoveryTime, 0);
refresh(INDEX_NAME);
latch.await();
assertTrue(failed.get());
waitForNewPeerRecovery(originalRecoveryTime);
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
// reset checkIndex to ensure our original shard doesn't throw
resetCheckIndexStatus();
assertDocCounts(100, primaryNode, replicaNode);
waitForSearchableDocs(100, primaryNode, replicaNode);
}

public void testWipeSegmentBetweenSyncs() throws Exception {
Expand All @@ -1866,47 +1865,48 @@ public void testWipeSegmentBetweenSyncs() throws Exception {
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

for (int i = 0; i < 100; i++) {
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
ensureGreen(INDEX_NAME);
final long originalRecoveryTime = getRecoveryStopTime();
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);

final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
waitForSearchableDocs(INDEX_NAME, 100, List.of(replicaNode));
waitForSearchableDocs(INDEX_NAME, 10, List.of(replicaNode));
indexShard.store().directory().deleteFile("_0.si");

for (int i = 101; i < 201; i++) {
for (int i = 11; i < 21; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
waitForNewPeerRecovery(originalRecoveryTime);
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
resetCheckIndexStatus();
assertDocCounts(200, primaryNode, replicaNode);
waitForSearchableDocs(20, primaryNode, replicaNode);
}

private static void waitForNewPeerRecovery(long originalRecoveryTime) throws Exception {
private void waitForNewPeerRecovery(String replicaNode, long originalRecoveryTime) throws Exception {
assertBusy(() -> {
// assert we have a peer recovery after the original
final long time = getRecoveryStopTime();
final long time = getRecoveryStopTime(replicaNode);
assertNotEquals(time, 0);
assertNotEquals(originalRecoveryTime, time);

}, 1, TimeUnit.MINUTES);
}

private static long getRecoveryStopTime() {
private long getRecoveryStopTime(String nodeName) {
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(INDEX_NAME).get();
final List<RecoveryState> recoveryStates = recoveryResponse.shardRecoveryStates().get(INDEX_NAME);
logger.info("Recovery states {}", recoveryResponse);
for (RecoveryState recoveryState : recoveryStates) {
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER) {
if (recoveryState.getTargetNode().getName().equals(nodeName)) {
return recoveryState.getTimer().stopTime();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
try {
commitSegmentInfos();
} catch (IOException e) {
maybeFailEngine("flush", e);
throw new FlushFailedEngineException(shardId, e);
} finally {
flushLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.OpenSearchException;
import org.opensearch.action.StepListener;
import org.opensearch.common.UUIDs;
import org.opensearch.common.lucene.Lucene;
Expand Down Expand Up @@ -261,9 +260,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse)
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
// broken. We have to clean up this shard entirely, remove all files and bubble it up to the
// source shard since this index might be broken there as well? The Source can handle this and checks
// its content on disk if possible.
// broken. We have to clean up this shard entirely, remove all files and bubble it up.
try {
try {
store.removeCorruptionMarker();
Expand All @@ -279,14 +276,14 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse)
// In this case the shard is closed at some point while updating the reader.
// This can happen when the engine is closed in a separate thread.
logger.warn("Shard is already closed, closing replication");
} catch (OpenSearchException ex) {
} catch (CancellableThreads.ExecutionCancelledException ex) {
/*
Ignore closed replication target as it can happen due to index shard closed event in a separate thread.
In such scenario, ignore the exception
*/
assert cancellableThreads.isCancelled() : "Replication target closed but segment replication not cancelled";
assert cancellableThreads.isCancelled() : "Replication target cancelled but cancellable threads not cancelled";
} catch (Exception ex) {
throw new OpenSearchCorruptionException(ex);
throw new ReplicationFailedException(ex);
} finally {
if (store != null) {
store.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,27 @@ public void testCommitOnCloseThrowsException_decRefStore() throws Exception {
assertThrows(RuntimeException.class, nrtEngineStore::close);
}

public void testFlushThrowsFlushFailedExceptionOnCorruption() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS);
List<Engine.Operation> operations = generateHistoryOnReplica(
randomIntBetween(1, 10),
randomBoolean(),
randomBoolean(),
randomBoolean()
);
indexOperations(nrtEngine, operations);
// wipe the nrt directory initially so we can sync with primary.
cleanAndCopySegmentsFromPrimary(nrtEngine);
nrtEngineStore.directory().deleteFile("_0.si");
assertThrows(FlushFailedEngineException.class, nrtEngine::flush);
assertTrue(nrtEngineStore.isMarkedCorrupted());
// store will throw when eventually closed, not handled here.
assertThrows(RuntimeException.class, nrtEngineStore::close);
}

private void copySegments(Collection<String> latestPrimaryFiles, Engine nrtEngine) throws IOException {
final Store store = nrtEngine.store;
final List<String> replicaFiles = List.of(store.directory().listAll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.indices.replication;

import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -553,7 +554,7 @@ public void testForceSegmentSyncHandlerWithFailure() throws Exception {
).txGet();
});
Throwable nestedException = finalizeException.getCause().getCause();
assertTrue(nestedException instanceof IOException);
assertNotNull(ExceptionsHelper.unwrap(finalizeException, IOException.class));
assertTrue(nestedException.getMessage().contains("dummy failure"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public void onFailure(Exception e) {
});
}

public void testFailure_finalizeReplication_IOException() throws IOException {
public void testFailure_finalizeReplication_NonCorruptionException() throws IOException {

IOException exception = new IOException("dummy failure");
SegmentReplicationSource segrepSource = new TestReplicationSource() {
Expand Down Expand Up @@ -288,6 +288,7 @@ public void onResponse(Void replicationResponse) {

@Override
public void onFailure(Exception e) {
assertEquals(ReplicationFailedException.class, e.getClass());
assertEquals(exception, e.getCause());
segrepTarget.fail(new ReplicationFailedException(e), false);
}
Expand Down

0 comments on commit 2b80149

Please sign in to comment.