Skip to content

Commit

Permalink
CheckpointState enhanced to support no-op replication
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Nov 17, 2022
1 parent 059b614 commit 4913941
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.ReplicationGroup;
import org.opensearch.index.shard.ReplicationGroup.ReplicationAwareShardRouting;
import org.opensearch.index.shard.ShardId;
import org.opensearch.node.NodeClosedException;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -226,8 +227,10 @@ private void performOnReplicas(

final ShardRouting primaryRouting = primary.routingEntry();

for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
if (shard.isSameAllocation(primaryRouting) == false) {
for (final ReplicationAwareShardRouting shardRouting : replicationGroup.getReplicationTargets()) {
ShardRouting shard = shardRouting.getShardRouting();
// TODO - Add condition of underlying action being replicated regardless i.e. shard bulk and publish checkpoint action
if (!shard.isSameAllocation(primaryRouting) && shardRouting.isReplicated()) {
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,10 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
* If this shard copy is tracked then we got here via a rolling upgrade from an older version that doesn't
* create peer recovery retention leases for every shard copy.
*/
assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false
|| hasAllPeerRecoveryRetentionLeases == false;
assert (checkpoints.get(shardRouting.allocationId().getId()).tracked
&& !checkpoints.get(shardRouting.allocationId().getId()).localTranslog)
|| !checkpoints.get(shardRouting.allocationId().getId()).tracked
|| !hasAllPeerRecoveryRetentionLeases;
return false;
}
return retentionLease.timestamp() <= renewalTimeMillis
Expand Down Expand Up @@ -680,27 +682,39 @@ public static class CheckpointState implements Writeable {
*/
long globalCheckpoint;
/**
* whether this shard is treated as in-sync and thus contributes to the global checkpoint calculation
* whether this shard is treated as in-sync. An in-sync shard contributes to the global checkpoint calculation only if it also has a localTranslog.
*/
boolean inSync;

/**
* whether this shard is tracked in the replication group, i.e., should receive document updates from the primary.
* whether this shard is tracked in the replication group. A tracked shard that also has localTranslog should receive document
* updates from the primary, and has a corresponding retention lease on the primary shard's
* {@link ReplicationTracker}.
*/
boolean tracked;

public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked) {
// TODO - Need to check if CheckpointState or ReplicationTracker needs a remote stored tracking variable. This can be helpful later.

/**
* Whether this shard has a local translog copy. When the remote translog store is enabled, this is true iff the shard is the
* primary shard or the relocation target of the primary shard. When it is not enabled, this is always true.
*/
boolean localTranslog;

/**
* Creates a checkpoint state from explicit values. Each argument is stored, unchanged, in the field of the same name.
*
* @param localCheckpoint value stored in {@code localCheckpoint}
* @param globalCheckpoint value stored in {@code globalCheckpoint}
* @param inSync value stored in the {@code inSync} flag
* @param tracked value stored in the {@code tracked} flag
* @param localTranslog value stored in the {@code localTranslog} flag
*/
public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked, boolean localTranslog) {
this.localCheckpoint = localCheckpoint;
this.globalCheckpoint = globalCheckpoint;
this.inSync = inSync;
this.tracked = tracked;
this.localTranslog = localTranslog;
}

/**
* Creates a checkpoint state by reading its fields from the given stream, in this order: localCheckpoint,
* globalCheckpoint, inSync, tracked, localTranslog.
*
* NOTE(review): localTranslog is read unconditionally. Confirm that every sender writes this boolean (for example
* nodes running a version that predates the field); if not, the read would consume a byte that belongs to
* whatever follows in the stream - TODO confirm whether a version gate is needed here and in writeTo.
*
* @param in the stream to read the state from
* @throws IOException propagated from the underlying stream reads
*/
public CheckpointState(StreamInput in) throws IOException {
// The two checkpoints are read with readZLong, the flags with readBoolean.
this.localCheckpoint = in.readZLong();
this.globalCheckpoint = in.readZLong();
this.inSync = in.readBoolean();
this.tracked = in.readBoolean();
// Field added by this change; read last, after the pre-existing fields.
this.localTranslog = in.readBoolean();
}

@Override
Expand All @@ -709,13 +723,14 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeZLong(globalCheckpoint);
out.writeBoolean(inSync);
out.writeBoolean(tracked);
out.writeBoolean(localTranslog);
}

/**
* Returns a full copy of this object
*/
public CheckpointState copy() {
return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked);
return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked, localTranslog);
}

public long getLocalCheckpoint() {
Expand All @@ -737,6 +752,8 @@ public String toString() {
+ inSync
+ ", tracked="
+ tracked
+ ", localTranslog="
+ localTranslog
+ '}';
}

Expand All @@ -750,7 +767,8 @@ public boolean equals(Object o) {
if (localCheckpoint != that.localCheckpoint) return false;
if (globalCheckpoint != that.globalCheckpoint) return false;
if (inSync != that.inSync) return false;
return tracked == that.tracked;
if (tracked != that.tracked) return false;
return localTranslog == that.localTranslog;
}

@Override
Expand All @@ -759,6 +777,7 @@ public int hashCode() {
result = 31 * result + Long.hashCode(globalCheckpoint);
result = 31 * result + Boolean.hashCode(inSync);
result = 31 * result + Boolean.hashCode(tracked);
result = 31 * result + Boolean.hashCode(localTranslog);
return result;
}
}
Expand All @@ -774,7 +793,7 @@ public synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
final ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size()); // upper bound on the size
checkpoints.entrySet()
.stream()
.filter(e -> e.getValue().inSync)
.filter(e -> e.getValue().inSync && e.getValue().localTranslog)
.forEach(e -> globalCheckpoints.put(e.getKey(), e.getValue().globalCheckpoint));
return globalCheckpoints;
}
Expand Down Expand Up @@ -833,6 +852,9 @@ private boolean invariant() {
// the current shard is marked as in-sync when the global checkpoint tracker operates in primary mode
assert !primaryMode || checkpoints.get(shardAllocationId).inSync;

// the current shard is marked as tracked when the global checkpoint tracker operates in primary mode
assert !primaryMode || checkpoints.get(shardAllocationId).tracked;

// the routing table and replication group is set when the global checkpoint tracker operates in primary mode
assert !primaryMode || (routingTable != null && replicationGroup != null) : "primary mode but routing table is "
+ routingTable
Expand Down Expand Up @@ -902,7 +924,8 @@ private boolean invariant() {
if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
// all tracked shard copies have a corresponding peer-recovery retention lease
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
if (checkpoints.get(shardRouting.allocationId().getId()).tracked && !indexSettings().isRemoteTranslogStoreEnabled()) {
CheckpointState cps = checkpoints.get(shardRouting.allocationId().getId());
if (cps.tracked && cps.localTranslog) {
assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
: "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases;
assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(
Expand All @@ -926,7 +949,11 @@ private static long inSyncCheckpointStates(
Function<LongStream, OptionalLong> reducer
) {
final OptionalLong value = reducer.apply(
checkpoints.values().stream().filter(cps -> cps.inSync).mapToLong(function).filter(v -> v != SequenceNumbers.UNASSIGNED_SEQ_NO)
checkpoints.values()
.stream()
.filter(cps -> cps.inSync && cps.localTranslog)
.mapToLong(function)
.filter(v -> v != SequenceNumbers.UNASSIGNED_SEQ_NO)
);
return value.isPresent() ? value.getAsLong() : SequenceNumbers.UNASSIGNED_SEQ_NO;
}
Expand Down Expand Up @@ -1022,6 +1049,7 @@ private void updateReplicationGroupAndNotify() {
}

private ReplicationGroup calculateReplicationGroup() {
boolean remoteTranslogEnabled = indexSettings().isRemoteTranslogStoreEnabled();
long newVersion;
if (replicationGroup == null) {
newVersion = 0;
Expand All @@ -1032,7 +1060,9 @@ private ReplicationGroup calculateReplicationGroup() {
routingTable,
checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()),
checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet()),
newVersion
checkpoints.entrySet().stream().filter(e -> e.getValue().localTranslog).map(Map.Entry::getKey).collect(Collectors.toSet()),
newVersion,
remoteTranslogEnabled
);
}

Expand Down Expand Up @@ -1134,7 +1164,8 @@ private void addPeerRecoveryRetentionLeaseForSolePrimary() {
final ShardRouting primaryShard = routingTable.primaryShard();
final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
if (retentionLeases.get(leaseId) == null) {
if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
if (replicationGroup.getReplicationTargets().size() == 1
&& replicationGroup.getReplicationTargets().get(0).getShardRouting().equals(primaryShard)) {
assert primaryShard.allocationId().getId().equals(shardAllocationId) : routingTable.assignedShards()
+ " vs "
+ shardAllocationId;
Expand Down Expand Up @@ -1181,6 +1212,7 @@ public synchronized void updateFromClusterManager(
) {
assert invariant();
if (applyingClusterStateVersion > appliedClusterStateVersion) {
boolean remoteTranslogEnabled = indexSettings().isRemoteTranslogStoreEnabled();
// check that the cluster-manager does not fabricate new in-sync entries out of thin air once we are in primary mode
assert !primaryMode
|| inSyncAllocationIds.stream().allMatch(inSyncId -> checkpoints.containsKey(inSyncId) && checkpoints.get(inSyncId).inSync)
Expand All @@ -1197,6 +1229,10 @@ public synchronized void updateFromClusterManager(
boolean removedEntries = checkpoints.keySet()
.removeIf(aid -> !inSyncAllocationIds.contains(aid) && !initializingAllocationIds.contains(aid));

ShardRouting primary = routingTable.primaryShard();
String primaryAllocationId = primary.allocationId().getId();
String primaryTargetAllocationId = primary.relocating() ? primary.getTargetRelocatingShard().allocationId().getId() : null;

if (primaryMode) {
// add new initializingIds that are missing locally. These are fresh shard copies - and not in-sync
for (String initializingId : initializingAllocationIds) {
Expand All @@ -1207,7 +1243,14 @@ public synchronized void updateFromClusterManager(
+ " as in-sync but it does not exist locally";
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync));
boolean localTranslog = true;
if (remoteTranslogEnabled) {
localTranslog = initializingId.equals(primaryAllocationId) || initializingId.equals(primaryTargetAllocationId);
}
checkpoints.put(
initializingId,
new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync, localTranslog)
);
}
}
if (removedEntries) {
Expand All @@ -1217,12 +1260,20 @@ public synchronized void updateFromClusterManager(
for (String initializingId : initializingAllocationIds) {
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false, false));
boolean localTranslog = true;
if (remoteTranslogEnabled) {
localTranslog = initializingId.equals(primaryAllocationId) || initializingId.equals(primaryTargetAllocationId);
}
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false, false, localTranslog));
}
for (String inSyncId : inSyncAllocationIds) {
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true, true));
boolean localTranslog = true;
if (remoteTranslogEnabled) {
localTranslog = inSyncId.equals(primaryAllocationId) || inSyncId.equals(primaryTargetAllocationId);
}
checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true, true, localTranslog));
}
}
appliedClusterStateVersion = applyingClusterStateVersion;
Expand Down Expand Up @@ -1395,7 +1446,7 @@ private static long computeGlobalCheckpoint(
return fallback;
}
for (final CheckpointState cps : localCheckpoints) {
if (cps.inSync) {
if (cps.inSync && cps.localTranslog) {
if (cps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
// unassigned in-sync replica
return fallback;
Expand Down
Loading

0 comments on commit 4913941

Please sign in to comment.