Skip to content

Commit

Permalink
[Segment Replication] Fix segrep compatibility check for closed indic…
Browse files Browse the repository at this point in the history
…es (opensearch-project#6749) (opensearch-project#6760)

* [Segment Replication] Fix segrep compatibility check for closed indices



* Return false on primary routing



---------

Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 authored Mar 20, 2023
1 parent bbd2785 commit 3c9eeb4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
25 changes: 12 additions & 13 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1430,6 +1430,14 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
}
}

public Optional<NRTReplicationEngine> getReplicationEngine() {
if (getEngine() instanceof NRTReplicationEngine) {
return Optional.of((NRTReplicationEngine) getEngine());
} else {
return Optional.empty();
}
}

public void finalizeReplication(SegmentInfos infos) throws IOException {
if (getReplicationEngine().isPresent()) {
getReplicationEngine().get().updateSegments(infos);
Expand Down Expand Up @@ -1520,16 +1528,15 @@ public boolean isSegmentReplicationAllowed() {
logger.warn("Shard is in primary mode and cannot perform segment replication as a replica.");
return false;
}
if (this.routingEntry().primary() && this.routingEntry().isRelocationTarget() == false) {
logger.warn("Shard is marked as primary but not relocating, so cannot perform segment replication");
if (this.routingEntry().primary()) {
logger.warn("Shard routing is marked primary thus cannot perform segment replication as replica");
return false;
}
if (state().equals(IndexShardState.STARTED) == false
&& ((state() == IndexShardState.RECOVERING || state() == IndexShardState.POST_RECOVERY)
&& shardRouting.state() == ShardRoutingState.INITIALIZING) == false) {
&& (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) {
logger.warn(
() -> new ParameterizedMessage(
"Shard is not started or recovering {} {} and cannot perform segment replication",
"Shard is not started or recovering {} {} and cannot perform segment replication as a replica",
state(),
shardRouting.state()
)
Expand Down Expand Up @@ -2974,14 +2981,6 @@ public long getProcessedLocalCheckpoint() {
});
}

private Optional<NRTReplicationEngine> getReplicationEngine() {
if (getEngine() instanceof NRTReplicationEngine) {
return Optional.of((NRTReplicationEngine) getEngine());
} else {
return Optional.empty();
}
}

/**
* Returns the global checkpoint for the shard.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,18 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
}
}

/**
* Force sync transport handler forces round of segment replication. Caller should verify necessary checks before
* calling this handler.
*/
private class ForceSyncTransportRequestHandler implements TransportRequestHandler<ForceSyncRequest> {
@Override
public void messageReceived(final ForceSyncRequest request, TransportChannel channel, Task task) throws Exception {
assert indicesService != null;
final IndexShard indexShard = indicesService.getShardOrNull(request.getShardId());
// Proceed with round of segment replication only when it is allowed
if (indexShard.isSegmentReplicationAllowed() == false) {
if (indexShard.getReplicationEngine().isEmpty()) {
logger.info("Ignore force segment replication sync as it is not allowed");
channel.sendResponse(TransportResponse.Empty.INSTANCE);
return;
}
Expand Down

0 comments on commit 3c9eeb4

Please sign in to comment.