diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index ff50c361c0823..fda96f3735ac2 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -109,11 +109,6 @@ public RecoveryTarget retryCopy() { return new RecoveryTarget(indexShard, sourceNode, listener); } - public IndexShard indexShard() { - ensureRefCount(); - return indexShard; - } - public String source() { return sourceNode.toString(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index bf9e4607b695a..6f04c6cf6f665 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -80,7 +80,7 @@ synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) thro } else { // From the checkpoint's shard ID, fetch the IndexShard ShardId shardId = checkpoint.getShardId(); - final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.id()); // build the CopyState object and cache it before returning final CopyState copyState = new CopyState(checkpoint, indexShard); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 357a6db869720..2314392defd52 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -18,6 +18,7 @@ import org.apache.lucene.store.ByteBuffersIndexInput; import org.apache.lucene.store.ChecksumIndexInput; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; import org.opensearch.common.UUIDs; @@ -213,9 +214,10 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener.completeWith(listener, () -> { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); + Store store = null; try { multiFileWriter.renameAllTempFiles(); - final Store store = store(); + store = store(); store.incRef(); // Deserialize the new SegmentInfos object sent from the primary. final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); @@ -250,6 +252,13 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ); fail(rfe, true); throw rfe; + } catch (OpenSearchException ex) { + /* + Ignore closed replication target as it can happen due to index shard closed event in a separate thread. + In such scenario, ignore the exception + */ + assert cancellableThreads.isCancelled() : "Replication target closed but segment replication not cancelled"; + logger.info("Replication target closed", ex); } catch (Exception ex) { ReplicationFailedException rfe = new ReplicationFailedException( indexShard.shardId(), @@ -259,7 +268,9 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, fail(rfe, true); throw rfe; } finally { - store.decRef(); + if (store != null) { + store.decRef(); + } } return null; }); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index d240ba044bbc2..e5b530a5cbc08 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -396,7 +396,7 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha assert indicesService != null; final IndexShard indexShard = indicesService.getShardOrNull(request.getShardId()); // Proceed with round of segment replication only when it is allowed - if (indexShard.getReplicationEngine().isEmpty()) { + if (indexShard == null || indexShard.getReplicationEngine().isEmpty()) { logger.info("Ignore force segment replication sync as it is not allowed"); channel.sendResponse(TransportResponse.Empty.INSTANCE); return; diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index 44da7f0f065ae..815ae29114bae 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.store.RateLimiter; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ChannelActionListener; import org.opensearch.common.CheckedFunction; @@ -186,7 +187,7 @@ public void fail(ReplicationFailedException e, boolean sendShardFailure) { protected void ensureRefCount() { if (refCount() <= 0) { - throw new ReplicationFailedException( + throw new OpenSearchException( "ReplicationTarget is used but it's refcount is 0. Probably a mismatch between incRef/decRef calls" ); } diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index 8cb7a5f6d8929..78767ee1dcf8c 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -76,7 +76,7 @@ public void setUp() throws Exception { // This mirrors the creation of the ReplicationCheckpoint inside CopyState testCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L); IndexService mockIndexService = mock(IndexService.class); - when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService); + when(mockIndicesService.indexServiceSafe(testShardId.getIndex())).thenReturn(mockIndexService); when(mockIndexService.getShard(testShardId.id())).thenReturn(primary); TransportService transportService = mock(TransportService.class); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java index 66df13b89b4c3..0d05b1ec8679e 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java @@ -51,7 +51,7 @@ public void setUp() throws Exception { ShardId testShardId = mockIndexShard.shardId(); IndicesService mockIndicesService = mock(IndicesService.class); IndexService mockIndexService = mock(IndexService.class); - when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService); + when(mockIndicesService.indexServiceSafe(testShardId.getIndex())).thenReturn(mockIndexService); when(mockIndexService.getShard(testShardId.id())).thenReturn(mockIndexShard); // This mirrors the creation of the ReplicationCheckpoint inside CopyState