Skip to content

Commit

Permalink
[Segment Replication] Handle exceptions on local file read during replication (opensearch-project#10933)
Browse files Browse the repository at this point in the history

* Handle exceptions on file read

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

---------

Signed-off-by: Suraj Singh <surajrider@gmail.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
dreamer-89 authored and shiv0408 committed Apr 25, 2024
1 parent 97d7be7 commit 300496f
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
return missingFiles;
}

// pkg private for tests
private boolean validateLocalChecksum(StoreFileMetadata file) {
try (IndexInput indexInput = indexShard.store().directory().openInput(file.name(), IOContext.DEFAULT)) {
String checksum = Store.digestToString(CodecUtil.retrieveChecksum(indexInput));
Expand All @@ -243,7 +244,15 @@ private boolean validateLocalChecksum(StoreFileMetadata file) {
return false;
}
} catch (IOException e) {
throw new UncheckedIOException("Error reading " + file, e);
logger.warn("Error reading " + file, e);
// Delete file on exceptions so that it can be re-downloaded. This is safe to do as this file is local only
// and not referenced by reader.
try {
indexShard.store().directory().deleteFile(file.name());
} catch (IOException ex) {
throw new UncheckedIOException("Error reading " + file, e);
}
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,20 @@
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.CorruptionUtils;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -371,37 +373,9 @@ public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
Runnable[] runAfterGetFiles = { () -> { throw new RuntimeException("Simulated"); }, () -> {} };
AtomicInteger index = new AtomicInteger(0);
RemoteStoreReplicationSource testRSReplicationSource = new RemoteStoreReplicationSource(replica) {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
super.getCheckpointMetadata(replicationId, checkpoint, listener);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
BiConsumer<String, Long> fileProgressTracker,
ActionListener<GetSegmentFilesResponse> listener
) {
super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener);
runAfterGetFiles[index.getAndIncrement()].run();
}

@Override
public String getDescription() {
return "TestRemoteStoreReplicationSource";
}
};
when(sourceFactory.get(any())).thenReturn(testRSReplicationSource);
when(sourceFactory.get(any())).thenReturn(
getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
);
CountDownLatch latch = new CountDownLatch(1);

// Start first round of segment replication. This should fail with simulated error but with replica having
Expand All @@ -412,6 +386,7 @@ public String getDescription() {
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
latch.countDown();
Assert.fail("Replication should fail with simulated error");
}

Expand All @@ -421,9 +396,9 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
latch.countDown();
assertFalse(sendShardFailure);
logger.error("Replication error", e);
latch.countDown();
}
}
);
Expand All @@ -439,7 +414,8 @@ public void onReplicationFailure(
assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);

// Start next round of segment replication
// Start the next round of segment replication without throwing an exception, so that the replica commits
when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
CountDownLatch waitForSecondRound = new CountDownLatch(1);
final SegmentReplicationTarget newTarget = targetService.startReplication(
replica,
Expand All @@ -456,9 +432,9 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
waitForSecondRound.countDown();
logger.error("Replication error", e);
Assert.fail("Replication should not fail");
waitForSecondRound.countDown();
}
}
);
Expand All @@ -471,6 +447,119 @@ public void onReplicationFailure(
}
}

/**
 * Verifies that an unreadable local file (for example corrupted or partially written) is deleted and
 * fetched again rather than failing the replication event.
 * <p>
 * Steps:
 * <ol>
 *   <li>Run a first replication round whose source throws after the file copy call, so the replica keeps
 *       files on disk that its reader does not reference.</li>
 *   <li>Corrupt one of those on-disk data files.</li>
 *   <li>Index more documents, replicate again, and check the doc counts and the segment diff between
 *       primary and replica.</li>
 * </ol>
 */
