Skip to content

Commit

Permalink
[Remote Store] Change behaviour in replica recovery for remote transl…
Browse files Browse the repository at this point in the history
…og enabled indices (#4318)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Oct 22, 2022
1 parent 0503897 commit 1283482
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 95 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Load the deprecated master role in a dedicated method instead of in setAdditionalRoles() ([#4582](https://github.com/opensearch-project/OpenSearch/pull/4582))
- Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156))
- Further simplification of the ZIP publication implementation ([#4360](https://github.com/opensearch-project/OpenSearch/pull/4360))
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))

### Deprecated
### Removed
### Fixed
Expand Down
66 changes: 57 additions & 9 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -202,6 +202,7 @@
import java.util.stream.StreamSupport;

import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
Expand Down Expand Up @@ -1699,13 +1700,8 @@ public void prepareForIndexRecovery() {
* @return a sequence number that an operation-based peer recovery can start with.
* This is the first operation after the local checkpoint of the safe commit if exists.
*/
public long recoverLocallyUpToGlobalCheckpoint() {
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
private long recoverLocallyUpToGlobalCheckpoint() {
validateLocalRecoveryState();
final Optional<SequenceNumbers.CommitInfo> safeCommit;
final long globalCheckpoint;
try {
Expand Down Expand Up @@ -1787,6 +1783,54 @@ public long recoverLocallyUpToGlobalCheckpoint() {
}
}

public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) {
if (localTranslog) {
return recoverLocallyUpToGlobalCheckpoint();
} else {
return recoverLocallyUptoLastCommit();
}
}

/**
* The method figures out the sequence number basis the last commit.
*
* @return the starting sequence number from which the recovery should start.
*/
private long recoverLocallyUptoLastCommit() {
assert isRemoteTranslogEnabled() : "Remote translog store is not enabled";
long seqNo;
validateLocalRecoveryState();

try {
seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(MAX_SEQ_NO));
} catch (org.apache.lucene.index.IndexNotFoundException e) {
logger.error("skip local recovery as no index commit found", e);
return UNASSIGNED_SEQ_NO;
} catch (Exception e) {
logger.error("skip local recovery as failed to find the safe commit", e);
return UNASSIGNED_SEQ_NO;
}

try {
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
recoveryState.getTranslog().totalLocal(0);
} catch (Exception e) {
logger.error("check index failed during fetch seqNo", e);
return UNASSIGNED_SEQ_NO;
}
return seqNo;
}

private void validateLocalRecoveryState() {
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
}

public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo);
}
Expand Down Expand Up @@ -1997,7 +2041,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid";
assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid ["
+ userData.get(Engine.HISTORY_UUID_KEY)
Expand Down Expand Up @@ -3297,6 +3341,10 @@ private boolean isRemoteStoreEnabled() {
return (remoteStore != null && shardRouting.primary());
}

public boolean isRemoteTranslogEnabled() {
return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled();
}

/**
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -219,6 +219,12 @@ protected void reestablishRecovery(final StartRecoveryRequest request, final Str
threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryId, request));
}

/**
* Initiates recovery of the replica. TODO - Need to revisit it with PRRL and later. @see
* <a href="https://github.com/opensearch-project/OpenSearch/issues/4502">github issue</a> on it.
* @param recoveryId recovery id
* @param preExistingRequest start recovery request
*/
private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {
final String actionName;
final TransportRequest requestToSend;
Expand All @@ -238,10 +244,17 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled);
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
startRequest = getStartRecoveryRequest(
logger,
clusterService.localNode(),
recoveryTarget,
startingSeqNo,
!remoteTranslogEnabled
);
requestToSend = startRequest;
actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
} catch (final Exception e) {
Expand Down Expand Up @@ -270,44 +283,58 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
);
}

public static StartRecoveryRequest getStartRecoveryRequest(
Logger logger,
DiscoveryNode localNode,
RecoveryTarget recoveryTarget,
long startingSeqNo
) {
return getStartRecoveryRequest(logger, localNode, recoveryTarget, startingSeqNo, true);
}

/**
* Prepare the start recovery request.
*
* @param logger the logger
* @param localNode the local node of the recovery target
* @param recoveryTarget the target of the recovery
* @param startingSeqNo a sequence number that an operation-based peer recovery can start with.
* This is the first operation after the local checkpoint of the safe commit if exists.
* @param logger the logger
* @param localNode the local node of the recovery target
* @param recoveryTarget the target of the recovery
* @param startingSeqNo a sequence number that an operation-based peer recovery can start with.
* This is the first operation after the local checkpoint of the safe commit if exists.
* @param verifyTranslog should the recovery request validate translog consistency with snapshot store metadata.
* @return a start recovery request
*/
public static StartRecoveryRequest getStartRecoveryRequest(
Logger logger,
DiscoveryNode localNode,
RecoveryTarget recoveryTarget,
long startingSeqNo
long startingSeqNo,
boolean verifyTranslog
) {
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

Store.MetadataSnapshot metadataSnapshot;
try {
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index.
try {
final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
} catch (IOException | TranslogCorruptedException e) {
logger.warn(
new ParameterizedMessage(
"error while reading global checkpoint from translog, "
+ "resetting the starting sequence number from {} to unassigned and recovering as if there are none",
startingSeqNo
),
e
);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
startingSeqNo = UNASSIGNED_SEQ_NO;
if (verifyTranslog) {
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene
// index.
try {
final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
} catch (IOException | TranslogCorruptedException e) {
logger.warn(
new ParameterizedMessage(
"error while reading global checkpoint from translog, "
+ "resetting the starting sequence number from {} to unassigned and recovering as if there are none",
startingSeqNo
),
e
);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
startingSeqNo = UNASSIGNED_SEQ_NO;
}
}
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
// happens on an empty folder. no need to log
Expand Down
Loading

0 comments on commit 1283482

Please sign in to comment.