From 3c9eeb4d02fb431939bf25ef48fb3cab3500af0d Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 20 Mar 2023 13:57:14 -0700 Subject: [PATCH] [Segment Replication] Fix segrep compatibility check for closed indices (#6749) (#6760) * [Segment Replication] Fix segrep compatibility check for closed indices * Return false on primary routing --------- Signed-off-by: Suraj Singh --- .../opensearch/index/shard/IndexShard.java | 25 +++++++++---------- .../SegmentReplicationTargetService.java | 7 +++++- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 4dcf7f5ab965a..b95af8621493b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1430,6 +1430,14 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th } } + public Optional getReplicationEngine() { + if (getEngine() instanceof NRTReplicationEngine) { + return Optional.of((NRTReplicationEngine) getEngine()); + } else { + return Optional.empty(); + } + } + public void finalizeReplication(SegmentInfos infos) throws IOException { if (getReplicationEngine().isPresent()) { getReplicationEngine().get().updateSegments(infos); @@ -1520,16 +1528,15 @@ public boolean isSegmentReplicationAllowed() { logger.warn("Shard is in primary mode and cannot perform segment replication as a replica."); return false; } - if (this.routingEntry().primary() && this.routingEntry().isRelocationTarget() == false) { - logger.warn("Shard is marked as primary but not relocating, so cannot perform segment replication"); + if (this.routingEntry().primary()) { + logger.warn("Shard routing is marked primary thus cannot perform segment replication as replica"); return false; } if (state().equals(IndexShardState.STARTED) == false - && ((state() == IndexShardState.RECOVERING || state() == IndexShardState.POST_RECOVERY) - && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) { + && (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) { logger.warn( () -> new ParameterizedMessage( - "Shard is not started or recovering {} {} and cannot perform segment replication", + "Shard is not started or recovering {} {} and cannot perform segment replication as a replica", state(), shardRouting.state() ) @@ -2974,14 +2981,6 @@ public long getProcessedLocalCheckpoint() { }); } - private Optional getReplicationEngine() { - if (getEngine() instanceof NRTReplicationEngine) { - return Optional.of((NRTReplicationEngine) getEngine()); - } else { - return Optional.empty(); - } - } - /** * Returns the global checkpoint for the shard. * diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 0cdd4907d31ea..c507bccd0071d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -385,13 +385,18 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha } } + /** + * Force sync transport handler forces round of segment replication. Caller should verify necessary checks before + * calling this handler. + */ private class ForceSyncTransportRequestHandler implements TransportRequestHandler { @Override public void messageReceived(final ForceSyncRequest request, TransportChannel channel, Task task) throws Exception { assert indicesService != null; final IndexShard indexShard = indicesService.getShardOrNull(request.getShardId()); // Proceed with round of segment replication only when it is allowed - if (indexShard.isSegmentReplicationAllowed() == false) { + if (indexShard.getReplicationEngine().isEmpty()) { + logger.info("Ignore force segment replication sync as it is not allowed"); channel.sendResponse(TransportResponse.Empty.INSTANCE); return; }