From 5c7915d8afa814b4809824a08ae89baea52aec49 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 15 Sep 2022 11:58:45 +0530 Subject: [PATCH] Incorporated PR review feedback Signed-off-by: Ashish Singh --- .../org/opensearch/index/shard/IndexShard.java | 14 +++++++++++--- .../recovery/PeerRecoveryTargetService.java | 15 ++++++--------- .../indices/recovery/RecoverySourceHandler.java | 2 +- .../index/shard/IndexShardTestCase.java | 8 ++------ 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 83aab1c7f9227..c2542e1a2907c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -163,8 +163,8 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -1792,12 +1792,20 @@ public long recoverLocallyUpToGlobalCheckpoint() { } } + public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) { + if (localTranslog) { + return recoverLocallyUpToGlobalCheckpoint(); + } else { + return recoverLocallyUptoLastCommit(); + } + } + /** * The method figures out the sequence number basis the last commit. * * @return the starting sequence number from which the recovery should start. */ - public long fetchStartSeqNoFromLastCommit() { + public long recoverLocallyUptoLastCommit() { long seqNo; assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; if (state != IndexShardState.RECOVERING) { @@ -3310,7 +3318,7 @@ private boolean isRemoteStoreEnabled() { return (remoteStore != null && shardRouting.primary()); } - public boolean isRemoteTranslogEnabledOnPrimary() { + public boolean isRemoteTranslogEnabled() { return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled(); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 51b73c20c5c30..324d246adc88b 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -238,11 +238,8 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); - boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = recoveryTarget.state().getPrimary() == false - && indexShard.isRemoteTranslogEnabledOnPrimary(); - final long startingSeqNo = isRecoveringReplicaWithRemoteTxLogEnabledIndex - ? indexShard.fetchStartSeqNoFromLastCommit() - : indexShard.recoverLocallyUpToGlobalCheckpoint(); + boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); + final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; startRequest = getStartRecoveryRequest( @@ -250,7 +247,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi clusterService.localNode(), recoveryTarget, startingSeqNo, - !isRecoveringReplicaWithRemoteTxLogEnabledIndex + remoteTranslogEnabled == false ); requestToSend = startRequest; actionName = PeerRecoverySourceService.Actions.START_RECOVERY; @@ -297,7 +294,7 @@ public static StartRecoveryRequest getStartRecoveryRequest( * @param recoveryTarget the target of the recovery * @param startingSeqNo a sequence number that an operation-based peer recovery can start with. * This is the first operation after the local checkpoint of the safe commit if exists. - * @param validateTranslog should the recovery request validate translog consistency with snapshot store metadata. + * @param verifyTranslog should the recovery request validate translog consistency with snapshot store metadata. * @return a start recovery request */ public static StartRecoveryRequest getStartRecoveryRequest( @@ -305,7 +302,7 @@ public static StartRecoveryRequest getStartRecoveryRequest( DiscoveryNode localNode, RecoveryTarget recoveryTarget, long startingSeqNo, - boolean validateTranslog + boolean verifyTranslog ) { final StartRecoveryRequest request; logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); @@ -313,7 +310,7 @@ public static StartRecoveryRequest getStartRecoveryRequest( Store.MetadataSnapshot metadataSnapshot; try { metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); - if (validateTranslog) { + if (verifyTranslog) { // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene // index. try { diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 648fb75d5e89f..665e79722770e 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -317,7 +317,7 @@ && isTargetSameHistory() assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = request.isPrimaryRelocation() == false - && shard.isRemoteTranslogEnabledOnPrimary(); + && shard.isRemoteTranslogEnabled(); if (isRecoveringReplicaWithRemoteTxLogEnabledIndex) { sendFileStep.whenComplete(r -> { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 35e0205c3b665..054aa6ad6eacc 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -134,7 +134,6 @@ import java.io.IOException; import java.util.ArrayList; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -854,11 +853,8 @@ protected final void recoverUnstartedReplica( replica.prepareForIndexRecovery(); final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode); IndexShard indexShard = recoveryTarget.indexShard(); - boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = recoveryTarget.state().getPrimary() == false - && indexShard.isRemoteTranslogEnabledOnPrimary(); - final long startingSeqNo = isRecoveringReplicaWithRemoteTxLogEnabledIndex - ? indexShard.fetchStartSeqNoFromLastCommit() - : indexShard.recoverLocallyUpToGlobalCheckpoint(); + boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); + final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false); final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest( logger, rNode,