Skip to content

Commit

Permalink
CCR: replicates max seq_no of updates to follower (#34051)
Browse files Browse the repository at this point in the history
This commit replicates the max_seq_no_of_updates on the leading index
to the primaries of the following index via ShardFollowNodeTask. The
max_seq_of_updates is then transmitted to the replicas of the follower
via replication requests (that's BulkShardOperationsRequest).

Relates #33656
  • Loading branch information
dnhatn authored and kcm committed Oct 30, 2018
1 parent b5b0d82 commit 0504cb6
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ public long getMaxSeqNo() {
return maxSeqNo;
}

private long maxSeqNoOfUpdatesOrDeletes;

public long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes;
}

private Translog.Operation[] operations;

public Translog.Operation[] getOperations() {
Expand All @@ -220,11 +226,13 @@ public Translog.Operation[] getOperations() {
final long mappingVersion,
final long globalCheckpoint,
final long maxSeqNo,
final long maxSeqNoOfUpdatesOrDeletes,
final Translog.Operation[] operations) {

this.mappingVersion = mappingVersion;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNo = maxSeqNo;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
this.operations = operations;
}

Expand All @@ -234,6 +242,7 @@ public void readFrom(final StreamInput in) throws IOException {
mappingVersion = in.readVLong();
globalCheckpoint = in.readZLong();
maxSeqNo = in.readZLong();
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}

Expand All @@ -243,6 +252,7 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeVLong(mappingVersion);
out.writeZLong(globalCheckpoint);
out.writeZLong(maxSeqNo);
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
out.writeArray(Translog.Operation::writeOperation, operations);
}

Expand All @@ -254,12 +264,13 @@ public boolean equals(final Object o) {
return mappingVersion == that.mappingVersion &&
globalCheckpoint == that.globalCheckpoint &&
maxSeqNo == that.maxSeqNo &&
maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
Arrays.equals(operations, that.operations);
}

@Override
public int hashCode() {
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations));
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, Arrays.hashCode(operations));
}
}

Expand Down Expand Up @@ -294,7 +305,9 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
request.getMaxOperationCount(),
request.getExpectedHistoryUUID(),
request.getMaxOperationSizeInBytes());
return getResponse(mappingVersion, seqNoStats, operations);
// must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations);
}

@Override
Expand Down Expand Up @@ -358,7 +371,8 @@ private void globalCheckpointAdvancementFailure(
final long mappingVersion =
clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY));
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY));
} catch (final Exception caught) {
caught.addSuppressed(e);
listener.onFailure(caught);
Expand Down Expand Up @@ -433,8 +447,9 @@ static Translog.Operation[] getOperations(IndexShard indexShard,
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}

static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, final Translog.Operation[] operations) {
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats,
final long maxSeqNoOfUpdates, final Translog.Operation[] operations) {
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, operations);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.AllocatedPersistentTask;
Expand Down Expand Up @@ -56,6 +57,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {

private long leaderGlobalCheckpoint;
private long leaderMaxSeqNo;
private long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long lastRequestedSeqNo;
private long followerGlobalCheckpoint = 0;
private long followerMaxSeqNo = 0;
Expand Down Expand Up @@ -201,7 +203,7 @@ private synchronized void coordinateWrites() {
numConcurrentWrites++;
LOGGER.trace("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops.get(0).seqNo(),
ops.get(ops.size() - 1).seqNo(), ops.size());
sendBulkShardOperationsRequest(ops);
sendBulkShardOperationsRequest(ops, leaderMaxSeqNoOfUpdatesOrDeletes, new AtomicInteger(0));
}
}

Expand Down Expand Up @@ -262,6 +264,7 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar
onOperationsFetched(response.getOperations());
leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint());
leaderMaxSeqNo = Math.max(leaderMaxSeqNo, response.getMaxSeqNo());
leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.max(leaderMaxSeqNoOfUpdatesOrDeletes, response.getMaxSeqNoOfUpdatesOrDeletes());
final long newFromSeqNo;
if (response.getOperations().length == 0) {
newFromSeqNo = from;
Expand Down Expand Up @@ -291,13 +294,11 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar
}
}

private void sendBulkShardOperationsRequest(List<Translog.Operation> operations) {
sendBulkShardOperationsRequest(operations, new AtomicInteger(0));
}

