From 9b83b522550851c584f4888fa6ab82773904fb64 Mon Sep 17 00:00:00 2001 From: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com> Date: Wed, 22 Jun 2022 23:29:38 +0000 Subject: [PATCH] Resolving conflicts and Merging PR https://github.com/opensearch-project/OpenSearch/pull/3540. Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 2 +- .../opensearch/index/shard/IndexShard.java | 57 +++++++++++-- .../org/opensearch/indices/IndicesModule.java | 2 + .../SegmentReplicationTargetService.java | 35 +++++++- .../checkpoint/PublishCheckpointAction.java | 9 ++- .../checkpoint/ReplicationCheckpoint.java | 2 +- ...SegmentReplicationCheckpointPublisher.java | 1 + .../common/ReplicationCollection.java | 10 +++ .../main/java/org/opensearch/node/Node.java | 11 +++ .../SegmentReplicationTargetServiceTests.java | 80 +++++++++++++++++-- .../PublishCheckpointActionTests.java | 17 ++-- 12 files changed, 200 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 1dd31b37efc12..43dc51ab824fc 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -84,7 +84,6 @@ import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogStats; import org.opensearch.index.translog.TranslogDeletionPolicy; -import org.opensearch.index.translog.TranslogStats; import org.opensearch.search.suggest.completion.CompletionStats; import java.io.Closeable; @@ -181,6 +180,7 @@ protected SegmentInfos getLatestSegmentInfos() { return null; }; + /** * In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos} * object directly, this method returns a {@link GatedCloseable} reference to the same object. diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 2ba06c0f16927..a8b00c9ed8504 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2285,7 +2285,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - public SegmentInfos getLatestSegmentInfos() { + protected SegmentInfos getLatestSegmentInfos() { OpenSearchDirectoryReader reader = null; try { reader = internalReaderManager.acquire(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index e1b3e6199d2f3..3aae7faf2ee5b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -162,7 +162,7 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; @@ -1385,15 +1385,60 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti * Returns the lastest Replication Checkpoint that shard received */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { - return new ReplicationCheckpoint(shardId, 0, 0, 0, 0); + try (final GatedCloseable snapshot = getSegmentInfosSnapshot()) { + return Optional.ofNullable(snapshot.get()) + .map( + segmentInfos -> new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + getProcessedLocalCheckpoint(), + segmentInfos.getVersion() + ) + ) + .orElse( + new ReplicationCheckpoint( + shardId, + getOperationPrimaryTerm(), + SequenceNumbers.NO_OPS_PERFORMED, + getProcessedLocalCheckpoint(), + SequenceNumbers.NO_OPS_PERFORMED + ) + ); + } catch (IOException ex) { + throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex); + } } /** - * Invoked when a new checkpoint is received from a primary shard. Starts the copy process. + * Checks if checkpoint should be processed + * + * @param requestCheckpoint received checkpoint that is checked for processing + * @return true if checkpoint should be processed */ - public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) { - assert shardRouting.primary() == false; - // TODO + public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + if (state().equals(IndexShardState.STARTED) == false) { + logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); + return false; + } + ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); + if (localCheckpoint.isAheadOf(requestCheckpoint)) { + logger.trace( + () -> new ParameterizedMessage( + "Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", + localCheckpoint, + requestCheckpoint + ) + ); + return false; + } + if (localCheckpoint.equals(requestCheckpoint)) { + logger.trace( + () -> new ParameterizedMessage("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint) + ); + return false; + } + return true; } /** diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index c728952a9dcbf..e413a73940011 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -284,6 +284,8 @@ protected void configure() { bind(RetentionLeaseSyncer.class).asEagerSingleton(); if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); + } else { + bind(SegmentReplicationCheckpointPublisher.class).toInstance(SegmentReplicationCheckpointPublisher.EMPTY); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 1c6053a72a4c5..c44b27911bb7a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -38,7 +38,7 @@ * * @opensearch.internal */ -public final class SegmentReplicationTargetService implements IndexEventListener { +public class SegmentReplicationTargetService implements IndexEventListener { private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class); @@ -84,6 +84,39 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } + /** + * Invoked when a new checkpoint is received from a primary shard. + * It checks if a new checkpoint should be processed or not and starts replication if needed. + * @param receivedCheckpoint received checkpoint that is checked for processing + * @param replicaShard replica shard on which checkpoint is received + */ + public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { + if (onGoingReplications.isShardReplicating(replicaShard.shardId())) { + logger.trace( + () -> new ParameterizedMessage( + "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", + replicaShard.getLatestReplicationCheckpoint() + ) + ); + return; + } + if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { + startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) {} + + @Override + public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + if (sendShardFailure == true) { + logger.error("replication failure", e); + replicaShard.failShard("replication failure", e); + } + } + }); + + } + } + public void startReplication( final ReplicationCheckpoint checkpoint, final IndexShard indexShard, diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index b74a69971ebd5..8093b6aee88f9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -28,6 +28,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -52,6 +53,8 @@ public class PublishCheckpointAction extends TransportReplicationAction< public static final String ACTION_NAME = "indices:admin/publishCheckpoint"; protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class); + private final SegmentReplicationTargetService replicationService; + @Inject public PublishCheckpointAction( Settings settings, @@ -60,7 +63,8 @@ public PublishCheckpointAction( IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters + ActionFilters actionFilters, + SegmentReplicationTargetService targetService ) { super( settings, @@ -75,6 +79,7 @@ public PublishCheckpointAction( PublishCheckpointRequest::new, ThreadPool.Names.REFRESH ); + this.replicationService = targetService; } @Override @@ -165,7 +170,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh ActionListener.completeWith(listener, () -> { logger.trace("Checkpoint received on replica {}", request); if (request.getCheckpoint().getShardId().equals(replica.shardId())) { - replica.onNewCheckpoint(request); + replicationService.onNewCheckpoint(request.getCheckpoint(), replica); } return new ReplicaResult(); }); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 98ab9cc4c1708..f84a65206190b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -115,7 +115,7 @@ public int hashCode() { * Checks if other is aheadof current replication point by comparing segmentInfosVersion. Returns true for null */ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { - return other == null || segmentInfosVersion > other.getSegmentInfosVersion(); + return other == null || segmentInfosVersion > other.getSegmentInfosVersion() || primaryTerm > other.getPrimaryTerm(); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java index 2b09901a947fe..6be524cea140e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java @@ -22,6 +22,7 @@ public class SegmentReplicationCheckpointPublisher { private final PublishAction publishAction; + // This Component is behind feature flag so we are manually binding this in IndicesModule. @Inject public SegmentReplicationCheckpointPublisher(PublishCheckpointAction publishAction) { this(publishAction::publish); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index b8295f0685a7f..d648ca6041ff8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -235,6 +235,16 @@ public boolean cancelForShard(ShardId shardId, String reason) { return cancelled; } + /** + * check if a shard is currently replicating + * + * @param shardId shardId for which to check if replicating + * @return true if shard is currently replicating + */ + public boolean isShardReplicating(ShardId shardId) { + return onGoingTargetEvents.values().stream().anyMatch(t -> t.indexShard.shardId().equals(shardId)); + } + /** * a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference * causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index cf99d116131a3..346bff9afe296 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -41,6 +41,8 @@ import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.SegmentReplicationSourceFactory; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; @@ -948,6 +950,15 @@ protected Node( b.bind(PeerRecoveryTargetService.class) .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); if (FeatureFlags.isEnabled(REPLICATION_TYPE)) { + b.bind(SegmentReplicationTargetService.class) + .toInstance( + new SegmentReplicationTargetService( + threadPool, + recoverySettings, + transportService, + new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) + ) + ); b.bind(SegmentReplicationSourceService.class) .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 9a1146fe41ee2..1fba286b2cc63 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -9,6 +9,7 @@ package org.opensearch.indices.replication; import org.junit.Assert; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; @@ -18,6 +19,7 @@ import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.transport.TransportService; import java.io.IOException; @@ -27,6 +29,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.spy; public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { @@ -42,7 +47,7 @@ public void setUp() throws Exception { final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); final TransportService transportService = mock(TransportService.class); - indexShard = newShard(false, settings); + indexShard = newStartedShard(false, settings); checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L); SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class); replicationSource = mock(SegmentReplicationSource.class); @@ -57,7 +62,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testTargetReturnsSuccess_listenerCompletes() throws IOException { + public void testTargetReturnsSuccess_listenerCompletes() { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, indexShard, @@ -83,10 +88,9 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept return null; }).when(spy).startReplication(any()); sut.startReplication(spy); - closeShards(indexShard); } - public void testTargetThrowsException() throws IOException { + public void testTargetThrowsException() { final OpenSearchException expectedError = new OpenSearchException("Fail"); final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, @@ -115,10 +119,71 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept return null; }).when(spy).startReplication(any()); sut.startReplication(spy); - closeShards(indexShard); } - public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOException { + public void testAlreadyOnNewCheckpoint() { + SegmentReplicationTargetService spy = spy(sut); + spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + } + + public void testShardAlreadyReplicating() { + SegmentReplicationTargetService spy = spy(sut); + // Create a separate target and start it so the shard is already replicating. + final SegmentReplicationTarget target = new SegmentReplicationTarget( + checkpoint, + indexShard, + replicationSource, + mock(SegmentReplicationTargetService.SegmentReplicationListener.class) + ); + final SegmentReplicationTarget spyTarget = Mockito.spy(target); + spy.startReplication(spyTarget); + + // a new checkpoint comes in for the same IndexShard. + spy.onNewCheckpoint(checkpoint, indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + spyTarget.markAsDone(); + } + + public void testNewCheckpointBehindCurrentCheckpoint() { + SegmentReplicationTargetService spy = spy(sut); + spy.onNewCheckpoint(checkpoint, indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + } + + public void testShardNotStarted() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + IndexShard shard = newShard(false); + spy.onNewCheckpoint(checkpoint, shard); + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(shard); + } + + public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOException { + allowShardFailures(); + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(indexShard); + ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint(); + ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint( + cp.getShardId(), + cp.getPrimaryTerm(), + cp.getSegmentsGen(), + cp.getSeqNo(), + cp.getSegmentInfosVersion() + 1 + ); + ArgumentCaptor captor = ArgumentCaptor.forClass( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + doNothing().when(spy).startReplication(any(), any(), any()); + spy.onNewCheckpoint(newCheckpoint, spyShard); + verify(spy, times(1)).startReplication(any(), any(), captor.capture()); + SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue(); + listener.onFailure(new SegmentReplicationState(new ReplicationLuceneIndex()), new OpenSearchException("testing"), true); + verify(spyShard).failShard(any(), any()); + closeShard(indexShard, false); + } + + public void testBeforeIndexShardClosed_CancelsOngoingReplications() { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, indexShard, @@ -128,7 +193,6 @@ public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOExc final SegmentReplicationTarget spy = Mockito.spy(target); sut.startReplication(spy); sut.beforeIndexShardClosed(indexShard.shardId(), indexShard, Settings.EMPTY); - Mockito.verify(spy, times(1)).cancel(any()); - closeShards(indexShard); + verify(spy, times(1)).cancel(any()); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index b54237a130431..4802612d712f5 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -22,7 +22,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; @@ -75,7 +75,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testPublishCheckpointActionOnPrimary() throws InterruptedException { + public void testPublishCheckpointActionOnPrimary() { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -89,7 +89,7 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException { final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); + final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, @@ -98,7 +98,8 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet()) + new ActionFilters(Collections.emptySet()), + mockTargetService ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); @@ -118,7 +119,6 @@ public void testPublishCheckpointActionOnReplica() { final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - final int id = randomIntBetween(0, 4); final IndexShard indexShard = mock(IndexShard.class); when(indexService.getShard(id)).thenReturn(indexShard); @@ -126,7 +126,7 @@ public void testPublishCheckpointActionOnReplica() { final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); + final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, @@ -135,7 +135,8 @@ public void testPublishCheckpointActionOnReplica() { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet()) + new ActionFilters(Collections.emptySet()), + mockTargetService ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); @@ -147,7 +148,7 @@ public void testPublishCheckpointActionOnReplica() { final TransportReplicationAction.ReplicaResult result = listener.actionGet(); // onNewCheckpoint should be called on shard with checkpoint request - verify(indexShard).onNewCheckpoint(request); + verify(mockTargetService, times(1)).onNewCheckpoint(checkpoint, indexShard); // the result should indicate success final AtomicBoolean success = new AtomicBoolean();