[Backport 2.x] [Segment Replication] Handle exceptions on local file read during replication #10936

Merged
merged 1 commit on Oct 26, 2023
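
Summary of the change, with a minimal illustrative sketch. Assumptions: the class name LocalChecksumValidator and its members are invented for illustration only; the actual change is made to the validateLocalChecksum method shown in the first hunk below, which operates on the shard's Store and StoreFileMetadata rather than a bare Lucene Directory. The pattern: when reading a local segment file for checksum validation fails with an IOException, delete the file and report it as invalid so it is re-downloaded, instead of letting the exception fail the whole replication event.

    import java.io.IOException;
    import java.io.UncheckedIOException;

    import org.apache.lucene.codecs.CodecUtil;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.IOContext;
    import org.apache.lucene.store.IndexInput;

    // Illustrative sketch only; not the class added by this PR.
    final class LocalChecksumValidator {

        private final Directory directory;

        LocalChecksumValidator(Directory directory) {
            this.directory = directory;
        }

        /**
         * Returns true if the file's footer checksum matches the expected value.
         * On any read error the file is deleted (it is local-only and not yet
         * referenced by an open reader) and false is returned, so the caller
         * re-downloads it in the next replication round.
         */
        boolean validate(String fileName, String expectedChecksum) {
            try (IndexInput in = directory.openInput(fileName, IOContext.DEFAULT)) {
                // Read the checksum Lucene writes into every segment file's footer;
                // the real code encodes it via Store.digestToString for the same purpose.
                String checksum = Long.toString(CodecUtil.retrieveChecksum(in), Character.MAX_RADIX);
                return checksum.equals(expectedChecksum);
            } catch (IOException e) {
                // Dropping the file is safe: no reader references it, and the next
                // replication round will fetch a fresh copy from the source.
                try {
                    directory.deleteFile(fileName);
                } catch (IOException ex) {
                    throw new UncheckedIOException("Error reading " + fileName, e);
                }
                return false;
            }
        }
    }

The design choice is that swallowing the read failure is only safe because the file is not yet referenced by the replica's active reader; deleting it simply forces the next replication round to copy it again, which is exactly what the new test below exercises by corrupting a segment file between two rounds.
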
@@ -232,6 +232,7 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
return missingFiles;
}

// pkg private for tests
private boolean validateLocalChecksum(StoreFileMetadata file) {
try (IndexInput indexInput = indexShard.store().directory().openInput(file.name(), IOContext.DEFAULT)) {
String checksum = Store.digestToString(CodecUtil.retrieveChecksum(indexInput));
@@ -243,7 +244,15 @@ private boolean validateLocalChecksum(StoreFileMetadata file) {
return false;
}
} catch (IOException e) {
throw new UncheckedIOException("Error reading " + file, e);
logger.warn("Error reading " + file, e);
// Delete file on exceptions so that it can be re-downloaded. This is safe to do as this file is local only
// and not referenced by reader.
try {
indexShard.store().directory().deleteFile(file.name());
} catch (IOException ex) {
throw new UncheckedIOException("Error reading " + file, e);
}
return false;
}
}

@@ -31,18 +31,20 @@
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.CorruptionUtils;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

@@ -371,37 +373,9 @@ public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
Runnable[] runAfterGetFiles = { () -> { throw new RuntimeException("Simulated"); }, () -> {} };
AtomicInteger index = new AtomicInteger(0);
RemoteStoreReplicationSource testRSReplicationSource = new RemoteStoreReplicationSource(replica) {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
super.getCheckpointMetadata(replicationId, checkpoint, listener);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
BiConsumer<String, Long> fileProgressTracker,
ActionListener<GetSegmentFilesResponse> listener
) {
super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener);
runAfterGetFiles[index.getAndIncrement()].run();
}

@Override
public String getDescription() {
return "TestRemoteStoreReplicationSource";
}
};
when(sourceFactory.get(any())).thenReturn(testRSReplicationSource);
when(sourceFactory.get(any())).thenReturn(
getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
);
CountDownLatch latch = new CountDownLatch(1);

// Start first round of segment replication. This should fail with simulated error but with replica having
@@ -412,6 +386,7 @@ public String getDescription() {
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
latch.countDown();
Assert.fail("Replication should fail with simulated error");
}

@@ -421,9 +396,9 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
latch.countDown();
assertFalse(sendShardFailure);
logger.error("Replication error", e);
latch.countDown();
}
}
);
@@ -439,7 +414,8 @@ public void onReplicationFailure(
assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);

// Start next round of segment replication
// Start next round of segment replication, this time without throwing an exception, resulting in a commit on the replica
when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
CountDownLatch waitForSecondRound = new CountDownLatch(1);
final SegmentReplicationTarget newTarget = targetService.startReplication(
replica,
@@ -456,9 +432,9 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
waitForSecondRound.countDown();
logger.error("Replication error", e);
Assert.fail("Replication should not fail");
waitForSecondRound.countDown();
}
}
);
@@ -471,6 +447,119 @@ public void onReplicationFailure(
}
}

/**
 * This test validates that local on-disk files that are non-readable (corrupt or partially written) are deleted
 * instead of failing the replication event. It mimics local-only files (not referenced by the reader) by throwing
 * an exception after the file copy, which blocks the reader update. It then corrupts one segment file and verifies,
 * by checking the doc count, that the corrupt file is deleted and re-fetched in the next round of segment replication.
 */
public void testNoFailuresOnFileReads() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

final int docCount = 10;
shards.indexDocs(docCount);
primary.refresh("Test");

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
when(sourceFactory.get(any())).thenReturn(
getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
);
CountDownLatch waitOnReplicationCompletion = new CountDownLatch(1);

// Start first round of segment replication. This should fail with simulated error but with replica having
// files in its local store but not in active reader.
SegmentReplicationTarget segmentReplicationTarget = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
waitOnReplicationCompletion.countDown();
Assert.fail("Replication should fail with simulated error");
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
waitOnReplicationCompletion.countDown();
assertFalse(sendShardFailure);
}
}
);
waitOnReplicationCompletion.await();
assertBusy(() -> { assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); });
String fileToCorrupt = null;
// Corrupt one data file
Path shardPath = replica.shardPath().getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
for (String file : replica.store().directory().listAll()) {
if (file.equals("write.lock") || file.startsWith("extra") || file.startsWith("segment")) {
continue;
}
fileToCorrupt = file;
logger.info("--> Corrupting file {}", fileToCorrupt);
try (FileChannel raf = FileChannel.open(shardPath.resolve(file), StandardOpenOption.READ, StandardOpenOption.WRITE)) {
CorruptionUtils.corruptAt(shardPath.resolve(file), raf, (int) (raf.size() - 8));
}
break;
}
Assert.assertNotNull(fileToCorrupt);

// Ingest more data and start next round of segment replication
shards.indexDocs(docCount);
primary.refresh("Post corruption");
replicateSegments(primary, List.of(replica));

assertDocCount(primary, 2 * docCount);
assertDocCount(replica, 2 * docCount);

final Store.RecoveryDiff diff = Store.segmentReplicationDiff(primary.getSegmentMetadataMap(), replica.getSegmentMetadataMap());
assertTrue(diff.missing.isEmpty());
assertTrue(diff.different.isEmpty());

// clean up
shards.removeReplica(replica);
closeShards(replica);
}
}

private RemoteStoreReplicationSource getRemoteStoreReplicationSource(IndexShard shard, Runnable postGetFilesRunnable) {
return new RemoteStoreReplicationSource(shard) {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
super.getCheckpointMetadata(replicationId, checkpoint, listener);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
BiConsumer<String, Long> fileProgressTracker,
ActionListener<GetSegmentFilesResponse> listener
) {
super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener);
postGetFilesRunnable.run();
}

@Override
public String getDescription() {
return "TestRemoteStoreReplicationSource";
}
};
}

@Override
protected void validateShardIdleWithNoReplicas(IndexShard primary) {
// ensure search idle conditions are met.