Skip to content

Commit

Permalink
Incorporates PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 15, 2022
1 parent e04ce91 commit 6919ada
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 21 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))

### Deprecated

Expand Down Expand Up @@ -89,4 +90,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)


[Unreleased]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...HEAD
[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x
[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x
35 changes: 18 additions & 17 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@
import java.util.stream.StreamSupport;

import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
Expand Down Expand Up @@ -1704,12 +1705,7 @@ public void prepareForIndexRecovery() {
* This is the first operation after the local checkpoint of the safe commit if exists.
*/
private long recoverLocallyUpToGlobalCheckpoint() {
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
validateLocalRecoveryState();
final Optional<SequenceNumbers.CommitInfo> safeCommit;
final long globalCheckpoint;
try {
Expand Down Expand Up @@ -1806,21 +1802,17 @@ public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) {
* @return the starting sequence number from which the recovery should start.
*/
private long recoverLocallyUptoLastCommit() {
assert isRemoteTranslogEnabled() : "Remote translog store is not enabled";
long seqNo;
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
validateLocalRecoveryState();

try {
seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO));
seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(MAX_SEQ_NO));
} catch (org.apache.lucene.index.IndexNotFoundException e) {
logger.trace("skip local recovery as no index commit found");
logger.error("skip local recovery as no index commit found", e);
return UNASSIGNED_SEQ_NO;
} catch (Exception e) {
logger.debug("skip local recovery as failed to find the safe commit", e);
logger.error("skip local recovery as failed to find the safe commit", e);
return UNASSIGNED_SEQ_NO;
}

Expand All @@ -1829,12 +1821,21 @@ private long recoverLocallyUptoLastCommit() {
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
recoveryState.getTranslog().totalLocal(0);
} catch (Exception e) {
logger.debug("check index failed during fetch seqNo", e);
logger.error("check index failed during fetch seqNo", e);
return UNASSIGNED_SEQ_NO;
}
return seqNo;
}

private void validateLocalRecoveryState() {
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
}

public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
getEngine().translogManager().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo);
}
Expand Down Expand Up @@ -2041,7 +2042,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid";
assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid ["
+ userData.get(Engine.HISTORY_UUID_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ protected void reestablishRecovery(final StartRecoveryRequest request, final Str
threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryId, request));
}

/**
* Initiates recovery of the replica. TODO - Need to revisit it with PRRL and later. @see
* <a href="https://github.com/opensearch-project/OpenSearch/issues/4502">github issue</a> on it.
* @param recoveryId recovery id
* @param preExistingRequest start recovery request
*/
private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {
final String actionName;
final TransportRequest requestToSend;
Expand All @@ -239,15 +245,15 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false);
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled);
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
startRequest = getStartRecoveryRequest(
logger,
clusterService.localNode(),
recoveryTarget,
startingSeqNo,
remoteTranslogEnabled == false
!remoteTranslogEnabled
);
requestToSend = startRequest;
actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ protected final void recoverUnstartedReplica(
final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
IndexShard indexShard = recoveryTarget.indexShard();
boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false);
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled);
final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(
logger,
rNode,
Expand Down

0 comments on commit 6919ada

Please sign in to comment.