Skip to content

Commit

Permalink
Fix translog UUID mismatch on existing store recovery.
Browse files Browse the repository at this point in the history
This commit addresses PR feedback and adds tests that verify search replica recovery after a node restart.

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 committed Jan 9, 2025
1 parent 33b3363 commit cf68380
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,22 @@

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -26,12 +34,16 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.routing.RecoverySource.Type.EMPTY_STORE;
import static org.opensearch.cluster.routing.RecoverySource.Type.EXISTING_STORE;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {
public class SearchReplicaReplicationAndRecoveryIT extends SegmentReplicationBaseIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;
Expand All @@ -58,7 +70,7 @@ public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build();
}
Expand Down Expand Up @@ -127,6 +139,46 @@ public void testSegmentReplicationStatsResponseWithSearchReplica() throws Except
assertNotNull(replicaStat.getCurrentReplicationState());
}
}
/**
 * Verifies how a search replica recovers before and after its node is restarted.
 *
 * <p>The search replica is pinned to a dedicated data node. After index creation it is expected to
 * report an {@code EMPTY_STORE} recovery and non-zero remote download stats once documents are
 * searchable. After the node restarts it is expected to report an {@code EXISTING_STORE} recovery,
 * the same document count, and zero download bytes started.
 *
 * @throws Exception if any cluster operation or assertion helper fails
 */
public void testSearchReplicaRecovery() throws Exception {
    internalCluster().startClusterManagerOnlyNode();
    final String primary = internalCluster().startDataOnlyNode();
    final String replica = internalCluster().startDataOnlyNode();

    // ensure search replicas are only allocated to "replica" node.
    client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", replica))
        .execute()
        .actionGet();

    createIndex(INDEX_NAME);
    ensureGreen(INDEX_NAME);
    assertRecoverySourceType(replica, EMPTY_STORE);

    final int docCount = 10;
    for (int i = 0; i < docCount; i++) {
        client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
    }
    refresh(INDEX_NAME);
    flush(INDEX_NAME);
    // Use docCount rather than a repeated literal so the expectation tracks the number of indexed docs.
    waitForSearchableDocs(docCount, primary, replica);

    // Node stats should show remote download stats as nonzero, use this as a precondition to compare
    // post restart.
    assertDownloadStats(replica, true);

    internalCluster().restartNode(replica);
    ensureGreen(INDEX_NAME);
    assertDocCounts(docCount, replica);

    // assert existing store recovery
    assertRecoverySourceType(replica, EXISTING_STORE);
    assertDownloadStats(replica, false);
}


public void testRecoveryAfterDocsIndexed() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
Expand All @@ -142,6 +194,10 @@ public void testRecoveryAfterDocsIndexed() throws Exception {
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);

assertRecoverySourceType(replica, EMPTY_STORE);
// replica should have downloaded from remote
assertDownloadStats(replica, true);

client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
Expand All @@ -159,22 +215,40 @@ public void testRecoveryAfterDocsIndexed() throws Exception {
assertDocCounts(10, replica);

internalCluster().restartNode(replica);

ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);
assertRecoverySourceType(replica, EXISTING_STORE);
assertDownloadStats(replica, false);
}

/**
 * Asserts that every non-primary shard recovery reported for {@code INDEX_NAME} used the given
 * recovery source type and targeted the given node.
 *
 * @param replica      name of the node every non-primary recovery is expected to target
 * @param recoveryType recovery source type every non-primary recovery is expected to report
 * @throws InterruptedException if waiting on the recovery request is interrupted
 * @throws ExecutionException   if the recovery request fails
 */
private static void assertRecoverySourceType(String replica, RecoverySource.Type recoveryType) throws InterruptedException,
    ExecutionException {
    final RecoveryRequest request = new RecoveryRequest(INDEX_NAME);
    final RecoveryResponse response = client().admin().indices().recoveries(request).get();
    final List<RecoveryState> shardStates = response.shardRecoveryStates().get(INDEX_NAME);
    for (RecoveryState state : shardStates) {
        // Primaries are not checked here; only non-primary recoveries are validated.
        if (state.getPrimary()) {
            continue;
        }
        final RecoverySource.Type actualType = state.getRecoverySource().getType();
        assertEquals("All SR should be of expected recovery type", recoveryType, actualType);
        final String targetNodeName = state.getTargetNode().getName();
        assertEquals("All SR should be on the specified node", replica, targetNodeName);
    }
}

