diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 59f9042ec4a85..0657fab55b220 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -38,6 +38,7 @@ import org.apache.logging.log4j.util.MessageSupplier; import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.ActionRunnable; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.DocWriteResponse; @@ -46,25 +47,36 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.ChannelActionListener; +import org.opensearch.action.support.replication.ReplicationMode; +import org.opensearch.action.support.replication.ReplicationOperation; +import org.opensearch.action.support.replication.ReplicationTask; import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.action.support.replication.TransportWriteAction; import org.opensearch.action.update.UpdateHelper; import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.transport.NoNodeAvailableException; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.action.index.MappingUpdatedAction; import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.AllocationId; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.collect.Tuple; import org.opensearch.common.compress.CompressedXContent; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.xcontent.ToXContent; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; @@ -78,24 +90,29 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardNotFoundException; import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; import org.opensearch.node.NodeClosedException; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskId; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; -import org.opensearch.action.support.replication.ReplicationMode; - /** * Performs shard-level bulk (index, delete or update) operations * @@ -117,6 +134,15 @@ public class TransportShardBulkAction extends TransportWriteAction listener = new ChannelActionListener<>(channel, transportPrimaryTermValidationAction, request); + final ShardId shardId = request.getShardId(); + assert shardId != null : "request shardId must be set"; + IndexShard replica = getIndexShard(shardId); + try { + new PrimaryTermValidationReplicaAction(listener, replica, (ReplicationTask) task, request).run(); + } catch (RuntimeException e) { + listener.onFailure(e); + } + } + + /** + * This action is the primary term validation action which is used for doing primary term validation with replicas. + * This is only applicable for TransportShardBulkAction because all writes (delete/update/single write/bulk) + * ultimately boils down to TransportShardBulkAction and isolated primary could continue to acknowledge if it is not + * aware that the primary has changed. This helps achieve the same. More details in java doc of + * {@link TransportShardBulkAction#transportPrimaryTermValidationAction}. + * + * @opensearch.internal + */ + private static final class PrimaryTermValidationReplicaAction extends AbstractRunnable implements ActionListener { + + private final ActionListener onCompletionListener; + private final IndexShard replica; + private final ReplicationTask task; + private final PrimaryTermValidationRequest request; + + public PrimaryTermValidationReplicaAction( + ActionListener onCompletionListener, + IndexShard replica, + ReplicationTask task, + PrimaryTermValidationRequest request + ) { + this.onCompletionListener = onCompletionListener; + this.replica = replica; + this.task = task; + this.request = request; + } + + @Override + public void onResponse(Releasable releasable) { + setPhase(task, "finished"); + onCompletionListener.onResponse(new ReplicaResponse(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED)); + } + + @Override + public void onFailure(Exception e) { + setPhase(task, "failed"); + onCompletionListener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + setPhase(task, "primary-term-validation"); + final String actualAllocationId = this.replica.routingEntry().allocationId().getId(); + if (actualAllocationId.equals(request.getTargetAllocationID()) == false) { + throw new ShardNotFoundException( + this.replica.shardId(), + "expected allocation id [{}] but found [{}]", + request.getTargetAllocationID(), + actualAllocationId + ); + } + // Check operation primary term against the incoming primary term + // If the request primary term is low, then trigger lister failure + if (request.getPrimaryTerm() < replica.getOperationPrimaryTerm()) { + final String message = String.format( + Locale.ROOT, + "%s operation primary term [%d] is too old (current [%d])", + request.getShardId(), + request.getPrimaryTerm(), + replica.getOperationPrimaryTerm() + ); + onFailure(new IllegalStateException(message)); + } else { + onResponse(null); + } + } + } + + /** + * Primary term validation request sent to a specific allocation id + * + * @opensearch.internal + */ + protected static final class PrimaryTermValidationRequest extends TransportRequest { + + /** + * {@link AllocationId#getId()} of the shard this request is sent to + **/ + private final String targetAllocationID; + private final long primaryTerm; + private final ShardId shardId; + + public PrimaryTermValidationRequest(String targetAllocationID, long primaryTerm, ShardId shardId) { + this.targetAllocationID = Objects.requireNonNull(targetAllocationID); + this.primaryTerm = primaryTerm; + this.shardId = Objects.requireNonNull(shardId); + } + + public PrimaryTermValidationRequest(StreamInput in) throws IOException { + super(in); + targetAllocationID = in.readString(); + primaryTerm = in.readVLong(); + shardId = new ShardId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(targetAllocationID); + out.writeVLong(primaryTerm); + shardId.writeTo(out); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ReplicationTask(id, type, action, getDescription(), parentTaskId, headers); + } + + public String getTargetAllocationID() { + return targetAllocationID; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + + public ShardId getShardId() { + return shardId; + } + + @Override + public String getDescription() { + return toString(); + } + + @Override + public String toString() { + return "PrimaryTermValidationRequest [" + + shardId + + "] for targetAllocationID [" + + targetAllocationID + + "] with primaryTerm [" + + primaryTerm + + "]"; + } + } + + @Override + protected ReplicationOperation.Replicas primaryTermValidationReplicasProxy() { + return new PrimaryTermValidationProxy(); + } + + /** + * This {@link org.opensearch.action.support.replication.TransportReplicationAction.ReplicasProxy} implementation is + * used for primary term validation and is only relevant for TransportShardBulkAction replication action. + * + * @opensearch.internal + */ + private final class PrimaryTermValidationProxy extends WriteActionReplicasProxy { + + @Override + public void performOn( + ShardRouting replica, + BulkShardRequest request, + long primaryTerm, + long globalCheckpoint, + long maxSeqNoOfUpdatesOrDeletes, + ActionListener listener + ) { + String nodeId = replica.currentNodeId(); + final DiscoveryNode node = clusterService.state().nodes().get(nodeId); + if (node == null) { + listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]")); + return; + } + final PrimaryTermValidationRequest validationRequest = new PrimaryTermValidationRequest( + replica.allocationId().getId(), + primaryTerm, + replica.shardId() + ); + final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>( + listener, + ReplicaResponse::new + ); + transportService.sendRequest(node, transportPrimaryTermValidationAction, validationRequest, transportOptions, handler); + } } @Override @@ -196,7 +428,7 @@ protected long primaryOperationSize(BulkShardRequest request) { } @Override - protected ReplicationMode getReplicationMode(IndexShard indexShard) { + public ReplicationMode getReplicationMode(IndexShard indexShard) { if (indexShard.isRemoteTranslogEnabled()) { return ReplicationMode.PRIMARY_TERM_VALIDATION; } diff --git a/server/src/main/java/org/opensearch/action/support/replication/FanoutReplicationProxy.java b/server/src/main/java/org/opensearch/action/support/replication/FanoutReplicationProxy.java index 2980df4c1c0af..51b95468d6b25 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/FanoutReplicationProxy.java +++ b/server/src/main/java/org/opensearch/action/support/replication/FanoutReplicationProxy.java @@ -8,15 +8,35 @@ package org.opensearch.action.support.replication; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.replication.ReplicationOperation.ReplicaResponse; +import org.opensearch.action.support.replication.ReplicationOperation.Replicas; import org.opensearch.cluster.routing.ShardRouting; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + /** * This implementation of {@link ReplicationProxy} fans out the replication request to current shard routing if * it is not the primary and has replication mode as {@link ReplicationMode#FULL_REPLICATION}. * * @opensearch.internal */ -public class FanoutReplicationProxy extends ReplicationProxy { +public class FanoutReplicationProxy> extends ReplicationProxy { + + public FanoutReplicationProxy(Replicas replicasProxy) { + super(replicasProxy); + } + + @Override + protected void performOnReplicaProxy( + ReplicationProxyRequest proxyRequest, + ReplicationMode replicationMode, + BiConsumer>, ReplicationProxyRequest> performOnReplicaConsumer + ) { + assert replicationMode == ReplicationMode.FULL_REPLICATION : "FanoutReplicationProxy allows only full replication mode"; + performOnReplicaConsumer.accept(getReplicasProxyConsumer(fullReplicationProxy, proxyRequest), proxyRequest); + } @Override ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting primaryRouting) { diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java index fa28e99d5696f..26d3b3c2f64ef 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationModeAwareProxy.java @@ -8,9 +8,13 @@ package org.opensearch.action.support.replication; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.replication.ReplicationOperation.ReplicaResponse; import org.opensearch.cluster.routing.ShardRouting; import java.util.Objects; +import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * This implementation of {@link ReplicationProxy} fans out the replication request to current shard routing basis @@ -18,13 +22,40 @@ * * @opensearch.internal */ -public class ReplicationModeAwareProxy extends ReplicationProxy { +public class ReplicationModeAwareProxy> extends ReplicationProxy { private final ReplicationMode replicationModeOverride; - public ReplicationModeAwareProxy(ReplicationMode replicationModeOverride) { - assert Objects.nonNull(replicationModeOverride); - this.replicationModeOverride = replicationModeOverride; + /** + * This ReplicasProxy is used for performing primary term validation. + */ + private final ReplicationOperation.Replicas primaryTermValidationProxy; + + public ReplicationModeAwareProxy( + ReplicationMode replicationModeOverride, + ReplicationOperation.Replicas replicasProxy, + ReplicationOperation.Replicas primaryTermValidationProxy + ) { + super(replicasProxy); + this.replicationModeOverride = Objects.requireNonNull(replicationModeOverride); + this.primaryTermValidationProxy = Objects.requireNonNull(primaryTermValidationProxy); + } + + @Override + protected void performOnReplicaProxy( + ReplicationProxyRequest proxyRequest, + ReplicationMode replicationMode, + BiConsumer>, ReplicationProxyRequest> performOnReplicaConsumer + ) { + assert replicationMode == ReplicationMode.FULL_REPLICATION || replicationMode == ReplicationMode.PRIMARY_TERM_VALIDATION; + + Consumer> replicasProxyConsumer; + if (replicationMode == ReplicationMode.FULL_REPLICATION) { + replicasProxyConsumer = getReplicasProxyConsumer(fullReplicationProxy, proxyRequest); + } else { + replicasProxyConsumer = getReplicasProxyConsumer(primaryTermValidationProxy, proxyRequest); + } + performOnReplicaConsumer.accept(replicasProxyConsumer, proxyRequest); } @Override diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java index 1a6a5a9245eb2..944729df2ab1e 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationOperation.java @@ -66,6 +66,7 @@ import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.LongSupplier; /** @@ -237,17 +238,19 @@ private void performOnReplicas( globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions, - replicaRequest + replicaRequest, + primaryTerm ).build(); replicationProxy.performOnReplicaProxy(proxyRequest, this::performOnReplica); } } - private void performOnReplica(final ReplicationProxyRequest replicationProxyRequest) { + private void performOnReplica( + final Consumer> replicasProxyConsumer, + final ReplicationProxyRequest replicationProxyRequest + ) { final ShardRouting shard = replicationProxyRequest.getShardRouting(); final ReplicaRequest replicaRequest = replicationProxyRequest.getReplicaRequest(); - final long globalCheckpoint = replicationProxyRequest.getGlobalCheckpoint(); - final long maxSeqNoOfUpdatesOrDeletes = replicationProxyRequest.getMaxSeqNoOfUpdatesOrDeletes(); final PendingReplicationActions pendingReplicationActions = replicationProxyRequest.getPendingReplicationActions(); if (logger.isTraceEnabled()) { @@ -319,7 +322,7 @@ public String toString() { @Override public void tryAction(ActionListener listener) { - replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener); + replicasProxyConsumer.accept(listener); } @Override diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationProxy.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationProxy.java index e098ea1aed960..20f7b5fc6a586 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationProxy.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationProxy.java @@ -8,8 +8,12 @@ package org.opensearch.action.support.replication; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.replication.ReplicationOperation.ReplicaResponse; +import org.opensearch.action.support.replication.ReplicationOperation.Replicas; import org.opensearch.cluster.routing.ShardRouting; +import java.util.function.BiConsumer; import java.util.function.Consumer; /** @@ -18,27 +22,51 @@ * * @opensearch.internal */ -public abstract class ReplicationProxy { +public abstract class ReplicationProxy> { + + /** + * This is the replicas proxy which is used for full replication. + */ + protected final Replicas fullReplicationProxy; + + public ReplicationProxy(Replicas fullReplicationProxy) { + this.fullReplicationProxy = fullReplicationProxy; + } /** * Depending on the actual implementation and the passed {@link ReplicationMode}, the replication * mode is determined using which the replication request is performed on the replica or not. * * @param proxyRequest replication proxy request - * @param originalPerformOnReplicaConsumer original performOnReplica method passed as consumer + * @param performOnReplicaConsumer performOnReplicasProxy */ - public void performOnReplicaProxy( + final void performOnReplicaProxy( ReplicationProxyRequest proxyRequest, - Consumer> originalPerformOnReplicaConsumer + BiConsumer>, ReplicationProxyRequest> performOnReplicaConsumer ) { ReplicationMode replicationMode = determineReplicationMode(proxyRequest.getShardRouting(), proxyRequest.getPrimaryRouting()); // If the replication modes are 1. Logical replication or 2. Primary term validation, we let the call get performed on the // replica shard. - if (replicationMode == ReplicationMode.FULL_REPLICATION || replicationMode == ReplicationMode.PRIMARY_TERM_VALIDATION) { - originalPerformOnReplicaConsumer.accept(proxyRequest); + if (replicationMode == ReplicationMode.NO_REPLICATION) { + return; } + performOnReplicaProxy(proxyRequest, replicationMode, performOnReplicaConsumer); } + /** + * The implementor can decide the {@code Consumer>} basis the + * proxyRequest and replicationMode. This will ultimately make the calls to replica. + * + * @param proxyRequest replication proxy request + * @param replicationMode replication mode + * @param performOnReplicaConsumer performOnReplicasProxy + */ + protected abstract void performOnReplicaProxy( + ReplicationProxyRequest proxyRequest, + ReplicationMode replicationMode, + BiConsumer>, ReplicationProxyRequest> performOnReplicaConsumer + ); + /** * Determines what is the replication mode basis the constructor arguments of the implementation and the current * replication mode aware shard routing. @@ -48,4 +76,18 @@ public void performOnReplicaProxy( * @return the determined replication mode. */ abstract ReplicationMode determineReplicationMode(final ShardRouting shardRouting, final ShardRouting primaryRouting); + + protected Consumer> getReplicasProxyConsumer( + Replicas proxy, + ReplicationProxyRequest proxyRequest + ) { + return (listener) -> proxy.performOn( + proxyRequest.getShardRouting(), + proxyRequest.getReplicaRequest(), + proxyRequest.getPrimaryTerm(), + proxyRequest.getGlobalCheckpoint(), + proxyRequest.getMaxSeqNoOfUpdatesOrDeletes(), + listener + ); + } } diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationProxyFactory.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationProxyFactory.java deleted file mode 100644 index a2bbf58fb9100..0000000000000 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationProxyFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.support.replication; - -import org.opensearch.index.shard.IndexShard; - -/** - * Factory that returns the {@link ReplicationProxy} instance basis the {@link ReplicationMode}. - * - * @opensearch.internal - */ -public class ReplicationProxyFactory { - - public static ReplicationProxy create( - final IndexShard indexShard, - final ReplicationMode replicationModeOverride - ) { - if (indexShard.isRemoteTranslogEnabled()) { - return new ReplicationModeAwareProxy<>(replicationModeOverride); - } - return new FanoutReplicationProxy<>(); - } -} diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationProxyRequest.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationProxyRequest.java index 180efd6f423c3..c65e55867f706 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationProxyRequest.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationProxyRequest.java @@ -31,13 +31,16 @@ public class ReplicationProxyRequest { private final ReplicaRequest replicaRequest; + private final long primaryTerm; + private ReplicationProxyRequest( ShardRouting shardRouting, ShardRouting primaryRouting, long globalCheckpoint, long maxSeqNoOfUpdatesOrDeletes, PendingReplicationActions pendingReplicationActions, - ReplicaRequest replicaRequest + ReplicaRequest replicaRequest, + long primaryTerm ) { this.shardRouting = Objects.requireNonNull(shardRouting); this.primaryRouting = Objects.requireNonNull(primaryRouting); @@ -45,6 +48,7 @@ private ReplicationProxyRequest( this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; this.pendingReplicationActions = Objects.requireNonNull(pendingReplicationActions); this.replicaRequest = Objects.requireNonNull(replicaRequest); + this.primaryTerm = primaryTerm; } public ShardRouting getShardRouting() { @@ -71,6 +75,10 @@ public ReplicaRequest getReplicaRequest() { return replicaRequest; } + public long getPrimaryTerm() { + return primaryTerm; + } + /** * Builder of ReplicationProxyRequest. * @@ -84,6 +92,7 @@ public static class Builder { private final long maxSeqNoOfUpdatesOrDeletes; private final PendingReplicationActions pendingReplicationActions; private final ReplicaRequest replicaRequest; + private final long primaryTerm; public Builder( ShardRouting shardRouting, @@ -91,7 +100,8 @@ public Builder( long globalCheckpoint, long maxSeqNoOfUpdatesOrDeletes, PendingReplicationActions pendingReplicationActions, - ReplicaRequest replicaRequest + ReplicaRequest replicaRequest, + long primaryTerm ) { this.shardRouting = shardRouting; this.primaryRouting = primaryRouting; @@ -99,6 +109,7 @@ public Builder( this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; this.pendingReplicationActions = pendingReplicationActions; this.replicaRequest = replicaRequest; + this.primaryTerm = primaryTerm; } public ReplicationProxyRequest build() { @@ -108,7 +119,8 @@ public ReplicationProxyRequest build() { globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions, - replicaRequest + replicaRequest, + primaryTerm ); } diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 0a0904a1b3aaa..e804aa31adb4e 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -46,6 +46,7 @@ import org.opensearch.action.support.ChannelActionListener; import org.opensearch.action.support.TransportAction; import org.opensearch.action.support.TransportActions; +import org.opensearch.action.support.replication.ReplicationOperation.Replicas; import org.opensearch.client.transport.NoNodeAvailableException; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; @@ -254,17 +255,40 @@ private void runReroutePhase(Task task, Request request, ActionListener newReplicasProxy() { + protected Replicas newReplicasProxy() { return new ReplicasProxy(); } + /** + * This returns a ReplicaProxy that is used for primary term validation. The default behavior is that the control + * must not reach inside the performOn method for ReplicationActions. However, the implementations of the underlying + * class can provide primary term validation proxy that can allow performOn method to make calls to replica. + * + * @return Primary term validation replicas proxy. + */ + protected Replicas primaryTermValidationReplicasProxy() { + return new ReplicasProxy() { + @Override + public void performOn( + ShardRouting replica, + ReplicaRequest request, + long primaryTerm, + long globalCheckpoint, + long maxSeqNoOfUpdatesOrDeletes, + ActionListener listener + ) { + throw new UnsupportedOperationException("Primary term validation is not available for " + actionName); + } + }; + } + /** * This method is used for defining the {@link ReplicationMode} override per {@link TransportReplicationAction}. * * @param indexShard index shard used to determining the policy. * @return the overridden replication mode. */ - protected ReplicationMode getReplicationMode(IndexShard indexShard) { + public ReplicationMode getReplicationMode(IndexShard indexShard) { if (indexShard.isRemoteTranslogEnabled()) { return ReplicationMode.NO_REPLICATION; } @@ -536,21 +560,24 @@ public void handleException(TransportException exp) { onCompletionListener.onResponse(response); }, e -> handleException(primaryShardReference, e)); + final Replicas replicasProxy = newReplicasProxy(); + final IndexShard indexShard = primaryShardReference.indexShard; + final Replicas termValidationProxy = primaryTermValidationReplicasProxy(); + new ReplicationOperation<>( primaryRequest.getRequest(), primaryShardReference, ActionListener.map(responseListener, result -> result.finalResponseIfSuccessful), - newReplicasProxy(), + replicasProxy, logger, threadPool, actionName, primaryRequest.getPrimaryTerm(), initialRetryBackoffBound, retryTimeout, - ReplicationProxyFactory.create( - primaryShardReference.indexShard, - getReplicationMode(primaryShardReference.indexShard) - ) + indexShard.isRemoteTranslogEnabled() + ? new ReplicationModeAwareProxy<>(getReplicationMode(indexShard), replicasProxy, termValidationProxy) + : new FanoutReplicationProxy<>(replicasProxy) ).execute(); } } catch (Exception e) { @@ -830,7 +857,7 @@ protected void doRun() throws Exception { } } - private IndexShard getIndexShard(final ShardId shardId) { + protected IndexShard getIndexShard(final ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); return indexService.getShard(shardId.id()); } @@ -1283,7 +1310,7 @@ public static class ReplicaResponse extends ActionResponse implements Replicatio private long localCheckpoint; private long globalCheckpoint; - ReplicaResponse(StreamInput in) throws IOException { + public ReplicaResponse(StreamInput in) throws IOException { super(in); localCheckpoint = in.readZLong(); globalCheckpoint = in.readZLong(); @@ -1338,7 +1365,7 @@ public int hashCode() { * * @opensearch.internal */ - protected class ReplicasProxy implements ReplicationOperation.Replicas { + protected class ReplicasProxy implements Replicas { @Override public void performOn( @@ -1401,7 +1428,9 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, l */ public static class ConcreteShardRequest extends TransportRequest { - /** {@link AllocationId#getId()} of the shard this request is sent to **/ + /** + * {@link AllocationId#getId()} of the shard this request is sent to + **/ private final String targetAllocationID; private final long primaryTerm; private final R request; @@ -1568,7 +1597,7 @@ public String toString() { * Sets the current phase on the task if it isn't null. Pulled into its own * method because its more convenient that way. */ - static void setPhase(ReplicationTask task, String phase) { + protected static void setPhase(ReplicationTask task, String phase) { if (task != null) { task.setPhase(phase); } diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java index 7fc810808f560..26b15195cd8fc 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java @@ -502,7 +502,7 @@ void run() { * * @opensearch.internal */ - class WriteActionReplicasProxy extends ReplicasProxy { + protected class WriteActionReplicasProxy extends ReplicasProxy { @Override public void failShardIfNeeded( diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index d2fc354cf9298..e7b53874c9d1b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -96,7 +96,7 @@ protected void doExecute(Task task, PublishCheckpointRequest request, ActionList } @Override - protected ReplicationMode getReplicationMode(IndexShard indexShard) { + public ReplicationMode getReplicationMode(IndexShard indexShard) { if (indexShard.isRemoteTranslogEnabled()) { return ReplicationMode.FULL_REPLICATION; } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index a7ffde04314c3..000dac92506f6 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -43,6 +43,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.FanoutReplicationProxy; import org.opensearch.action.support.replication.PendingReplicationActions; +import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.action.support.replication.ReplicationOperation; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.action.support.replication.TransportReplicationAction; @@ -292,7 +293,7 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { primaryTerm, TimeValue.timeValueMillis(20), TimeValue.timeValueSeconds(60), - new FanoutReplicationProxy<>() + new FanoutReplicationProxy<>(proxy) ); operation.execute(); @@ -325,6 +326,32 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { assertThat(shardInfo.getSuccessful(), equalTo(1 + nbReplicas - unavailableShards.size())); } + public void testGetReplicationModeWithRemoteTranslog() { + TransportVerifyShardBeforeCloseAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); + } + + public void testGetReplicationModeWithLocalTranslog() { + TransportVerifyShardBeforeCloseAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); + } + + private TransportVerifyShardBeforeCloseAction createAction() { + return new TransportVerifyShardBeforeCloseAction( + Settings.EMPTY, + mock(TransportService.class), + clusterService, + mock(IndicesService.class), + mock(ThreadPool.class), + mock(ShardStateAction.class), + mock(ActionFilters.class) + ); + } + private static ReplicationOperation.Primary< TransportVerifyShardBeforeCloseAction.ShardRequest, diff --git a/server/src/test/java/org/opensearch/action/admin/indices/flush/TransportShardFlushActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/flush/TransportShardFlushActionTests.java new file mode 100644 index 0000000000000..09215088bd04b --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/flush/TransportShardFlushActionTests.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.flush; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.replication.ReplicationMode; +import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportShardFlushActionTests extends OpenSearchTestCase { + + public void testGetReplicationModeWithRemoteTranslog() { + TransportShardFlushAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); + } + + public void testGetReplicationModeWithLocalTranslog() { + TransportShardFlushAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); + assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); + } + + private TransportShardFlushAction createAction() { + ClusterService clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + return new TransportShardFlushAction( + Settings.EMPTY, + mock(TransportService.class), + clusterService, + mock(IndicesService.class), + mock(ThreadPool.class), + mock(ShardStateAction.class), + mock(ActionFilters.class) + ); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockActionTests.java new file mode 100644 index 0000000000000..8c4a6c023f9a5 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockActionTests.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.readonly; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.replication.ReplicationMode; +import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportVerifyShardIndexBlockActionTests extends OpenSearchTestCase { + + public void testGetReplicationModeWithRemoteTranslog() { + TransportVerifyShardIndexBlockAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); + } + + public void testGetReplicationModeWithLocalTranslog() { + TransportVerifyShardIndexBlockAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); + assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); + } + + private TransportVerifyShardIndexBlockAction createAction() { + ClusterService clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + return new TransportVerifyShardIndexBlockAction( + Settings.EMPTY, + mock(TransportService.class), + clusterService, + mock(IndicesService.class), + mock(ThreadPool.class), + mock(ShardStateAction.class), + mock(ActionFilters.class) + ); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/indices/refresh/TransportShardRefreshActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/refresh/TransportShardRefreshActionTests.java new file mode 100644 index 0000000000000..b2eee904bad38 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/refresh/TransportShardRefreshActionTests.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.refresh; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.replication.ReplicationMode; +import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportShardRefreshActionTests extends OpenSearchTestCase { + + public void testGetReplicationModeWithRemoteTranslog() { + TransportShardRefreshAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); + } + + public void testGetReplicationModeWithLocalTranslog() { + TransportShardRefreshAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); + assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); + } + + private TransportShardRefreshAction createAction() { + ClusterService clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + return new TransportShardRefreshAction( + Settings.EMPTY, + mock(TransportService.class), + clusterService, + mock(IndicesService.class), + mock(ThreadPool.class), + mock(ShardStateAction.class), + mock(ActionFilters.class) + ); + } +} diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index b1fa20307a12b..2aff8f6bfc6ab 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -43,18 +43,32 @@ import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActionTestUtils; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.WriteRequest.RefreshPolicy; +import org.opensearch.action.support.replication.ReplicationMode; +import org.opensearch.action.support.replication.ReplicationTask; +import org.opensearch.action.support.replication.TransportReplicationAction.ReplicaResponse; import org.opensearch.action.support.replication.TransportWriteAction.WritePrimaryResult; import org.opensearch.action.update.UpdateHelper; import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; +import org.opensearch.cluster.action.index.MappingUpdatedAction; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.AllocationId; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.lucene.uid.Versions; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.index.Index; +import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.IndexingPressureService; import org.opensearch.index.VersionType; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.VersionConflictEngineException; @@ -62,14 +76,22 @@ import org.opensearch.index.mapper.Mapping; import org.opensearch.index.mapper.MetadataFieldMapper; import org.opensearch.index.mapper.RootObjectMapper; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardNotFoundException; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.SystemIndices; import org.opensearch.rest.RestStatus; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; +import org.opensearch.transport.TestTransportChannel; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportResponse; +import org.opensearch.transport.TransportService; import java.io.IOException; import java.util.Collections; @@ -85,6 +107,7 @@ import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyLong; @@ -1030,6 +1053,161 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { } } + public void testHandlePrimaryTermValidationRequestWithDifferentAllocationId() { + + final String aId = "test-allocation-id"; + final ShardId shardId = new ShardId("test", "_na_", 0); + final ReplicationTask task = createReplicationTask(); + PlainActionFuture listener = new PlainActionFuture<>(); + TransportShardBulkAction action = new TransportShardBulkAction( + Settings.EMPTY, + mock(TransportService.class), + mockClusterService(), + mockIndicesService(aId, 1L), + threadPool, + mock(ShardStateAction.class), + mock(MappingUpdatedAction.class), + mock(UpdateHelper.class), + mock(ActionFilters.class), + mock(IndexingPressureService.class), + mock(SystemIndices.class) + ); + action.handlePrimaryTermValidationRequest( + new TransportShardBulkAction.PrimaryTermValidationRequest(aId + "-1", 1, shardId), + createTransportChannel(listener), + task + ); + assertThrows(ShardNotFoundException.class, listener::actionGet); + assertNotNull(task.getPhase()); + assertEquals("failed", task.getPhase()); + } + + public void testHandlePrimaryTermValidationRequestWithOlderPrimaryTerm() { + + final String aId = "test-allocation-id"; + final ShardId shardId = new ShardId("test", "_na_", 0); + final ReplicationTask task = createReplicationTask(); + PlainActionFuture listener = new PlainActionFuture<>(); + TransportShardBulkAction action = new TransportShardBulkAction( + Settings.EMPTY, + mock(TransportService.class), + mockClusterService(), + mockIndicesService(aId, 2L), + threadPool, + mock(ShardStateAction.class), + mock(MappingUpdatedAction.class), + mock(UpdateHelper.class), + mock(ActionFilters.class), + mock(IndexingPressureService.class), + mock(SystemIndices.class) + ); + action.handlePrimaryTermValidationRequest( + new TransportShardBulkAction.PrimaryTermValidationRequest(aId, 1, shardId), + createTransportChannel(listener), + task + ); + assertThrows(IllegalStateException.class, listener::actionGet); + assertNotNull(task.getPhase()); + assertEquals("failed", task.getPhase()); + } + + public void testHandlePrimaryTermValidationRequestSuccess() { + + final String aId = "test-allocation-id"; + final ShardId shardId = new ShardId("test", "_na_", 0); + final ReplicationTask task = createReplicationTask(); + PlainActionFuture listener = new PlainActionFuture<>(); + TransportShardBulkAction action = new TransportShardBulkAction( + Settings.EMPTY, + mock(TransportService.class), + mockClusterService(), + mockIndicesService(aId, 1L), + threadPool, + mock(ShardStateAction.class), + mock(MappingUpdatedAction.class), + mock(UpdateHelper.class), + mock(ActionFilters.class), + mock(IndexingPressureService.class), + mock(SystemIndices.class) + ); + action.handlePrimaryTermValidationRequest( + new TransportShardBulkAction.PrimaryTermValidationRequest(aId, 1, shardId), + createTransportChannel(listener), + task + ); + assertTrue(listener.actionGet() instanceof ReplicaResponse); + assertEquals(SequenceNumbers.NO_OPS_PERFORMED, ((ReplicaResponse) listener.actionGet()).localCheckpoint()); + assertEquals(SequenceNumbers.NO_OPS_PERFORMED, ((ReplicaResponse) listener.actionGet()).globalCheckpoint()); + assertNotNull(task.getPhase()); + assertEquals("finished", task.getPhase()); + } + + public void testGetReplicationModeWithRemoteTranslog() { + TransportShardBulkAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.PRIMARY_TERM_VALIDATION, action.getReplicationMode(indexShard)); + } + + public void testGetReplicationModeWithLocalTranslog() { + TransportShardBulkAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); + assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); + } + + private TransportShardBulkAction createAction() { + return new TransportShardBulkAction( + Settings.EMPTY, + mock(TransportService.class), + mockClusterService(), + mock(IndicesService.class), + threadPool, + mock(ShardStateAction.class), + mock(MappingUpdatedAction.class), + mock(UpdateHelper.class), + mock(ActionFilters.class), + mock(IndexingPressureService.class), + mock(SystemIndices.class) + ); + } + + private ClusterService mockClusterService() { + ClusterService clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + return clusterService; + } + + private IndicesService mockIndicesService(String aId, long primaryTerm) { + // Mock few of the required classes + IndicesService indicesService = mock(IndicesService.class); + IndexService indexService = mock(IndexService.class); + IndexShard indexShard = mock(IndexShard.class); + when(indicesService.indexServiceSafe(any(Index.class))).thenReturn(indexService); + when(indexService.getShard(anyInt())).thenReturn(indexShard); + when(indexShard.getOperationPrimaryTerm()).thenReturn(primaryTerm); + + // Mock routing entry, allocation id + AllocationId allocationId = mock(AllocationId.class); + ShardRouting shardRouting = mock(ShardRouting.class); + when(indexShard.routingEntry()).thenReturn(shardRouting); + when(shardRouting.allocationId()).thenReturn(allocationId); + when(allocationId.getId()).thenReturn(aId); + return indicesService; + } + + private ReplicationTask createReplicationTask() { + return new ReplicationTask(0, null, null, null, null, null); + } + + /** + * Transport channel that is needed for replica operation testing. + */ + private TransportChannel createTransportChannel(final PlainActionFuture listener) { + return new TestTransportChannel(listener); + } + private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) { if (randomBoolean()) { // add a response to the request and thereby check that it is ignored for the primary. diff --git a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java index acf46e2a63333..d6d944b5b9b45 100644 --- a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java @@ -38,6 +38,7 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.PendingReplicationActions; +import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.block.ClusterBlocks; @@ -50,6 +51,7 @@ import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.lease.Releasable; import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.index.Index; @@ -68,6 +70,7 @@ import org.opensearch.test.transport.MockTransportService; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; import org.opensearch.transport.nio.MockNioTransport; import java.nio.charset.Charset; @@ -222,4 +225,35 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { } } } + + public void testGetReplicationModeWithRemoteTranslog() { + final TransportResyncReplicationAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); + } + + public void testGetReplicationModeWithLocalTranslog() { + final TransportResyncReplicationAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); + assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); + } + + private TransportResyncReplicationAction createAction() { + ClusterService clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + return new TransportResyncReplicationAction( + Settings.EMPTY, + mock(TransportService.class), + clusterService, + mock(IndicesService.class), + threadPool, + mock(ShardStateAction.class), + new ActionFilters(new HashSet<>()), + mock(IndexingPressureService.class), + new SystemIndices(emptyMap()) + ); + } } diff --git a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java index 3a689e356bbdf..c5d4f3326746d 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java @@ -168,7 +168,7 @@ public void testReplication() throws Exception { listener, replicasProxy, primaryTerm, - new FanoutReplicationProxy<>() + new FanoutReplicationProxy<>(replicasProxy) ); op.execute(); assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); @@ -239,7 +239,7 @@ public void testReplicationWithRemoteTranslogEnabled() throws Exception { listener, replicasProxy, 0, - new ReplicationModeAwareProxy<>(ReplicationMode.NO_REPLICATION) + new ReplicationModeAwareProxy<>(ReplicationMode.NO_REPLICATION, replicasProxy, replicasProxy) ); op.execute(); assertTrue("request was not processed on primary", request.processedOnPrimary.get()); @@ -304,7 +304,7 @@ public void testPrimaryToPrimaryReplicationWithRemoteTranslogEnabled() throws Ex listener, replicasProxy, 0, - new ReplicationModeAwareProxy<>(ReplicationMode.NO_REPLICATION) + new ReplicationModeAwareProxy<>(ReplicationMode.NO_REPLICATION, replicasProxy, replicasProxy) ); op.execute(); assertTrue("request was not processed on primary", request.processedOnPrimary.get()); @@ -366,7 +366,7 @@ public void testForceReplicationWithRemoteTranslogEnabled() throws Exception { listener, replicasProxy, 0, - new FanoutReplicationProxy<>() + new FanoutReplicationProxy<>(replicasProxy) ); op.execute(); assertTrue("request was not processed on primary", request.processedOnPrimary.get()); @@ -448,7 +448,7 @@ public void testRetryTransientReplicationFailure() throws Exception { primaryTerm, TimeValue.timeValueMillis(20), TimeValue.timeValueSeconds(60), - new FanoutReplicationProxy<>() + new FanoutReplicationProxy<>(replicasProxy) ); op.execute(); assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); @@ -591,7 +591,7 @@ public void failShard(String message, Exception exception) { listener, replicasProxy, primaryTerm, - new FanoutReplicationProxy<>() + new FanoutReplicationProxy<>(replicasProxy) ); op.execute(); @@ -657,7 +657,7 @@ public void perform(Request request, ActionListener listener) { listener, new TestReplicaProxy(), primaryTerm, - new FanoutReplicationProxy<>() + new FanoutReplicationProxy<>(new TestReplicaProxy()) ); op.execute(); @@ -714,7 +714,7 @@ public void testWaitForActiveShards() throws Exception { threadPool, "test", primaryTerm, - new FanoutReplicationProxy<>() + new FanoutReplicationProxy<>(new TestReplicaProxy()) ); if (passesActiveShardCheck) { @@ -781,7 +781,7 @@ public void updateLocalCheckpointForShard(String allocationId, long checkpoint) listener, replicas, primaryTerm, - new FanoutReplicationProxy<>() + new FanoutReplicationProxy<>(replicas) ); operation.execute(); diff --git a/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java index 1e20c2e948f6e..75063d76ff8dc 100644 --- a/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -34,6 +34,7 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActionTestUtils; +import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; @@ -152,4 +153,31 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { } } + public void testGetReplicationModeWithRemoteTranslog() { + final GlobalCheckpointSyncAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); + } + + public void testGetReplicationModeWithLocalTranslog() { + final GlobalCheckpointSyncAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); + assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); + } + + private GlobalCheckpointSyncAction createAction() { + final IndicesService indicesService = mock(IndicesService.class); + return new GlobalCheckpointSyncAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()) + ); + } + } diff --git a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index 54a88d57b2b69..2e058b6dab560 100644 --- a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -37,6 +37,7 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActionTestUtils; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.service.ClusterService; @@ -209,4 +210,30 @@ public void testBlocks() { assertNull(action.indexBlockLevel()); } + public void testGetReplicationModeWithRemoteTranslog() { + final RetentionLeaseBackgroundSyncAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); + } + + public void testGetReplicationModeWithLocalTranslog() { + final RetentionLeaseBackgroundSyncAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); + assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); + } + + private RetentionLeaseBackgroundSyncAction createAction() { + return new RetentionLeaseBackgroundSyncAction( + Settings.EMPTY, + transportService, + clusterService, + mock(IndicesService.class), + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()) + ); + } + } diff --git a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java index 60ee3360ff235..b07b740fe3744 100644 --- a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -36,6 +36,7 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActionTestUtils; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.service.ClusterService; @@ -206,4 +207,32 @@ public void testBlocks() { assertNull(action.indexBlockLevel()); } + public void testGetReplicationModeWithRemoteTranslog() { + final RetentionLeaseSyncAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); + } + + public void testGetReplicationModeWithLocalTranslog() { + final RetentionLeaseSyncAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.NO_REPLICATION, action.getReplicationMode(indexShard)); + } + + private RetentionLeaseSyncAction createAction() { + return new RetentionLeaseSyncAction( + Settings.EMPTY, + transportService, + clusterService, + mock(IndicesService.class), + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + new IndexingPressureService(Settings.EMPTY, clusterService), + new SystemIndices(emptyMap()) + ); + } + } diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index f1f55c67bfca6..88b9512eedaba 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -12,6 +12,7 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActionTestUtils; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.service.ClusterService; @@ -158,4 +159,31 @@ public void testPublishCheckpointActionOnReplica() { } + public void testGetReplicationModeWithRemoteTranslog() { + final PublishCheckpointAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(true); + assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); + } + + public void testGetReplicationModeWithLocalTranslog() { + final PublishCheckpointAction action = createAction(); + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.isRemoteTranslogEnabled()).thenReturn(false); + assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); + } + + private PublishCheckpointAction createAction() { + return new PublishCheckpointAction( + Settings.EMPTY, + transportService, + clusterService, + mock(IndicesService.class), + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + mock(SegmentReplicationTargetService.class) + ); + } + } diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 92c80ac1799ef..f4babda725057 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -729,7 +729,7 @@ public void execute() { primaryTerm, TimeValue.timeValueMillis(20), TimeValue.timeValueSeconds(60), - new FanoutReplicationProxy<>() + new FanoutReplicationProxy<>(new ReplicasRef()) ).execute(); } catch (Exception e) { listener.onFailure(e);