Skip to content

Commit

Permalink
Incorporates PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 15, 2022
1 parent 5c7915d commit e04ce91
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1703,7 +1703,7 @@ public void prepareForIndexRecovery() {
* @return a sequence number that an operation-based peer recovery can start with.
* This is the first operation after the local checkpoint of the safe commit if exists.
*/
public long recoverLocallyUpToGlobalCheckpoint() {
private long recoverLocallyUpToGlobalCheckpoint() {
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
Expand Down Expand Up @@ -1805,7 +1805,7 @@ public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) {
*
* @return the starting sequence number from which the recovery should start.
*/
public long recoverLocallyUptoLastCommit() {
private long recoverLocallyUptoLastCommit() {
long seqNo;
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
if (state != IndexShardState.RECOVERING) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
IndexShard shard = newShard(false);
shard.markAsRecovering("for testing", new RecoveryState(shard.routingEntry(), localNode, localNode));
shard.prepareForIndexRecovery();
assertThat(shard.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(shard.recoverLocallyAndFetchStartSeqNo(true), equalTo(UNASSIGNED_SEQ_NO));
assertThat(shard.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
assertThat(shard.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
assertThat(shard.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
Expand Down Expand Up @@ -239,7 +239,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
);
replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
replica.prepareForIndexRecovery();
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(globalCheckpoint + 1));
assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(globalCheckpoint + 1));
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(expectedTotalLocal));
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectedTotalLocal));
assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
Expand All @@ -254,7 +254,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE));
replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
replica.prepareForIndexRecovery();
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(UNASSIGNED_SEQ_NO));
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
Expand All @@ -276,10 +276,10 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
replica.prepareForIndexRecovery();
if (safeCommit.isPresent()) {
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1));
assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(safeCommit.get().localCheckpoint + 1));
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0));
} else {
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(UNASSIGNED_SEQ_NO));
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
}
assertThat(replica.recoveryState().getStage(), equalTo(RecoveryState.Stage.TRANSLOG));
Expand Down Expand Up @@ -322,7 +322,7 @@ public void testClosedIndexSkipsLocalRecovery() throws Exception {
);
replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
replica.prepareForIndexRecovery();
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1));
assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(safeCommit.get().localCheckpoint + 1));
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0));
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
Expand All @@ -349,7 +349,7 @@ public void testResetStartingSeqNoIfLastCommitCorrupted() throws Exception {
shard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE));
shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode));
shard.prepareForIndexRecovery();
long startingSeqNo = shard.recoverLocallyUpToGlobalCheckpoint();
long startingSeqNo = shard.recoverLocallyAndFetchStartSeqNo(true);
shard.store().markStoreCorrupted(new IOException("simulated"));
RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null);
StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(logger, rNode, recoveryTarget, startingSeqNo);
Expand Down

0 comments on commit e04ce91

Please sign in to comment.