public void testNoFailuresOnFileReads() throws Exception {
    try (ReplicationGroup group = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
        group.startAll();
        IndexShard primaryShard = group.getPrimary();
        final IndexShard replicaShard = group.getReplicas().get(0);

        final int docsPerBatch = 10;
        group.indexDocs(docsPerBatch);
        primaryShard.refresh("Test");

        final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
        final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
        // Source whose post-copy hook always throws, forcing the first round to fail.
        final Runnable failAfterCopy = () -> { throw new RuntimeException("Simulated"); };
        when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replicaShard, failAfterCopy));
        CountDownLatch firstRoundFinished = new CountDownLatch(1);

        // First round: expected to fail with the simulated error, leaving copied files in the replica's
        // local store that are not part of its active reader.
        SegmentReplicationTarget failedTarget = targetService.startReplication(
            replicaShard,
            primaryShard.getLatestReplicationCheckpoint(),
            new SegmentReplicationTargetService.SegmentReplicationListener() {
                @Override
                public void onReplicationDone(SegmentReplicationState state) {
                    // Release the latch first so the test thread never hangs on an assertion failure here.
                    firstRoundFinished.countDown();
                    Assert.fail("Replication should fail with simulated error");
                }

                @Override
                public void onReplicationFailure(
                    SegmentReplicationState state,
                    ReplicationFailedException e,
                    boolean sendShardFailure
                ) {
                    firstRoundFinished.countDown();
                    assertFalse(sendShardFailure);
                }
            }
        );
        firstRoundFinished.await();
        assertBusy(() -> assertEquals("Target should be closed", 0, failedTarget.refCount()));

        // Corrupt the first data file found, skipping the lock file, "extra*" files and "segment*" files.
        final Path indexDir = replicaShard.shardPath().getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
        String corruptedFile = null;
        for (String candidate : replicaShard.store().directory().listAll()) {
            final boolean isDataFile = !candidate.equals("write.lock")
                && !candidate.startsWith("extra")
                && !candidate.startsWith("segment");
            if (isDataFile) {
                corruptedFile = candidate;
                logger.info("--> Corrupting file {}", corruptedFile);
                final Path victim = indexDir.resolve(candidate);
                try (FileChannel channel = FileChannel.open(victim, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                    // Corrupt at an offset 8 bytes before the end of the file.
                    CorruptionUtils.corruptAt(victim, channel, (int) (channel.size() - 8));
                }
                break;
            }
        }
        Assert.assertNotNull(corruptedFile);

        // Second round: ingest another batch and replicate again.
        group.indexDocs(docsPerBatch);
        primaryShard.refresh("Post corruption");
        replicateSegments(primaryShard, List.of(replicaShard));

        assertDocCount(primaryShard, 2 * docsPerBatch);
        assertDocCount(replicaShard, 2 * docsPerBatch);

        // Primary and replica must agree on every segment file.
        final Store.RecoveryDiff segmentDiff = Store.segmentReplicationDiff(
            primaryShard.getSegmentMetadataMap(),
            replicaShard.getSegmentMetadataMap()
        );
        assertTrue(segmentDiff.missing.isEmpty());
        assertTrue(segmentDiff.different.isEmpty());

        // clean up
        group.removeReplica(replicaShard);
        closeShards(replicaShard);
    }
}

/**
 * Builds a {@link RemoteStoreReplicationSource} for tests that runs a caller-supplied hook at the end of
 * {@code getSegmentFiles}.
 *
 * @param shard                shard passed to the {@code RemoteStoreReplicationSource} constructor
 * @param postGetFilesRunnable hook run after the call to {@code super.getSegmentFiles(...)} returns; it may
 *                             throw to simulate a failure at that point
 * @return the instrumented replication source
 */
private RemoteStoreReplicationSource getRemoteStoreReplicationSource(IndexShard shard, Runnable postGetFilesRunnable) {
    final RemoteStoreReplicationSource instrumentedSource = new RemoteStoreReplicationSource(shard) {
        @Override
        public void getCheckpointMetadata(
            long replicationId,
            ReplicationCheckpoint checkpoint,
            ActionListener<CheckpointInfoResponse> listener
        ) {
            // Plain delegation; no behavior added here.
            super.getCheckpointMetadata(replicationId, checkpoint, listener);
        }

        @Override
        public void getSegmentFiles(
            long replicationId,
            ReplicationCheckpoint checkpoint,
            List<StoreFileMetadata> filesToFetch,
            IndexShard indexShard,
            BiConsumer<String, Long> fileProgressTracker,
            ActionListener<GetSegmentFilesResponse> listener
        ) {
            // The supplied fileProgressTracker is not forwarded; a no-op tracker is passed instead.
            super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (name, bytes) -> {}, listener);
            // Run the hook once the delegate call has returned.
            postGetFilesRunnable.run();
        }

        @Override
        public String getDescription() {
            return "TestRemoteStoreReplicationSource";
        }
    };
    return instrumentedSource;
}

@Override
protected void validateShardIdleWithNoReplicas(IndexShard primary) {
// ensure search idle conditions are met.
Expand Down

0 comments on commit 300496f

Please sign in to comment.