diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index cd6dbe8af90d9..cc71ef816e525 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -232,6 +232,7 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
         return missingFiles;
     }
 
+    // pkg private for tests
     private boolean validateLocalChecksum(StoreFileMetadata file) {
         try (IndexInput indexInput = indexShard.store().directory().openInput(file.name(), IOContext.DEFAULT)) {
             String checksum = Store.digestToString(CodecUtil.retrieveChecksum(indexInput));
@@ -243,7 +244,15 @@ private boolean validateLocalChecksum(StoreFileMetadata file) {
                 return false;
             }
         } catch (IOException e) {
-            throw new UncheckedIOException("Error reading " + file, e);
+            logger.warn("Error reading " + file, e);
+            // Delete the file on exceptions so that it can be re-downloaded. This is safe to do as the file is local only
+            // and not referenced by the reader.
+            try {
+                indexShard.store().directory().deleteFile(file.name());
+            } catch (IOException ex) {
+                throw new UncheckedIOException("Error reading " + file, e);
+            }
+            return false;
         }
     }
diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
index 703a7d457d5b6..2ce0bdc607189 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
@@ -31,18 +31,20 @@
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.ReplicationFailedException;
 import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.test.CorruptionUtils;
 import org.hamcrest.MatcherAssert;
 import org.junit.Assert;
 
 import java.io.IOException;
+import java.nio.channels.FileChannel;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
@@ -371,37 +373,9 @@ public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
 
         final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
         final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
-        Runnable[] runAfterGetFiles = { () -> { throw new RuntimeException("Simulated"); }, () -> {} };
-        AtomicInteger index = new AtomicInteger(0);
-        RemoteStoreReplicationSource testRSReplicationSource = new RemoteStoreReplicationSource(replica) {
-            @Override
-            public void getCheckpointMetadata(
-                long replicationId,
-                ReplicationCheckpoint checkpoint,
-                ActionListener<CheckpointInfoResponse> listener
-            ) {
-                super.getCheckpointMetadata(replicationId, checkpoint, listener);
-            }
-
-            @Override
-            public void getSegmentFiles(
-                long replicationId,
-                ReplicationCheckpoint checkpoint,
-                List<StoreFileMetadata> filesToFetch,
-                IndexShard indexShard,
-                BiConsumer<String, Long> fileProgressTracker,
-                ActionListener<GetSegmentFilesResponse> listener
-            ) {
-                super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener);
-                runAfterGetFiles[index.getAndIncrement()].run();
-            }
-
-            @Override
-            public String getDescription() {
-                return "TestRemoteStoreReplicationSource";
-            }
-        };
-        when(sourceFactory.get(any())).thenReturn(testRSReplicationSource);
+        when(sourceFactory.get(any())).thenReturn(
+            getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
+        );
 
         CountDownLatch latch = new CountDownLatch(1);
         // Start first round of segment replication. This should fail with simulated error but with replica having
@@ -412,6 +386,7 @@ public String getDescription() {
             new SegmentReplicationTargetService.SegmentReplicationListener() {
                 @Override
                 public void onReplicationDone(SegmentReplicationState state) {
+                    latch.countDown();
                     Assert.fail("Replication should fail with simulated error");
                 }
 
@@ -421,9 +396,9 @@ public void onReplicationFailure(
                     ReplicationFailedException e,
                     boolean sendShardFailure
                 ) {
+                    latch.countDown();
                     assertFalse(sendShardFailure);
                     logger.error("Replication error", e);
-                    latch.countDown();
                 }
             }
         );
@@ -439,7 +414,8 @@ public void onReplicationFailure(
         assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
         assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);
 
-        // Start next round of segment replication
+        // Start next round of segment replication; this time the source does not throw, resulting in a commit on the replica.
+        when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
         CountDownLatch waitForSecondRound = new CountDownLatch(1);
         final SegmentReplicationTarget newTarget = targetService.startReplication(
             replica,
@@ -456,9 +432,9 @@ public void onReplicationFailure(
                     ReplicationFailedException e,
                     boolean sendShardFailure
                 ) {
+                    waitForSecondRound.countDown();
                     logger.error("Replication error", e);
                     Assert.fail("Replication should not fail");
-                    waitForSecondRound.countDown();
                 }
             }
         );
@@ -471,6 +447,119 @@ public void onReplicationFailure(
         }
     }
 
+    /**
+     * This test validates that unreadable (corrupt or partially written) files on disk are deleted rather than failing the
+     * replication event. It mimics local files that are not referenced by the reader by throwing an exception after file copy,
+     * which blocks the reader update. It then corrupts one segment file and verifies, via doc counts, that the file is deleted
+     * and re-downloaded in the next round of segment replication.
+     */
+    public void testNoFailuresOnFileReads() throws Exception {
+        try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
+            shards.startAll();
+            IndexShard primary = shards.getPrimary();
+            final IndexShard replica = shards.getReplicas().get(0);
+
+            final int docCount = 10;
+            shards.indexDocs(docCount);
+            primary.refresh("Test");
+
+            final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
+            final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
+            when(sourceFactory.get(any())).thenReturn(
+                getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
+            );
+            CountDownLatch waitOnReplicationCompletion = new CountDownLatch(1);
+
+            // Start first round of segment replication. This should fail with simulated error but with replica having
+            // files in its local store but not in active reader.
+            SegmentReplicationTarget segmentReplicationTarget = targetService.startReplication(
+                replica,
+                primary.getLatestReplicationCheckpoint(),
+                new SegmentReplicationTargetService.SegmentReplicationListener() {
+                    @Override
+                    public void onReplicationDone(SegmentReplicationState state) {
+                        waitOnReplicationCompletion.countDown();
+                        Assert.fail("Replication should fail with simulated error");
+                    }
+
+                    @Override
+                    public void onReplicationFailure(
+                        SegmentReplicationState state,
+                        ReplicationFailedException e,
+                        boolean sendShardFailure
+                    ) {
+                        waitOnReplicationCompletion.countDown();
+                        assertFalse(sendShardFailure);
+                    }
+                }
+            );
+            waitOnReplicationCompletion.await();
+            assertBusy(() -> { assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); });
+            String fileToCorrupt = null;
+            // Corrupt one data file
+            Path shardPath = replica.shardPath().getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
+            for (String file : replica.store().directory().listAll()) {
+                if (file.equals("write.lock") || file.startsWith("extra") || file.startsWith("segment")) {
+                    continue;
+                }
+                fileToCorrupt = file;
+                logger.info("--> Corrupting file {}", fileToCorrupt);
+                try (FileChannel raf = FileChannel.open(shardPath.resolve(file), StandardOpenOption.READ, StandardOpenOption.WRITE)) {
+                    CorruptionUtils.corruptAt(shardPath.resolve(file), raf, (int) (raf.size() - 8));
+                }
+                break;
+            }
+            Assert.assertNotNull(fileToCorrupt);
+
+            // Ingest more data and start next round of segment replication
+            shards.indexDocs(docCount);
+            primary.refresh("Post corruption");
+            replicateSegments(primary, List.of(replica));
+
+            assertDocCount(primary, 2 * docCount);
+            assertDocCount(replica, 2 * docCount);
+
+            final Store.RecoveryDiff diff = Store.segmentReplicationDiff(primary.getSegmentMetadataMap(), replica.getSegmentMetadataMap());
+            assertTrue(diff.missing.isEmpty());
+            assertTrue(diff.different.isEmpty());
+
+            // clean up
+            shards.removeReplica(replica);
+            closeShards(replica);
+        }
+    }
+
+    private RemoteStoreReplicationSource getRemoteStoreReplicationSource(IndexShard shard, Runnable postGetFilesRunnable) {
+        return new RemoteStoreReplicationSource(shard) {
+            @Override
+            public void getCheckpointMetadata(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                ActionListener<CheckpointInfoResponse> listener
+            ) {
+                super.getCheckpointMetadata(replicationId, checkpoint, listener);
+            }
+
+            @Override
+            public void getSegmentFiles(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                List<StoreFileMetadata> filesToFetch,
+                IndexShard indexShard,
+                BiConsumer<String, Long> fileProgressTracker,
+                ActionListener<GetSegmentFilesResponse> listener
+            ) {
+                super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener);
+                postGetFilesRunnable.run();
+            }
+
+            @Override
+            public String getDescription() {
+                return "TestRemoteStoreReplicationSource";
+            }
+        };
+    }
+
     @Override
     protected void validateShardIdleWithNoReplicas(IndexShard primary) {
         // ensure search idle conditions are met.
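The gist of the production change: a local-only segment file that cannot be read during checksum validation is now logged, deleted, and treated as missing, so the next replication round simply re-downloads it instead of aborting with an UncheckedIOException. Below is a minimal, self-contained sketch of that idea against plain Lucene APIs; the LocalCopyValidator class, the validateOrDelete name, and the long-valued expectedChecksum parameter are illustrative assumptions and not code from this PR (the patched method compares the string form produced by Store.digestToString against StoreFileMetadata#checksum).

```java
import java.io.IOException;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

// Illustrative sketch only, not the method from the PR.
final class LocalCopyValidator {

    /**
     * Returns true when the local copy of fileName is readable and its codec-footer checksum matches
     * expectedChecksum. On a read failure the local copy is deleted so the caller can simply re-fetch
     * the file on the next replication round instead of failing the whole replication event.
     */
    static boolean validateOrDelete(Directory directory, String fileName, long expectedChecksum) throws IOException {
        try (IndexInput input = directory.openInput(fileName, IOContext.DEFAULT)) {
            // CodecUtil.retrieveChecksum reads the codec footer and returns the stored checksum.
            return CodecUtil.retrieveChecksum(input) == expectedChecksum;
        } catch (IOException e) {
            // Deleting is safe only because the file is local-only and not referenced by any active reader.
            directory.deleteFile(fileName);
            return false;
        }
    }
}
```

The new testNoFailuresOnFileReads test exercises exactly this path: it corrupts a file that was copied to the replica's store but never opened by a reader, then asserts that the following replication round still converges to the primary's doc count and segment metadata.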