private void sendBulkShardOperationsRequest(List<Translog.Operation> operations, AtomicInteger retryCounter) {
private void sendBulkShardOperationsRequest(List<Translog.Operation> operations, long leaderMaxSeqNoOfUpdatesOrDeletes,
AtomicInteger retryCounter) {
assert leaderMaxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "mus is not replicated";
final long startTime = relativeTimeProvider.getAsLong();
innerSendBulkShardOperationsRequest(operations,
innerSendBulkShardOperationsRequest(operations, leaderMaxSeqNoOfUpdatesOrDeletes,
response -> {
synchronized (ShardFollowNodeTask.this) {
totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
Expand All @@ -311,7 +312,8 @@ private void sendBulkShardOperationsRequest(List<Translog.Operation> operations,
totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfFailedBulkOperations++;
}
handleFailure(e, retryCounter, () -> sendBulkShardOperationsRequest(operations, retryCounter));
handleFailure(e, retryCounter,
() -> sendBulkShardOperationsRequest(operations, leaderMaxSeqNoOfUpdatesOrDeletes, retryCounter));
}
);
}
Expand Down Expand Up @@ -383,8 +385,8 @@ private static boolean shouldRetry(Exception e) {
// These methods are protected for testing purposes:
protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);

protected abstract void innerSendBulkShardOperationsRequest(
List<Translog.Operation> operations, Consumer<BulkShardOperationsResponse> handler, Consumer<Exception> errorHandler);
protected abstract void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, long leaderMaxSeqNoOfUpdatesOrDeletes,
Consumer<BulkShardOperationsResponse> handler, Consumer<Exception> errorHandler);

protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
Consumer<Exception> errorHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,11 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
@Override
protected void innerSendBulkShardOperationsRequest(
final List<Translog.Operation> operations,
final long maxSeqNoOfUpdatesOrDeletes,
final Consumer<BulkShardOperationsResponse> handler,
final Consumer<Exception> errorHandler) {
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations);
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(
params.getFollowShardId(), operations, maxSeqNoOfUpdatesOrDeletes);
followerClient.execute(BulkShardOperationsAction.INSTANCE, request,
ActionListener.wrap(response -> handler.accept(response), errorHandler));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,37 @@
public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<BulkShardOperationsRequest> {

private List<Translog.Operation> operations;
private long maxSeqNoOfUpdatesOrDeletes;

public BulkShardOperationsRequest() {
}

public BulkShardOperationsRequest(final ShardId shardId, final List<Translog.Operation> operations) {
public BulkShardOperationsRequest(ShardId shardId, List<Translog.Operation> operations, long maxSeqNoOfUpdatesOrDeletes) {
super(shardId);
setRefreshPolicy(RefreshPolicy.NONE);
this.operations = operations;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
}

public List<Translog.Operation> getOperations() {
return operations;
}

public long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes;
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
operations = in.readList(Translog.Operation::readOperation);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
out.writeVInt(operations.size());
for (Translog.Operation operation : operations) {
Translog.Operation.writeOperation(out, operation);
Expand All @@ -50,6 +58,7 @@ public void writeTo(final StreamOutput out) throws IOException {
public String toString() {
return "BulkShardOperationsRequest{" +
"operations=" + operations.size()+
", maxSeqNoUpdates=" + maxSeqNoOfUpdatesOrDeletes +
", shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -60,13 +61,15 @@ public TransportBulkShardOperationsAction(
@Override
protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
final BulkShardOperationsRequest request, final IndexShard primary) throws Exception {
return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger);
return shardOperationOnPrimary(
request.shardId(), request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
}

// public for testing purposes only
public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
final ShardId shardId,
final List<Translog.Operation> sourceOperations,
final long maxSeqNoOfUpdatesOrDeletes,
final IndexShard primary,
final Logger logger) throws IOException {
final List<Translog.Operation> targetOperations = sourceOperations.stream().map(operation -> {
Expand Down Expand Up @@ -103,16 +106,19 @@ public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperations
}
return operationWithPrimaryTerm;
}).collect(Collectors.toList());
// TODO: Replace this artificial value by the actual max_seq_no_updates from the leader
targetOperations.stream().mapToLong(Translog.Operation::seqNo).max().ifPresent(primary::advanceMaxSeqNoOfUpdatesOrDeletes);
assert maxSeqNoOfUpdatesOrDeletes >= SequenceNumbers.NO_OPS_PERFORMED : "invalid msu [" + maxSeqNoOfUpdatesOrDeletes + "]";
primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY);
final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(shardId, targetOperations);
final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(
shardId, targetOperations, maxSeqNoOfUpdatesOrDeletes);
return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);
}

@Override
protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
assert replica.getMaxSeqNoOfUpdatesOrDeletes() >= request.getMaxSeqNoOfUpdatesOrDeletes() :
"mus on replica [" + replica + "] < mus of request [" + request.getMaxSeqNoOfUpdatesOrDeletes() + "]";
final Translog.Location location = applyTranslogOperations(request.getOperations(), replica, Engine.Operation.Origin.REPLICA);
return new WriteReplicaResult<>(request, location, null, replica, logger);
}
Expand Down
Loading

0 comments on commit 0504cb6

Please sign in to comment.