Replicate max seq_no of updates to replicas
We started tracking max_seq_no_of_updates on the primary in elastic#33842. This
commit replicates that value from a primary to its replicas, either via
replication requests or via the translog phase of peer-recovery.

With this change, we guarantee that the max_seq_no_of_updates on a replica,
when any index/delete operation is performed on it, is at least the
max_seq_no_of_updates on the primary when that operation was executed.

Relates elastic#33656
dnhatn committed Sep 22, 2018
1 parent 7944a0c commit 6883c3f
Showing 23 changed files with 383 additions and 122 deletions.
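To make the guarantee described above concrete, here is a minimal, self-contained sketch of the invariant in plain Java. It is illustrative only and does not use the classes changed in this commit: the class and method names below are invented for the example, only UNASSIGNED_SEQ_NO mirrors SequenceNumbers.UNASSIGNED_SEQ_NO, and the assertion corresponds to the assertMaxSeqNoOfUpdatesIsPropagated checks added further down.

import java.util.concurrent.atomic.AtomicLong;

/**
 * Illustrative model (not Elasticsearch code): the primary captures its
 * max_seq_no_of_updates (MSU) after executing an operation and ships it with the
 * replica request; the replica folds that value into its own MSU before applying
 * the operation, so MSU(replica) >= MSU(primary at the time the op was executed).
 */
final class MaxSeqNoOfUpdatesSketch {

    static final long UNASSIGNED_SEQ_NO = -2; // mirrors SequenceNumbers.UNASSIGNED_SEQ_NO

    // Replica-side MSU: starts uninitialized and only ever moves forward.
    private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(UNASSIGNED_SEQ_NO);

    // Called with the MSU carried by a replication request or by a translog
    // operation during the translog phase of peer recovery.
    void advanceMaxSeqNoOfUpdates(long msuOnPrimary) {
        maxSeqNoOfUpdatesOrDeletes.accumulateAndGet(msuOnPrimary, Math::max);
    }

    // An index/delete operation is applied only after the shipped MSU has been
    // folded in, which is the property the new assertions rely on.
    void executeOnReplica(long opSeqNo, long msuShippedByPrimary) {
        advanceMaxSeqNoOfUpdates(msuShippedByPrimary);
        assert maxSeqNoOfUpdatesOrDeletes.get() >= msuShippedByPrimary
            : "op seq_no=" + opSeqNo + ", msu=" + maxSeqNoOfUpdatesOrDeletes.get();
        // ... apply the operation; an op whose seq_no is above the MSU cannot be an
        // update, so it may take the append-only (addDocument) path ...
    }
}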
@@ -31,6 +31,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
@@ -114,9 +115,13 @@ public void execute() throws Exception {
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
final long globalCheckpoint = primary.globalCheckpoint();
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
// max_seq_no_of_updates on a replica when this request is executed there is at least the value on the primary when it was executed.
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
}

successfulShards.incrementAndGet(); // mark primary as successful
@@ -136,7 +141,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica
}

private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
final ReplicationGroup replicationGroup) {
final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
// for total stats, add number of unassigned shards and
// number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
totalShards.addAndGet(replicationGroup.getSkippedShards().size());
@@ -145,19 +150,20 @@ private void performOnReplicas(final ReplicaRequest replicaRequest, final long g

for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
if (shard.isSameAllocation(primaryRouting) == false) {
performOnReplica(shard, replicaRequest, globalCheckpoint);
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
}
}
}

private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) {
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest,
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
}

totalShards.incrementAndGet();
pendingActions.incrementAndGet();
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener<ReplicaResponse>() {
@Override
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
@@ -322,6 +328,12 @@ public interface Primary<
*/
long globalCheckpoint();

/**
* Returns the maximum seq_no of updates (index operations that overwrite Lucene) or deletes on the primary.
* This value must be captured after the execution of a replication request on the primary is completed.
*/
long maxSeqNoOfUpdatesOrDeletes();

/**
* Returns the current replication group on the primary shard
*
@@ -338,12 +350,15 @@ public interface Replicas<RequestT extends ReplicationRequest<RequestT>> {
/**
* Performs the specified request on the specified replica.
*
* @param replica the shard this request should be executed on
* @param replicaRequest the operation to perform
* @param globalCheckpoint the global checkpoint on the primary
* @param listener callback for handling the response or failure
* @param replica the shard this request should be executed on
* @param replicaRequest the operation to perform
* @param globalCheckpoint the global checkpoint on the primary
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwriting Lucene) or deletes on the primary
* after this replication request was executed on it.
* @param listener callback for handling the response or failure
*/
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint, ActionListener<ReplicaResponse> listener);
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<ReplicaResponse> listener);

/**
* Fail the specified shard if needed, removing it from the current set
@@ -200,7 +200,7 @@ protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrima

/**
* Synchronously execute the specified replica operation. This is done under a permit from
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String, Object)}.
* {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)}.
*
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
@@ -489,6 +489,7 @@ public void messageReceived(
replicaRequest.getTargetAllocationID(),
replicaRequest.getPrimaryTerm(),
replicaRequest.getGlobalCheckpoint(),
replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(),
channel,
(ReplicationTask) task).run();
}
@@ -513,6 +514,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
private final String targetAllocationID;
private final long primaryTerm;
private final long globalCheckpoint;
private final long maxSeqNoOfUpdatesOrDeletes;
private final TransportChannel channel;
private final IndexShard replica;
/**
@@ -528,6 +530,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
String targetAllocationID,
long primaryTerm,
long globalCheckpoint,
long maxSeqNoOfUpdatesOrDeletes,
TransportChannel channel,
ReplicationTask task) {
this.request = request;
@@ -536,6 +539,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
this.targetAllocationID = targetAllocationID;
this.primaryTerm = primaryTerm;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
final ShardId shardId = request.shardId();
assert shardId != null : "request shardId must be set";
this.replica = getIndexShard(shardId);
@@ -575,7 +579,8 @@ public void onNewClusterState(ClusterState state) {
new TransportChannelResponseHandler<>(logger, channel, extraMessage,
() -> TransportResponse.Empty.INSTANCE);
transportService.sendRequest(clusterService.localNode(), transportReplicaAction,
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, globalCheckpoint),
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm,
globalCheckpoint, maxSeqNoOfUpdatesOrDeletes),
handler);
}

@@ -613,7 +618,7 @@ protected void doRun() throws Exception {
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor, request);
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, this, executor, request);
}

/**
@@ -1023,6 +1028,11 @@ public long globalCheckpoint() {
return indexShard.getGlobalCheckpoint();
}

@Override
public long maxSeqNoOfUpdatesOrDeletes() {
return indexShard.getMaxSeqNoOfUpdatesOrDeletes();
}

@Override
public ReplicationGroup getReplicationGroup() {
return indexShard.getReplicationGroup();
@@ -1107,15 +1117,16 @@ public void performOn(
final ShardRouting replica,
final ReplicaRequest request,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
String nodeId = replica.currentNodeId();
final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
if (node == null) {
listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
return;
}
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest =
new ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, globalCheckpoint);
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
sendReplicaRequest(replicaRequest, node, listener);
}

@@ -1263,15 +1274,17 @@ public String toString() {
protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {

private long globalCheckpoint;
private long maxSeqNoOfUpdatesOrDeletes;

public ConcreteReplicaRequest(final Supplier<R> requestSupplier) {
super(requestSupplier);
}

public ConcreteReplicaRequest(final R request, final String targetAllocationID, final long primaryTerm,
final long globalCheckpoint) {
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
super(request, targetAllocationID, primaryTerm);
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
}

@Override
@@ -1282,6 +1295,13 @@ public void readFrom(StreamInput in) throws IOException {
} else {
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
} else {
// UNASSIGNED_SEQ_NO (-2) means uninitialized, and a replica will disable
// the seq_no-based optimization if its max_seq_no_of_updates is still uninitialized
maxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}

@Override
@@ -1290,19 +1310,27 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeZLong(globalCheckpoint);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
}
}

public long getGlobalCheckpoint() {
return globalCheckpoint;
}

public long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes;
}

@Override
public String toString() {
return "ConcreteReplicaRequest{" +
"targetAllocationID='" + getTargetAllocationID() + '\'' +
", primaryTerm='" + getPrimaryTerm() + '\'' +
", request=" + getRequest() +
", globalCheckpoint=" + globalCheckpoint +
", maxSeqNoOfUpdatesOrDeletes=" + maxSeqNoOfUpdatesOrDeletes +
'}';
}
}
@@ -47,6 +47,7 @@
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.Assertions;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
@@ -905,6 +906,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
}
}
}
assert assertMaxSeqNoOfUpdatesIsPropagated(index, plan);
return plan;
}

@@ -1213,6 +1215,7 @@ protected DeletionStrategy deletionStrategyForOperation(final Delete delete) thr

protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
assertNonPrimaryOrigin(delete);
assert assertMaxSeqNoOfUpdatesIsPropagated(delete);
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
@@ -2556,6 +2559,27 @@ private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
}

private boolean assertMaxSeqNoOfUpdatesIsPropagated(Delete delete) {
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
final Version indexVersion = config().getIndexSettings().getIndexVersionCreated();
assert delete.seqNo() <= maxSeqNoOfUpdates ||
(maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO && indexVersion.before(Version.V_7_0_0_alpha1)) :
"id=" + delete.id() + " seq_no=" + delete.seqNo() + " max_seq_no_of_updates=" + maxSeqNoOfUpdates + " index_version=" + indexVersion;
return true;
}

private boolean assertMaxSeqNoOfUpdatesIsPropagated(Index index, IndexingStrategy plan) {
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
final Version indexVersion = config().getIndexSettings().getIndexVersionCreated();
assert plan.useLuceneUpdateDocument == false
|| index.seqNo() <= maxSeqNoOfUpdates // msu must be propagated
|| getLocalCheckpoint() < maxSeqNoOfUpdates // or gap in the sequence number
|| (maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO && indexVersion.before(Version.V_7_0_0_alpha1))
// we treat a deleted doc in the tombstone as a valid doc and then use updateDocument to overwrite it
|| (versionMap.getUnderLock(index.uid().bytes()) != null && versionMap.getUnderLock(index.uid().bytes()).isDelete()) :
"id=" + index.id() + " seq_no=" + index.seqNo() + " max_seq_no_of_updates=" + maxSeqNoOfUpdates + " index_version=" + indexVersion;
return true;
}

@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {