From 3ec72bf7bfb6dfc0b202474fcaecb74c4ddaf1db Mon Sep 17 00:00:00 2001 From: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com> Date: Wed, 22 Jun 2022 23:29:38 +0000 Subject: [PATCH] Adding onNewCheckpoint to Start Replication on Replica Shard when Segment Replication is turned on (#3540) * Adding onNewCheckpoint and it's test to start replication. SCheck for latestcheckpoint and replaying logic is removed from this commit and will be added in a different PR Signed-off-by: Rishikesh1159 * Changing binding/inject logic and addressing comments from PR Signed-off-by: Rishikesh1159 * Applying spotless check Signed-off-by: Rishikesh1159 * Moving shouldProcessCheckpoint() to IndexShard, and removing some trace logs Signed-off-by: Rishikesh1159 * applying spotlessApply Signed-off-by: Rishikesh1159 * Adding more info to log statement in targetservice class Signed-off-by: Rishikesh1159 * applying spotlessApply Signed-off-by: Rishikesh1159 * Addressing comments on PR Signed-off-by: Rishikesh1159 * Adding teardown() in SegmentReplicationTargetServiceTests. Signed-off-by: Rishikesh1159 * fixing testShouldProcessCheckpoint() in SegmentReplicationTargetServiceTests Signed-off-by: Rishikesh1159 * Removing CheckpointPublisherProvider in IndicesModule Signed-off-by: Rishikesh1159 * spotless check apply Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/engine/Engine.java | 1 + .../index/engine/InternalEngine.java | 2 +- .../opensearch/index/shard/IndexShard.java | 59 +++++++++++-- .../org/opensearch/indices/IndicesModule.java | 2 + .../SegmentReplicationTargetService.java | 35 +++++++- .../checkpoint/PublishCheckpointAction.java | 9 +- .../checkpoint/ReplicationCheckpoint.java | 2 +- ...SegmentReplicationCheckpointPublisher.java | 1 + .../common/ReplicationCollection.java | 10 +++ .../main/java/org/opensearch/node/Node.java | 11 +++ .../SegmentReplicationTargetServiceTests.java | 82 ++++++++++++++++--- .../PublishCheckpointActionTests.java | 17 ++-- 12 files changed, 199 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 5e9bc3926d7c2..66fc680beb62c 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -173,6 +173,7 @@ public final EngineConfig config() { * Return the latest active SegmentInfos from the engine. * @return {@link SegmentInfos} */ + @Nullable protected abstract SegmentInfos getLatestSegmentInfos(); /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index b63a39ebb1222..d2d688a90353e 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2289,7 +2289,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - public SegmentInfos getLatestSegmentInfos() { + protected SegmentInfos getLatestSegmentInfos() { OpenSearchDirectoryReader reader = null; try { reader = internalReaderManager.acquire(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d083d947dfbc3..d25847dde235c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -161,7 +161,7 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; @@ -1396,15 +1396,60 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti * Returns the lastest Replication Checkpoint that shard received */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { - return new ReplicationCheckpoint(shardId, 0, 0, 0, 0); + try (final GatedCloseable snapshot = getSegmentInfosSnapshot()) { + return Optional.ofNullable(snapshot.get()) + .map( + segmentInfos -> new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + getProcessedLocalCheckpoint(), + segmentInfos.getVersion() + ) + ) + .orElse( + new ReplicationCheckpoint( + shardId, + getOperationPrimaryTerm(), + SequenceNumbers.NO_OPS_PERFORMED, + getProcessedLocalCheckpoint(), + SequenceNumbers.NO_OPS_PERFORMED + ) + ); + } catch (IOException ex) { + throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex); + } } /** - * Invoked when a new checkpoint is received from a primary shard. Starts the copy process. - */ - public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) { - assert shardRouting.primary() == false; - // TODO + * Checks if checkpoint should be processed + * + * @param requestCheckpoint received checkpoint that is checked for processing + * @return true if checkpoint should be processed + */ + public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + if (state().equals(IndexShardState.STARTED) == false) { + logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); + return false; + } + ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); + if (localCheckpoint.isAheadOf(requestCheckpoint)) { + logger.trace( + () -> new ParameterizedMessage( + "Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", + localCheckpoint, + requestCheckpoint + ) + ); + return false; + } + if (localCheckpoint.equals(requestCheckpoint)) { + logger.trace( + () -> new ParameterizedMessage("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint) + ); + return false; + } + return true; } /** diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index 0cb2ff958c787..29ff507ad9fcf 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -282,6 +282,8 @@ protected void configure() { bind(RetentionLeaseSyncer.class).asEagerSingleton(); if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); + } else { + bind(SegmentReplicationCheckpointPublisher.class).toInstance(SegmentReplicationCheckpointPublisher.EMPTY); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 1c6053a72a4c5..c44b27911bb7a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -38,7 +38,7 @@ * * @opensearch.internal */ -public final class SegmentReplicationTargetService implements IndexEventListener { +public class SegmentReplicationTargetService implements IndexEventListener { private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class); @@ -84,6 +84,39 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } + /** + * Invoked when a new checkpoint is received from a primary shard. + * It checks if a new checkpoint should be processed or not and starts replication if needed. + * @param receivedCheckpoint received checkpoint that is checked for processing + * @param replicaShard replica shard on which checkpoint is received + */ + public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { + if (onGoingReplications.isShardReplicating(replicaShard.shardId())) { + logger.trace( + () -> new ParameterizedMessage( + "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", + replicaShard.getLatestReplicationCheckpoint() + ) + ); + return; + } + if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { + startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) {} + + @Override + public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + if (sendShardFailure == true) { + logger.error("replication failure", e); + replicaShard.failShard("replication failure", e); + } + } + }); + + } + } + public void startReplication( final ReplicationCheckpoint checkpoint, final IndexShard indexShard, diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index b74a69971ebd5..8093b6aee88f9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -28,6 +28,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -52,6 +53,8 @@ public class PublishCheckpointAction extends TransportReplicationAction< public static final String ACTION_NAME = "indices:admin/publishCheckpoint"; protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class); + private final SegmentReplicationTargetService replicationService; + @Inject public PublishCheckpointAction( Settings settings, @@ -60,7 +63,8 @@ public PublishCheckpointAction( IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters + ActionFilters actionFilters, + SegmentReplicationTargetService targetService ) { super( settings, @@ -75,6 +79,7 @@ public PublishCheckpointAction( PublishCheckpointRequest::new, ThreadPool.Names.REFRESH ); + this.replicationService = targetService; } @Override @@ -165,7 +170,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh ActionListener.completeWith(listener, () -> { logger.trace("Checkpoint received on replica {}", request); if (request.getCheckpoint().getShardId().equals(replica.shardId())) { - replica.onNewCheckpoint(request); + replicationService.onNewCheckpoint(request.getCheckpoint(), replica); } return new ReplicaResult(); }); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 98ab9cc4c1708..f84a65206190b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -115,7 +115,7 @@ public int hashCode() { * Checks if other is aheadof current replication point by comparing segmentInfosVersion. Returns true for null */ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { - return other == null || segmentInfosVersion > other.getSegmentInfosVersion(); + return other == null || segmentInfosVersion > other.getSegmentInfosVersion() || primaryTerm > other.getPrimaryTerm(); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java index 2b09901a947fe..6be524cea140e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java @@ -22,6 +22,7 @@ public class SegmentReplicationCheckpointPublisher { private final PublishAction publishAction; + // This Component is behind feature flag so we are manually binding this in IndicesModule. @Inject public SegmentReplicationCheckpointPublisher(PublishCheckpointAction publishAction) { this(publishAction::publish); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index b8295f0685a7f..d648ca6041ff8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -235,6 +235,16 @@ public boolean cancelForShard(ShardId shardId, String reason) { return cancelled; } + /** + * check if a shard is currently replicating + * + * @param shardId shardId for which to check if replicating + * @return true if shard is currently replicating + */ + public boolean isShardReplicating(ShardId shardId) { + return onGoingTargetEvents.values().stream().anyMatch(t -> t.indexShard.shardId().equals(shardId)); + } + /** * a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference * causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 2938d6321c7fb..80fa724433f0b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -39,6 +39,8 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexingPressureService; import org.opensearch.extensions.ExtensionsOrchestrator; +import org.opensearch.indices.replication.SegmentReplicationSourceFactory; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; @@ -946,6 +948,15 @@ protected Node( b.bind(PeerRecoveryTargetService.class) .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); if (FeatureFlags.isEnabled(REPLICATION_TYPE)) { + b.bind(SegmentReplicationTargetService.class) + .toInstance( + new SegmentReplicationTargetService( + threadPool, + recoverySettings, + transportService, + new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) + ) + ); b.bind(SegmentReplicationSourceService.class) .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 06b16b797efe3..33734fe85def5 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -9,6 +9,7 @@ package org.opensearch.indices.replication; import org.junit.Assert; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; @@ -18,15 +19,13 @@ import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.transport.TransportService; import java.io.IOException; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { @@ -42,7 +41,7 @@ public void setUp() throws Exception { final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); final TransportService transportService = mock(TransportService.class); - indexShard = newShard(false, settings); + indexShard = newStartedShard(false, settings); checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L); SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class); replicationSource = mock(SegmentReplicationSource.class); @@ -57,7 +56,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testTargetReturnsSuccess_listenerCompletes() throws IOException { + public void testTargetReturnsSuccess_listenerCompletes() { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, indexShard, @@ -83,10 +82,9 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept return null; }).when(spy).startReplication(any()); sut.startReplication(spy); - closeShards(indexShard); } - public void testTargetThrowsException() throws IOException { + public void testTargetThrowsException() { final OpenSearchException expectedError = new OpenSearchException("Fail"); final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, @@ -115,10 +113,71 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept return null; }).when(spy).startReplication(any()); sut.startReplication(spy); - closeShards(indexShard); } - public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOException { + public void testAlreadyOnNewCheckpoint() { + SegmentReplicationTargetService spy = spy(sut); + spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + } + + public void testShardAlreadyReplicating() { + SegmentReplicationTargetService spy = spy(sut); + // Create a separate target and start it so the shard is already replicating. + final SegmentReplicationTarget target = new SegmentReplicationTarget( + checkpoint, + indexShard, + replicationSource, + mock(SegmentReplicationTargetService.SegmentReplicationListener.class) + ); + final SegmentReplicationTarget spyTarget = Mockito.spy(target); + spy.startReplication(spyTarget); + + // a new checkpoint comes in for the same IndexShard. + spy.onNewCheckpoint(checkpoint, indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + spyTarget.markAsDone(); + } + + public void testNewCheckpointBehindCurrentCheckpoint() { + SegmentReplicationTargetService spy = spy(sut); + spy.onNewCheckpoint(checkpoint, indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + } + + public void testShardNotStarted() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + IndexShard shard = newShard(false); + spy.onNewCheckpoint(checkpoint, shard); + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(shard); + } + + public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOException { + allowShardFailures(); + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(indexShard); + ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint(); + ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint( + cp.getShardId(), + cp.getPrimaryTerm(), + cp.getSegmentsGen(), + cp.getSeqNo(), + cp.getSegmentInfosVersion() + 1 + ); + ArgumentCaptor captor = ArgumentCaptor.forClass( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + doNothing().when(spy).startReplication(any(), any(), any()); + spy.onNewCheckpoint(newCheckpoint, spyShard); + verify(spy, times(1)).startReplication(any(), any(), captor.capture()); + SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue(); + listener.onFailure(new SegmentReplicationState(new ReplicationLuceneIndex()), new OpenSearchException("testing"), true); + verify(spyShard).failShard(any(), any()); + closeShard(indexShard, false); + } + + public void testBeforeIndexShardClosed_CancelsOngoingReplications() { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, indexShard, @@ -128,7 +187,6 @@ public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOExc final SegmentReplicationTarget spy = Mockito.spy(target); sut.startReplication(spy); sut.beforeIndexShardClosed(indexShard.shardId(), indexShard, Settings.EMPTY); - Mockito.verify(spy, times(1)).cancel(any()); - closeShards(indexShard); + verify(spy, times(1)).cancel(any()); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 074b5ff613b08..77cc1d744f0dc 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -22,7 +22,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; @@ -73,7 +73,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testPublishCheckpointActionOnPrimary() throws InterruptedException { + public void testPublishCheckpointActionOnPrimary() { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -87,7 +87,7 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException { final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); + final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, @@ -96,7 +96,8 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet()) + new ActionFilters(Collections.emptySet()), + mockTargetService ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); @@ -116,7 +117,6 @@ public void testPublishCheckpointActionOnReplica() { final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - final int id = randomIntBetween(0, 4); final IndexShard indexShard = mock(IndexShard.class); when(indexService.getShard(id)).thenReturn(indexShard); @@ -124,7 +124,7 @@ public void testPublishCheckpointActionOnReplica() { final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); + final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, @@ -133,7 +133,8 @@ public void testPublishCheckpointActionOnReplica() { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet()) + new ActionFilters(Collections.emptySet()), + mockTargetService ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); @@ -145,7 +146,7 @@ public void testPublishCheckpointActionOnReplica() { final TransportReplicationAction.ReplicaResult result = listener.actionGet(); // onNewCheckpoint should be called on shard with checkpoint request - verify(indexShard).onNewCheckpoint(request); + verify(mockTargetService, times(1)).onNewCheckpoint(checkpoint, indexShard); // the result should indicate success final AtomicBoolean success = new AtomicBoolean();