Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Fix bug where retries within RemoteStoreRefreshListener cause infos/checkpoint mismatch #10760

Merged
merged 1 commit into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 40 additions & 27 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1613,8 +1613,11 @@
}

/**
* Compute and return the latest ReplicationCheckpoint for a particular shard.
* @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices
* Return the most recently computed ReplicationCheckpoint for a particular shard.
* The checkpoint is updated inside a refresh listener and may lag behind the SegmentInfos on the reader.
* To guarantee the checkpoint is up to date with the latest on-reader infos, use `getLatestSegmentInfosAndCheckpoint` instead.
*
* @return {@link ReplicationCheckpoint} - The most recently computed ReplicationCheckpoint.
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return replicationTracker.getLatestReplicationCheckpoint();
Expand All @@ -1633,34 +1636,12 @@
public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
assert indexSettings.isSegRepEnabled();

Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>(
new GatedCloseable<>(null, () -> {}),
getLatestReplicationCheckpoint()
);

if (getEngineOrNull() == null) {
return nullSegmentInfosEmptyCheckpoint;
}
// do not close the snapshot - caller will close it.
GatedCloseable<SegmentInfos> snapshot = null;
try {
snapshot = getSegmentInfosSnapshot();
if (snapshot.get() != null) {
SegmentInfos segmentInfos = snapshot.get();
final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
return new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
segmentInfos.getVersion(),
metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName(),
metadataMap
)
);
}
final SegmentInfos segmentInfos = snapshot.get();
return new Tuple<>(snapshot, computeReplicationCheckpoint(segmentInfos));
} catch (IOException | AlreadyClosedException e) {
logger.error("Error Fetching SegmentInfos and latest checkpoint", e);
if (snapshot != null) {
Expand All @@ -1671,7 +1652,39 @@
}
}
}
return nullSegmentInfosEmptyCheckpoint;
return new Tuple<>(new GatedCloseable<>(null, () -> {}), getLatestReplicationCheckpoint());

Check warning on line 1655 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L1655

Added line #L1655 was not covered by tests
}

/**
 * Build the {@link ReplicationCheckpoint} that corresponds to the supplied SegmentInfos.
 * Reading the segment metadata map from the store has an IO cost, so the most recently
 * stored checkpoint is handed back untouched whenever its SegmentInfos version and
 * generation both match the supplied infos.
 *
 * @param segmentInfos {@link SegmentInfos} to derive the checkpoint from; may be null.
 * @return {@link ReplicationCheckpoint} the empty checkpoint for this shard when the infos are null,
 *         the stored checkpoint when version and generation match, otherwise a newly built checkpoint.
 * @throws IOException When there is an error computing segment metadata from the store.
 */
ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) throws IOException {
    if (segmentInfos == null) {
        return ReplicationCheckpoint.empty(shardId);
    }

    final ReplicationCheckpoint storedCheckpoint = getLatestReplicationCheckpoint();
    // Version is compared first and the generation only when the versions agree.
    final boolean outOfDate = storedCheckpoint.getSegmentInfosVersion() != segmentInfos.getVersion()
        || storedCheckpoint.getSegmentsGen() != segmentInfos.getGeneration();
    if (outOfDate == false) {
        return storedCheckpoint;
    }

    // The stored checkpoint does not describe these infos: rebuild it from store metadata.
    final Map<String, StoreFileMetadata> fileMetadata = store.getSegmentMetadataMap(segmentInfos);

    final long primaryTerm = getOperationPrimaryTerm();
    final long generation = segmentInfos.getGeneration();
    final long version = segmentInfos.getVersion();
    long totalLength = 0L;
    for (StoreFileMetadata metadata : fileMetadata.values()) {
        totalLength += metadata.length();
    }
    final String codecName = getEngine().config().getCodec().getName();

    final ReplicationCheckpoint recomputed = new ReplicationCheckpoint(
        this.shardId,
        primaryTerm,
        generation,
        version,
        totalLength,
        codecName,
        fileMetadata
    );
    logger.trace("Recomputed ReplicationCheckpoint for shard {}", recomputed);
    return recomputed;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ private boolean syncSegments() {
// in the remote store.
return indexShard.state() != IndexShardState.STARTED || !(indexShard.getEngine() instanceof InternalEngine);
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
beforeSegmentsSync();
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
Expand All @@ -199,10 +198,7 @@ private boolean syncSegments() {

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
assert segmentInfos.getGeneration() == checkpoint.getSegmentsGen() : "SegmentInfos generation: "
+ segmentInfos.getGeneration()
+ " does not match metadata generation: "
+ checkpoint.getSegmentsGen();
final ReplicationCheckpoint checkpoint = indexShard.computeReplicationCheckpoint(segmentInfos);
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,8 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
if (counter.incrementAndGet() <= succeedOnAttempt) {
throw new RuntimeException("Inducing failure in upload");
}
return indexShard.getLatestSegmentInfosAndCheckpoint();
})).when(shard).getLatestSegmentInfosAndCheckpoint();
return indexShard.getLatestReplicationCheckpoint();
})).when(shard).computeReplicationCheckpoint(any());

doAnswer(invocation -> {
if (Objects.nonNull(successLatch)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,33 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception {
}
}

/**
 * After indexing, refreshing and replicating, recomputing the checkpoint from the primary's
 * current SegmentInfos must equal the checkpoint the primary already reports. This is checked
 * with infos taken from getSegmentInfosSnapshot and from getLatestSegmentInfosAndCheckpoint.
 */
public void testReuseReplicationCheckpointWhenLatestInfosIsUnChanged() throws Exception {
    try (ReplicationGroup group = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) {
        final IndexShard primary = group.getPrimary();
        group.startAll();
        group.indexDocs(10);
        group.refresh("test");
        replicateSegments(primary, group.getReplicas());
        group.assertAllEqual(10);

        final ReplicationCheckpoint expected = primary.getLatestReplicationCheckpoint();

        // Infos taken directly from a segment infos snapshot.
        try (GatedCloseable<SegmentInfos> infosRef = primary.getSegmentInfosSnapshot()) {
            final ReplicationCheckpoint fromSnapshot = primary.computeReplicationCheckpoint(infosRef.get());
            assertEquals(expected, fromSnapshot);
        }

        // Infos taken from the combined infos-and-checkpoint accessor.
        final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> infosAndCheckpoint = primary.getLatestSegmentInfosAndCheckpoint();
        try (GatedCloseable<SegmentInfos> infosRef = infosAndCheckpoint.v1()) {
            final ReplicationCheckpoint fromTuple = primary.computeReplicationCheckpoint(infosRef.get());
            assertEquals(expected, fromTuple);
        }
    }
}

/**
 * Passing null SegmentInfos to computeReplicationCheckpoint must yield the empty
 * checkpoint for the shard.
 */
public void testComputeReplicationCheckpointNullInfosReturnsEmptyCheckpoint() throws Exception {
    try (ReplicationGroup group = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) {
        final IndexShard primary = group.getPrimary();
        final ReplicationCheckpoint emptyCheckpoint = ReplicationCheckpoint.empty(primary.shardId);
        final ReplicationCheckpoint computed = primary.computeReplicationCheckpoint(null);
        assertEquals(emptyCheckpoint, computed);
    }
}

private SnapshotShardsService getSnapshotShardsService(IndexShard replicaShard) {
final TransportService transportService = mock(TransportService.class);
when(transportService.getThreadPool()).thenReturn(threadPool);
Expand Down
Loading