Skip to content

Commit

Permalink
explicit commit to ensure no-ops are persisted till global checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasvb90 committed Oct 10, 2024
1 parent acdabb8 commit c23774f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,

postSendFileComplete(sendFileStep, lastCommit, releaseStore, delayedStaleCommitDeleteOps);
long startingSeqNo = Long.parseLong(lastCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L;
logger.info("Docs in commit " + (startingSeqNo-1));
logger.info("Docs in commit " + (startingSeqNo));
assert Transports.assertNotTransportThread(this + "[phase1]");
phase1(lastCommit.get(), startingSeqNo, () -> 0, sendFileStep, true);

Expand Down Expand Up @@ -302,6 +302,7 @@ protected void updateGlobalCheckpointForShard(long globalCheckpoint) {
/**
 * Hands the source shard off to its child shards and then forces a flush on every child shard.
 *
 * Calls {@code shard.relocated} with the child shard allocation ids, using
 * {@code recoveryTarget::handoffPrimaryContext} as the primary-context handoff callback, and
 * afterwards triggers {@code recoveryTarget.flushOnAllChildShards()}.
 *
 * @param forceSegRepRunnable forwarded unchanged to {@code shard.relocated};
 *                            NOTE(review): presumably forces a segment replication round - confirm
 * @throws InterruptedException declared here; presumably raised by {@code shard.relocated} - confirm
 */
@Override
protected void relocateShard(Runnable forceSegRepRunnable) throws InterruptedException {
shard.relocated(childShardsAllocationIds, recoveryTarget::handoffPrimaryContext, forceSegRepRunnable);
// Flush only after the handoff call returns: operations replayed as no-ops on a child shard just
// advance its checkpoint, so an explicit commit is needed to persist them up to the global checkpoint.
recoveryTarget.flushOnAllChildShards();
}

public void cleanupChildShardDirectories() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IOUtils;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.CheckedRunnable;
Expand Down Expand Up @@ -172,6 +173,24 @@ public void indexTranslogOperationsOnShards(List<Translog.Operation> operations,
});
}

/**
 * Forces a flush, and therefore a new commit, on every child shard.
 *
 * <p>Why this is needed: when operations from a translog snapshot are replayed, each operation is
 * really applied on only one child shard. Every other child shard records it as a NoOp that merely
 * advances its local checkpoint. Because the child shards form a brand new replication group, that
 * local checkpoint is also the global checkpoint.
 *
 * <p>If a child shard is relocated before the next flush happens, the peer recovery replays
 * operations from a snapshot taken from Lucene, and that snapshot does not contain those no-ops.
 * The recovery then fails while waiting for the target shard to catch up to the global checkpoint.
 * Creating a commit on all child shards makes the operations up to the global checkpoint available.
 *
 * <p>The same reasoning holds until the child shards have been fully handed off as independent
 * shards, since no-op operations can keep arriving until that point.
 */
public void flushOnAllChildShards() {
    recoveryContexts.forEach(
        childContext -> childContext.getIndexShard().flush(new FlushRequest().waitIfOngoing(true).force(true))
    );
}

@Override
public void close() throws IOException {
recoveryTargets.values().forEach(recoveryTarget ->
Expand Down

0 comments on commit c23774f

Please sign in to comment.