Skip to content

Commit

Permalink
Incorporated PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 15, 2022
1 parent b26cf19 commit 5c7915d
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 19 deletions.
14 changes: 11 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -1792,12 +1792,20 @@ public long recoverLocallyUpToGlobalCheckpoint() {
}
}

public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) {
if (localTranslog) {
return recoverLocallyUpToGlobalCheckpoint();
} else {
return recoverLocallyUptoLastCommit();
}
}

/**
* The method figures out the sequence number basis the last commit.
*
* @return the starting sequence number from which the recovery should start.
*/
public long fetchStartSeqNoFromLastCommit() {
public long recoverLocallyUptoLastCommit() {
long seqNo;
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
if (state != IndexShardState.RECOVERING) {
Expand Down Expand Up @@ -3310,7 +3318,7 @@ private boolean isRemoteStoreEnabled() {
return (remoteStore != null && shardRouting.primary());
}

public boolean isRemoteTranslogEnabledOnPrimary() {
public boolean isRemoteTranslogEnabled() {
return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,19 +238,16 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = recoveryTarget.state().getPrimary() == false
&& indexShard.isRemoteTranslogEnabledOnPrimary();
final long startingSeqNo = isRecoveringReplicaWithRemoteTxLogEnabledIndex
? indexShard.fetchStartSeqNoFromLastCommit()
: indexShard.recoverLocallyUpToGlobalCheckpoint();
boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false);
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
startRequest = getStartRecoveryRequest(
logger,
clusterService.localNode(),
recoveryTarget,
startingSeqNo,
!isRecoveringReplicaWithRemoteTxLogEnabledIndex
remoteTranslogEnabled == false
);
requestToSend = startRequest;
actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
Expand Down Expand Up @@ -297,23 +294,23 @@ public static StartRecoveryRequest getStartRecoveryRequest(
* @param recoveryTarget the target of the recovery
* @param startingSeqNo a sequence number that an operation-based peer recovery can start with.
* This is the first operation after the local checkpoint of the safe commit if exists.
* @param validateTranslog should the recovery request validate translog consistency with snapshot store metadata.
* @param verifyTranslog should the recovery request validate translog consistency with snapshot store metadata.
* @return a start recovery request
*/
public static StartRecoveryRequest getStartRecoveryRequest(
Logger logger,
DiscoveryNode localNode,
RecoveryTarget recoveryTarget,
long startingSeqNo,
boolean validateTranslog
boolean verifyTranslog
) {
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

Store.MetadataSnapshot metadataSnapshot;
try {
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
if (validateTranslog) {
if (verifyTranslog) {
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene
// index.
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ && isTargetSameHistory()
assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;

boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = request.isPrimaryRelocation() == false
&& shard.isRemoteTranslogEnabledOnPrimary();
&& shard.isRemoteTranslogEnabled();

if (isRecoveringReplicaWithRemoteTxLogEnabledIndex) {
sendFileStep.whenComplete(r -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -854,11 +853,8 @@ protected final void recoverUnstartedReplica(
replica.prepareForIndexRecovery();
final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
IndexShard indexShard = recoveryTarget.indexShard();
boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = recoveryTarget.state().getPrimary() == false
&& indexShard.isRemoteTranslogEnabledOnPrimary();
final long startingSeqNo = isRecoveringReplicaWithRemoteTxLogEnabledIndex
? indexShard.fetchStartSeqNoFromLastCommit()
: indexShard.recoverLocallyUpToGlobalCheckpoint();
boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false);
final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(
logger,
rNode,
Expand Down

0 comments on commit 5c7915d

Please sign in to comment.