diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java
index da37eee88a4e0..bb295c2c6a4bf 100644
--- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java
+++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java
@@ -52,6 +52,7 @@
 import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.shard.ReplicationGroup;
+import org.opensearch.index.shard.ReplicationGroup.ReplicationAwareShardRouting;
 import org.opensearch.index.shard.ShardId;
 import org.opensearch.node.NodeClosedException;
 import org.opensearch.rest.RestStatus;
@@ -226,8 +227,10 @@ private void performOnReplicas(
         final ShardRouting primaryRouting = primary.routingEntry();
 
-        for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
-            if (shard.isSameAllocation(primaryRouting) == false) {
+        for (final ReplicationAwareShardRouting shardRouting : replicationGroup.getReplicationTargets()) {
+            ShardRouting shard = shardRouting.getShardRouting();
+            // TODO - Add condition of underlying action being replicated regardless i.e. shard bulk and publish checkpoint action
+            if (!shard.isSameAllocation(primaryRouting) && shardRouting.isReplicated()) {
                 performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions);
             }
         }
diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
index 701dec069d946..0cb3e7cd5e387 100644
--- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
@@ -622,8 +622,10 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
                  * If this shard copy is tracked then we got here here via a rolling upgrade from an older version that doesn't
                  * create peer recovery retention leases for every shard copy.
                  */
-                assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false
-                    || hasAllPeerRecoveryRetentionLeases == false;
+                assert (checkpoints.get(shardRouting.allocationId().getId()).tracked
+                    && !checkpoints.get(shardRouting.allocationId().getId()).localTranslog)
+                    || !checkpoints.get(shardRouting.allocationId().getId()).tracked
+                    || !hasAllPeerRecoveryRetentionLeases;
                 return false;
             }
             return retentionLease.timestamp() <= renewalTimeMillis
@@ -680,20 +682,31 @@ public static class CheckpointState implements Writeable {
          */
         long globalCheckpoint;
         /**
-         * whether this shard is treated as in-sync and thus contributes to the global checkpoint calculation
+         * whether this shard is treated as in-sync and has localTranslog, it contributes to the global checkpoint calculation.
          */
         boolean inSync;
         /**
-         * whether this shard is tracked in the replication group, i.e., should receive document updates from the primary.
+         * whether this shard is tracked in the replication group and has localTranslog, i.e., should receive document updates
+         * from the primary. Tracked shards with localTranslog would have corresponding retention leases on the primary shard's
+         * {@link ReplicationTracker}.
          */
         boolean tracked;
 
-        public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked) {
+        // TODO - Need to check if CheckpointState or ReplicationTracker needs a remote stored tracking variable. This can be helpful later.
+
+        /**
+         * Whether this shard has a local translog copy. This would be true iff it is primary shard or relocating target of primary shard
+         * in case of remote translog store. Otherwise, this will always be true.
+         */
+        boolean localTranslog;
+
+        public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked, boolean localTranslog) {
             this.localCheckpoint = localCheckpoint;
             this.globalCheckpoint = globalCheckpoint;
             this.inSync = inSync;
             this.tracked = tracked;
+            this.localTranslog = localTranslog;
         }
 
         public CheckpointState(StreamInput in) throws IOException {
@@ -701,6 +714,7 @@ public CheckpointState(StreamInput in) throws IOException {
             this.globalCheckpoint = in.readZLong();
             this.inSync = in.readBoolean();
             this.tracked = in.readBoolean();
+            this.localTranslog = in.readBoolean();
         }
 
         @Override
@@ -709,13 +723,14 @@ public void writeTo(StreamOutput out) throws IOException {
             out.writeZLong(globalCheckpoint);
             out.writeBoolean(inSync);
             out.writeBoolean(tracked);
+            out.writeBoolean(localTranslog);
         }
 
         /**
          * Returns a full copy of this object
         */
         public CheckpointState copy() {
-            return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked);
+            return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked, localTranslog);
         }
 
         public long getLocalCheckpoint() {
@@ -737,6 +752,8 @@ public String toString() {
                 + inSync
                 + ", tracked="
                 + tracked
+                + ", localTranslog="
+                + localTranslog
                 + '}';
         }
 
@@ -750,7 +767,8 @@ public boolean equals(Object o) {
             if (localCheckpoint != that.localCheckpoint) return false;
             if (globalCheckpoint != that.globalCheckpoint) return false;
             if (inSync != that.inSync) return false;
-            return tracked == that.tracked;
+            if (tracked != that.tracked) return false;
+            return localTranslog == that.localTranslog;
         }
 
         @Override
@@ -759,6 +777,7 @@ public int hashCode() {
             result = 31 * result + Long.hashCode(globalCheckpoint);
             result = 31 * result + Boolean.hashCode(inSync);
             result = 31 * result + Boolean.hashCode(tracked);
+            result = 31 * result + Boolean.hashCode(localTranslog);
             return result;
         }
     }