/**
 * Fetches node stats for a single node and asserts on its remote segment "download bytes started"
 * counter.
 *
 * @param replica               name of the node whose stats are requested; exactly one node is expected back
 * @param expectBytesDownloaded when {@code true} the counter must be greater than zero, otherwise it must be zero
 * @throws InterruptedException if waiting on the stats request is interrupted
 * @throws ExecutionException   if the stats request fails
 */
private static void assertDownloadStats(String replica, boolean expectBytesDownloaded) throws InterruptedException, ExecutionException {
    final NodesStatsRequest request = new NodesStatsRequest(replica);
    final NodesStatsResponse response = client().admin().cluster().nodesStats(request).get();
    final List<NodeStats> perNodeStats = response.getNodes();
    assertEquals(1, perNodeStats.size());

    final NodeStats stats = perNodeStats.get(0);
    assertEquals(replica, stats.getNode().getName());

    final long downloadBytesStarted = stats.getIndices().getSegments().getRemoteSegmentStats().getDownloadBytesStarted();
    if (expectBytesDownloaded == false) {
        assertEquals(0, downloadBytesStarted);
        return;
    }
    assertTrue(downloadBytesStarted > 0);
}

public void testStopPrimary_RestoreOnNewNode() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build()
);
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final int docCount = 10;
for (int i = 0; i < docCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public class IndexShardRoutingTable extends AbstractDiffable<IndexShardRoutingTa
}
if (shard.isSearchOnly()) {
// mark search only shards as initializing or assigned, but do not add to
// the allAllocationId set. Cluster Manager will filter out search replica aIds in
// the allAllocationId set. Cluster Manager will filter out search replica allocationIds in
// the in-sync set that is sent to primaries, but they are still included in the routing table.
// This ensures the primaries do not validate these ids exist in tracking nor are included
// in the unavailableInSyncShards set.
Expand Down
18 changes: 7 additions & 11 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2524,23 +2524,19 @@ public void openEngineAndRecoverFromTranslog(boolean syncFromRemote) throws IOEx
*/
public void openEngineAndSkipTranslogRecovery() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
getEngine().translogManager().skipTranslogRecovery();
openEngineAndSkipTranslogRecovery(true);
}

public void openEngineAndSkipTranslogRecoveryFromSnapshot() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.SNAPSHOT : "not a snapshot recovery ["
+ routingEntry()
+ "]";
openEngineAndSkipTranslogRecovery(false);
}

void openEngineAndSkipTranslogRecovery(boolean syncFromRemote) throws IOException {
assert routingEntry().isSearchOnly() || routingEntry().recoverySource().getType() == RecoverySource.Type.SNAPSHOT
: "not a snapshot recovery [" + routingEntry() + "]";
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
openEngineAndSkipTranslogRecovery(routingEntry().isSearchOnly());
}

void openEngineAndSkipTranslogRecovery(boolean syncFromRemote) throws IOException {
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker, syncFromRemote);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,9 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
// not primary eligible. This will skip any checkpoint tracking and ensure
// that the shards are sync'd with remote store before opening.
//
indexShard.openEngineAndSkipTranslogRecovery(true);
// first bootstrap new history / translog so that the TranslogUUID matches the UUID from the latest commit.
bootstrapForSnapshot(indexShard, store);
indexShard.openEngineAndSkipTranslogRecoveryFromSnapshot();
}
if (indexShard.shouldSeedRemoteStore()) {
indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> {
Expand Down Expand Up @@ -886,6 +888,7 @@ private void bootstrap(final IndexShard indexShard, final Store store) throws IO
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));

final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(),
localCheckpoint,
Expand Down

0 comments on commit cf68380

Please sign in to comment.