diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java index 5094a7cf29c6a..d046f41ce0590 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java @@ -18,6 +18,7 @@ import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexService; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.remote.RemoteSegmentStats; import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.RetentionLeases; @@ -665,6 +666,43 @@ public void testFailoverRemotePrimaryToDocrepReplicaReseedToRemotePrimary() thro }); } + /* + Performs the same experiment as testRemotePrimaryDocRepReplica. + + This ensures that the primary shard for the index has moved over to remote + enabled node whereas the replica copy is still left behind on the docrep nodes + + At this stage, segrep lag computation shouldn't consider the docrep shard copy while calculating bytes lag + */ + public void testZeroSegrepLagForShardsWithMixedReplicationGroup() throws Exception { + testRemotePrimaryDocRepReplica(); + String remoteNodeName = internalCluster().client() + .admin() + .cluster() + .prepareNodesStats() + .get() + .getNodes() + .stream() + .filter(nodeStats -> nodeStats.getNode().isRemoteStoreNode()) + .findFirst() + .get() + .getNode() + .getName(); + ReplicationStats replicationStats = internalCluster().client() + .admin() + .cluster() + .prepareNodesStats(remoteNodeName) + .get() + .getNodes() + .get(0) + .getIndices() + .getSegments() + .getReplicationStats(); + assertEquals(0, replicationStats.getMaxBytesBehind()); + assertEquals(0, replicationStats.getTotalBytesBehind()); + assertEquals(0, replicationStats.getMaxReplicationLag()); + } + private void assertReplicaAndPrimaryConsistency(String indexName, int firstBatch, int secondBatch) throws Exception { assertBusy(() -> { Map shardStatsMap = internalCluster().client() diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 6697991aef90b..b9cb5e92d0ed1 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -1321,13 +1321,27 @@ public synchronized Set getSegmentReplicationStats if (primaryMode) { return this.checkpoints.entrySet() .stream() - // filter out this shard's allocation id, any shards that are out of sync or unavailable (shard marked in-sync but has not - // been assigned to a node). + /* Filter out: + - This shard's allocation id + - Any shards that are out of sync or unavailable (shard marked in-sync but has not been assigned to a node). + - (For remote store enabled clusters) Any shard that is not yet migrated to remote store enabled nodes during migration + */ .filter( entry -> entry.getKey().equals(this.shardAllocationId) == false && entry.getValue().inSync && replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false && isPrimaryRelocation(entry.getKey()) == false + /*Check if the current primary shard is migrating to remote and + all the other shard copies of the same index still hasn't completely moved over + to the remote enabled nodes. Ensures that: + - Vanilla segrep is not enabled + - Remote Store settings are not enabled (This would be done after all shard copies migrate to remote enabled nodes) + - Index is assigned to remote node (Primary has been seeded) but the corresponding replication group entry has not yet moved to remote + */ + && (indexSettings.isRemoteStoreEnabled() + || indexSettings.isSegRepLocalEnabled() + || (indexSettings.isAssignedOnRemoteNode() + && isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(entry.getKey()).currentNodeId()))) ) .map(entry -> buildShardStats(entry.getKey(), entry.getValue())) .collect(Collectors.toUnmodifiableSet());