diff --git a/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoverySourceHandler.java index b990e7a64613f..6ec47c08d9066 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoverySourceHandler.java @@ -160,7 +160,7 @@ protected void innerRecoveryToTarget(ActionListener listener, postSendFileComplete(sendFileStep, lastCommit, releaseStore, delayedStaleCommitDeleteOps); long startingSeqNo = Long.parseLong(lastCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L; - logger.info("Docs in commit " + (startingSeqNo-1)); + logger.info("Docs in commit " + (startingSeqNo)); assert Transports.assertNotTransportThread(this + "[phase1]"); phase1(lastCommit.get(), startingSeqNo, () -> 0, sendFileStep, true); @@ -302,6 +302,7 @@ protected void updateGlobalCheckpointForShard(long globalCheckpoint) { @Override protected void relocateShard(Runnable forceSegRepRunnable) throws InterruptedException { shard.relocated(childShardsAllocationIds, recoveryTarget::handoffPrimaryContext, forceSegRepRunnable); + recoveryTarget.flushOnAllChildShards(); } public void cleanupChildShardDirectories() throws IOException { diff --git a/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoveryTargetHandler.java index 7fecf84964723..5c9a66ebf503d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoveryTargetHandler.java @@ -18,6 +18,7 @@ import 
org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IOUtils; import org.opensearch.action.LatchedActionListener; +import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.support.GroupedActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.CheckedRunnable; @@ -172,6 +173,24 @@ public void indexTranslogOperationsOnShards(List operations, }); } + /** + * This is required because in translog replay of operations from a translog snapshot, each operation is actually + * processed only on one of the child shards and the other child shards treat it as a NoOp where only the local + * checkpoint is advanced. In this case the local checkpoint is also the global checkpoint since we are creating a + * new shard and hence a new replication group. In a scenario where one or more of the child shards are relocated + * before the next flush gets triggered, translog replay of operations from a snapshot taken from Lucene in these + * peer recoveries will not have the no-ops and therefore, peer recovery will fail while waiting for the target + * shard to catch up to the global checkpoint. So, to make sure that operations up to the global checkpoint are + * available, we will need to trigger a flush to create a new commit on all child shards. + * The same principle applies until the child shards are completely handed off to serve as independent shards, + * because no-op operations can continue to arrive until the handoff is done. + */ + public void flushOnAllChildShards() { + recoveryContexts.forEach(recoveryTarget -> { + recoveryTarget.getIndexShard().flush(new FlushRequest().waitIfOngoing(true).force(true)); + }); + } + @Override public void close() throws IOException { recoveryTargets.values().forEach(recoveryTarget ->