Skip to content

Commit

Permalink
Remove unnecessary start replication overloaded method.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Nov 22, 2022
1 parent 3e5a7c5 commit d50a9e0
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationState;
Expand Down Expand Up @@ -788,41 +789,49 @@ public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting
&& shardRouting.primary() == false
&& shardRouting.state() == ShardRoutingState.INITIALIZING
&& indexShard.state() == IndexShardState.POST_RECOVERY) {
segmentReplicationTargetService.startReplication(indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
shardStateAction.shardStarted(
shardRouting,
primaryTerm,
"after " + RecState.getRecoverySource(),
SHARD_STATE_ACTION_LISTENER
);
}
segmentReplicationTargetService.startReplication(
ReplicationCheckpoint.empty(shardRouting.shardId()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
shardStateAction.shardStarted(
shardRouting,
primaryTerm,
"after " + RecState.getRecoverySource(),
SHARD_STATE_ACTION_LISTENER
);
}

@Override
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
logger.error("replication failure", e);
indexShard.failShard("replication failure", e);
@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
logger.error("replication failure", e);
indexShard.failShard("replication failure", e);
}
}
}
});
);
} else {
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,6 @@ public SegmentReplicationTarget startReplication(
return target;
}

public SegmentReplicationTarget startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) {
return startReplication(ReplicationCheckpoint.empty(indexShard.shardId()), indexShard, listener);
}

// pkg-private for integration tests
void startReplication(final SegmentReplicationTarget target) {
final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout());
Expand Down

0 comments on commit d50a9e0

Please sign in to comment.