@@ -774,7 +793,7 @@ public synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
         final ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size()); // upper bound on the size
         checkpoints.entrySet()
             .stream()
-            .filter(e -> e.getValue().inSync)
+            .filter(e -> e.getValue().inSync && e.getValue().localTranslog)
             .forEach(e -> globalCheckpoints.put(e.getKey(), e.getValue().globalCheckpoint));
         return globalCheckpoints;
     }
@@ -833,6 +852,9 @@ private boolean invariant() {
         // the current shard is marked as in-sync when the global checkpoint tracker operates in primary mode
         assert !primaryMode || checkpoints.get(shardAllocationId).inSync;
 
+        // the current shard is marked as tracked when the global checkpoint tracker operates in primary mode
+        assert !primaryMode || checkpoints.get(shardAllocationId).tracked;
+
         // the routing table and replication group is set when the global checkpoint tracker operates in primary mode
         assert !primaryMode || (routingTable != null && replicationGroup != null) : "primary mode but routing table is "
             + routingTable
@@ -902,7 +924,8 @@ private boolean invariant() {
         if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
             // all tracked shard copies have a corresponding peer-recovery retention lease
             for (final ShardRouting shardRouting : routingTable.assignedShards()) {
-                if (checkpoints.get(shardRouting.allocationId().getId()).tracked && !indexSettings().isRemoteTranslogStoreEnabled()) {
+                CheckpointState cps = checkpoints.get(shardRouting.allocationId().getId());
+                if (cps.tracked && cps.localTranslog) {
                     assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) : "no retention lease for tracked shard ["
                         + shardRouting + "] in " + retentionLeases;
                     assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(
@@ -926,7 +949,11 @@ private static long inSyncCheckpointStates(
         Function<LongStream, OptionalLong> reducer
     ) {
         final OptionalLong value = reducer.apply(
-            checkpoints.values().stream().filter(cps -> cps.inSync).mapToLong(function).filter(v -> v != SequenceNumbers.UNASSIGNED_SEQ_NO)
+            checkpoints.values()
+                .stream()
+                .filter(cps -> cps.inSync && cps.localTranslog)
+                .mapToLong(function)
+                .filter(v -> v != SequenceNumbers.UNASSIGNED_SEQ_NO)
         );
         return value.isPresent() ? value.getAsLong() : SequenceNumbers.UNASSIGNED_SEQ_NO;
     }
@@ -1022,6 +1049,7 @@ private void updateReplicationGroupAndNotify() {
     }
 
     private ReplicationGroup calculateReplicationGroup() {
+        boolean remoteTranslogEnabled = indexSettings().isRemoteTranslogStoreEnabled();
         long newVersion;
         if (replicationGroup == null) {
             newVersion = 0;
@@ -1032,7 +1060,9 @@ private ReplicationGroup calculateReplicationGroup() {
         return new ReplicationGroup(
             routingTable,
             checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()),
             checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet()),
-            newVersion
+            checkpoints.entrySet().stream().filter(e -> e.getValue().localTranslog).map(Map.Entry::getKey).collect(Collectors.toSet()),
+            newVersion,
+            remoteTranslogEnabled
         );
     }
@@ -1134,7 +1164,8 @@ private void addPeerRecoveryRetentionLeaseForSolePrimary() {
         final ShardRouting primaryShard = routingTable.primaryShard();
         final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
         if (retentionLeases.get(leaseId) == null) {
-            if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
+            if (replicationGroup.getReplicationTargets().size() == 1
+                && replicationGroup.getReplicationTargets().get(0).getShardRouting().equals(primaryShard)) {
                 assert primaryShard.allocationId().getId().equals(shardAllocationId) : routingTable.assignedShards()
                     + " vs "
                     + shardAllocationId;
@@ -1181,6 +1212,7 @@ public synchronized void updateFromClusterManager(
     ) {
         assert invariant();
         if (applyingClusterStateVersion > appliedClusterStateVersion) {
+            boolean remoteTranslogEnabled = indexSettings().isRemoteTranslogStoreEnabled();
             // check that the cluster-manager does not fabricate new in-sync entries out of thin air once we are in primary mode
             assert !primaryMode
                 || inSyncAllocationIds.stream().allMatch(inSyncId -> checkpoints.containsKey(inSyncId) && checkpoints.get(inSyncId).inSync)
@@ -1197,6 +1229,10 @@ public synchronized void updateFromClusterManager(
             boolean removedEntries = checkpoints.keySet()
                 .removeIf(aid -> !inSyncAllocationIds.contains(aid) && !initializingAllocationIds.contains(aid));
 
+            ShardRouting primary = routingTable.primaryShard();
+            String primaryAllocationId = primary.allocationId().getId();
+            String primaryTargetAllocationId = primary.relocating() ? primary.getTargetRelocatingShard().allocationId().getId() : null;
+
             if (primaryMode) {
                 // add new initializingIds that are missing locally. These are fresh shard copies - and not in-sync
                 for (String initializingId : initializingAllocationIds) {
@@ -1207,7 +1243,14 @@ public synchronized void updateFromClusterManager(
                        + " as in-sync but it does not exist locally";
                     final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
                     final long globalCheckpoint = localCheckpoint;
-                    checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync));
+                    boolean localTranslog = true;
+                    if (remoteTranslogEnabled) {
+                        localTranslog = initializingId.equals(primaryAllocationId) || initializingId.equals(primaryTargetAllocationId);
+                    }
+                    checkpoints.put(
+                        initializingId,
+                        new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync, localTranslog)
+                    );
                 }
             }
             if (removedEntries) {
@@ -1217,12 +1260,20 @@ public synchronized void updateFromClusterManager(
                 for (String initializingId : initializingAllocationIds) {
                     final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
                     final long globalCheckpoint = localCheckpoint;
-                    checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false, false));
+                    boolean localTranslog = true;
+                    if (remoteTranslogEnabled) {
+                        localTranslog = initializingId.equals(primaryAllocationId) || initializingId.equals(primaryTargetAllocationId);
+                    }
+                    checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false, false, localTranslog));
                 }
                 for (String inSyncId : inSyncAllocationIds) {
                     final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
                     final long globalCheckpoint = localCheckpoint;
-                    checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true, true));
+                    boolean localTranslog = true;
+                    if (remoteTranslogEnabled) {
+                        localTranslog = inSyncId.equals(primaryAllocationId) || inSyncId.equals(primaryTargetAllocationId);
+                    }
+                    checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true, true, localTranslog));
                 }
             }
             appliedClusterStateVersion = applyingClusterStateVersion;
@@ -1395,7 +1446,7 @@ private static long computeGlobalCheckpoint(
             return fallback;
         }
         for (final CheckpointState cps : localCheckpoints) {
-            if (cps.inSync) {
+            if (cps.inSync && cps.localTranslog) {
                 if (cps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
                     // unassigned in-sync replica
                     return fallback;
diff --git a/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java b/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java
index 6d19e9f500411..dd04ee786d10f 100644
--- a/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java
+++ b/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java
@@ -37,7 +37,9 @@
 import org.opensearch.common.util.set.Sets;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -49,22 +51,32 @@ public class ReplicationGroup {
     private final IndexShardRoutingTable routingTable;
     private final Set<String> inSyncAllocationIds;
     private final Set<String> trackedAllocationIds;
+    private final Set<String> localTranslogAllocationIds;
     private final long version;
+    private final boolean remoteTranslogEnabled;
 
     private final Set<String> unavailableInSyncShards; // derived from the other fields
-    private final List<ShardRouting> replicationTargets; // derived from the other fields
+    private final List<ReplicationAwareShardRouting> replicationTargets; // derived from the other fields
     private final List<ShardRouting> skippedShards; // derived from the other fields
 
     public ReplicationGroup(
         IndexShardRoutingTable routingTable,
         Set<String> inSyncAllocationIds,
         Set<String> trackedAllocationIds,
-        long version
+        Set<String> localTranslogAllocationIds,
+        long version,
+        boolean remoteTranslogEnabled
     ) {
+        if (!remoteTranslogEnabled) {
+            assert trackedAllocationIds.equals(localTranslogAllocationIds)
+                : "In absence of remote translog store, all tracked shards must have local translog store";
+        }
         this.routingTable = routingTable;
         this.inSyncAllocationIds = inSyncAllocationIds;
         this.trackedAllocationIds = trackedAllocationIds;
+        this.localTranslogAllocationIds = localTranslogAllocationIds;
         this.version = version;
+        this.remoteTranslogEnabled = remoteTranslogEnabled;
 
         this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getAllAllocationIds());
         this.replicationTargets = new ArrayList<>();
@@ -75,7 +87,13 @@ public ReplicationGroup(
                 skippedShards.add(shard);
             } else {
                 if (trackedAllocationIds.contains(shard.allocationId().getId())) {
-                    replicationTargets.add(shard);
+                    replicationTargets.add(
+                        new ReplicationAwareShardRouting(
+                            remoteTranslogEnabled,
+                            localTranslogAllocationIds.contains(shard.allocationId().getId()),
+                            shard
+                        )
+                    );
                 } else {
                     assert inSyncAllocationIds.contains(shard.allocationId().getId()) == false
                         : "in-sync shard copy but not tracked: " + shard;
@@ -84,17 +102,32 @@ public ReplicationGroup(
                 if (shard.relocating()) {
                     ShardRouting relocationTarget = shard.getTargetRelocatingShard();
                     if (trackedAllocationIds.contains(relocationTarget.allocationId().getId())) {
-                        replicationTargets.add(relocationTarget);
+                        replicationTargets.add(
+                            new ReplicationAwareShardRouting(
+                                remoteTranslogEnabled,
+                                localTranslogAllocationIds.contains(relocationTarget.allocationId().getId()),
+                                relocationTarget
+                            )
+                        );
                     } else {
                         skippedShards.add(relocationTarget);
                         assert inSyncAllocationIds.contains(relocationTarget.allocationId().getId()) == false
-                            : "in-sync shard copy but not tracked: " + shard;
+                            : "without remote translog, in-sync shard copy but not tracked: " + shard;
                     }
                 }
             }
         }
     }
 
+    public ReplicationGroup(
+        IndexShardRoutingTable routingTable,
+        Set<String> inSyncAllocationIds,
+        Set<String> trackedAllocationIds,
+        long version
+    ) {
+        this(routingTable, inSyncAllocationIds, trackedAllocationIds, Collections.emptySet(), version, false);
+    }
+
     public long getVersion() {
         return version;
     }
@@ -119,9 +152,10 @@ public Set<String> getUnavailableInSyncShards() {
     }
 
     /**
-     * Returns the subset of shards in the routing table that should be replicated to. Includes relocation targets.
+     * Returns the subset of shards in the routing table that should be replicated to, based on the remoteTranslogEnabled and
+     * replicated flag. Includes relocation targets.
     */
-    public List<ShardRouting> getReplicationTargets() {
+    public List<ReplicationAwareShardRouting> getReplicationTargets() {
         return replicationTargets;
     }
 
@@ -142,7 +176,9 @@ public boolean equals(Object o) {
 
         if (!routingTable.equals(that.routingTable)) return false;
         if (!inSyncAllocationIds.equals(that.inSyncAllocationIds)) return false;
-        return trackedAllocationIds.equals(that.trackedAllocationIds);
+        if (!trackedAllocationIds.equals(that.trackedAllocationIds)) return false;
+        if (!localTranslogAllocationIds.equals(that.localTranslogAllocationIds)) return false;
+        return remoteTranslogEnabled == that.remoteTranslogEnabled;
     }
 
     @Override
@@ -150,6 +186,8 @@ public int hashCode() {
         int result = routingTable.hashCode();
         result = 31 * result + inSyncAllocationIds.hashCode();
         result = 31 * result + trackedAllocationIds.hashCode();
+        result = 31 * result + localTranslogAllocationIds.hashCode();
+        result = 31 * result + Boolean.hashCode(remoteTranslogEnabled);
         return result;
     }
 
@@ -162,7 +200,50 @@ public String toString() {
             + inSyncAllocationIds
             + ", trackedAllocationIds="
             + trackedAllocationIds
+            + ", localTranslogAllocationIds="
+            + localTranslogAllocationIds
+            + ", remoteTranslogEnabled="
+            + remoteTranslogEnabled
             + '}';
     }
 
+    /**
+     * Replication aware ShardRouting used for fanning out replication requests smartly.
+     */
+    public static final class ReplicationAwareShardRouting {
+
+        private final boolean remoteTranslogEnabled;
+
+        private final boolean replicated;
+
+        private final ShardRouting shardRouting;
+
+        public boolean isRemoteTranslogEnabled() {
+            return remoteTranslogEnabled;
+        }
+
+        public boolean isReplicated() {
+            return replicated;
+        }
+
+        public ShardRouting getShardRouting() {
+            return shardRouting;
+        }
+
+        public ReplicationAwareShardRouting(
+            final boolean remoteTranslogEnabled,
+            final boolean replicated,
+            final ShardRouting shardRouting
+        ) {
+            // Either remoteTranslogEnabled or replicated is true. It is not possible that a shard is neither having
+            // remoteTranslogEnabled as true nor replicated as true.
+            assert remoteTranslogEnabled || replicated;
+            // ShardRouting has to be non-null always.
+            assert Objects.nonNull(shardRouting);
+            this.remoteTranslogEnabled = remoteTranslogEnabled;
+            this.replicated = replicated;
+            this.shardRouting = shardRouting;
+        }
+    }
+
 }