Skip to content

Commit

Permalink
Add step listeners to resolve forcing round of segment replication.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Dec 12, 2022
1 parent 527abb6 commit 18f40a1
Showing 1 changed file with 68 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
Expand Down Expand Up @@ -782,74 +783,80 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea
public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) {
RecoveryState recoveryState = (RecoveryState) state;
AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardRouting.shardId().getIndex());
StepListener<Void> forceSegRepListener = new StepListener<>();
// For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before
// it is marked as Started.
if (indexService.getIndexSettings().isSegRepEnabled()) {
IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id());
// For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before
// it
// is marked as Started.
if (indexShard != null
&& shardRouting.primary() == false
&& shardRouting.state() == ShardRoutingState.INITIALIZING
&& indexShard.state() == IndexShardState.POST_RECOVERY) {
segmentReplicationTargetService.startReplication(
ReplicationCheckpoint.empty(shardRouting.shardId()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
shardStateAction.shardStarted(
shardRouting,
primaryTerm,
"after " + recoveryState.getRecoverySource(),
SHARD_STATE_ACTION_LISTENER
);
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
logger.error("replication failure", e);
indexShard.failShard("replication failure", e);
}
handleRecoveryFailure(shardRouting, sendShardFailure, e);
}
}
);
} else {
shardStateAction.shardStarted(
shardRouting,
primaryTerm,
"after " + recoveryState.getRecoverySource(),
SHARD_STATE_ACTION_LISTENER
);
}
forceSegmentReplication(indexService, shardRouting, forceSegRepListener);
} else {
shardStateAction.shardStarted(
forceSegRepListener.onResponse(null);
}
forceSegRepListener.whenComplete(
v -> shardStateAction.shardStarted(
shardRouting,
primaryTerm,
"after " + recoveryState.getRecoverySource(),
SHARD_STATE_ACTION_LISTENER
),
e -> handleRecoveryFailure(shardRouting, true, e)
);
}

/**
* Forces a round of Segment Replication with empty checkpoint, so that replicas could fetch latest segment files from primary.
*/
private void forceSegmentReplication(
AllocatedIndex<? extends Shard> indexService,
ShardRouting shardRouting,
StepListener<Void> forceSegRepListener
) {
IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id());
if (indexShard != null
&& indexShard.indexSettings().isSegRepEnabled()
&& shardRouting.primary() == false
&& shardRouting.state() == ShardRoutingState.INITIALIZING
&& indexShard.state() == IndexShardState.POST_RECOVERY) {
segmentReplicationTargetService.startReplication(
ReplicationCheckpoint.empty(shardRouting.shardId()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
forceSegRepListener.onResponse(null);
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
logger.error("replication failure", e);
indexShard.failShard("replication failure", e);
}
forceSegRepListener.onFailure(e);
}
}
);
} else {
forceSegRepListener.onResponse(null);
}
}

Expand Down

0 comments on commit 18f40a1

Please sign in to comment.