Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Get SeqNo from Segment Info and integ tests for restore flow #6067

Merged
merged 2 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,41 @@

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreIT extends OpenSearchIntegTestCase {

private static final String REPOSITORY_NAME = "test-remore-store-repo";
private static final String INDEX_NAME = "remote-store-test-idx-1";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;
private static final String TOTAL_OPERATIONS = "total-operations";
private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations";
private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total";
private static final String MAX_SEQ_NO_REFRESHED_OR_FLUSHED = "max-seq-no-refreshed-or-flushed";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand All @@ -38,17 +51,30 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

@Override
public Settings indexSettings() {
    // Default index settings for tests in this class: a remote-store-backed
    // index with zero replicas (tests that need replicas call
    // remoteStoreIndexSettings(...) with a non-zero count directly).
    return remoteStoreIndexSettings(0);
}

/**
 * Builds settings for a single-shard remote-store-backed index with segment replication.
 *
 * Fix: the flattened diff left duplicated, conflicting puts — shard count was set
 * twice (constant and literal) and replica count was set both from {@code REPLICA_COUNT}
 * and from the parameter, which (depending on put order) made {@code numberOfReplicas}
 * ineffective. Each setting is now put exactly once and the parameter is honored.
 *
 * @param numberOfReplicas number of replica copies for the test index
 * @return settings enabling remote segment store on this index
 */
private Settings remoteStoreIndexSettings(int numberOfReplicas) {
    return Settings.builder()
        .put(super.indexSettings())
        // Long refresh interval so the tests control refresh/flush timing explicitly.
        .put("index.refresh_interval", "300s")
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
        .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
        // Remote store requires segment replication.
        .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
        .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
        .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
        .build();
}

/**
 * Settings for an index backed by both a remote segment store and a remote translog.
 *
 * @param numberOfReplicas number of replica copies for the test index
 * @return remote-store settings with remote translog enabled on top
 */
private Settings remoteTranslogIndexSettings(int numberOfReplicas) {
    // Start from the remote segment store settings, then layer the translog settings on top.
    Settings.Builder builder = Settings.builder().put(remoteStoreIndexSettings(numberOfReplicas));
    builder.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true);
    builder.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME);
    return builder.build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
Expand All @@ -63,18 +89,95 @@ protected Settings featureFlagSettings() {
.build();
}

// This is a dummy test to check if create index flow is working as expected.
// As index creation is pre-requisite for each integration test in this class, once we add more integ tests,
// we can remove this test.
public void testIndexCreation() {
internalCluster().startNode();

@Before
public void setup() {
    // Register a filesystem-backed snapshot repository at a random path; it serves
    // as the remote store for every test in this class.
    Path repoLocation = randomRepoPath().toAbsolutePath();
    Settings.Builder repoSettings = Settings.builder().put("location", repoLocation);
    assertAcked(clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(repoSettings));
}

@After
public void teardown() {
    // Remove the repository registered in setup() so each test starts clean.
    assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

/**
 * Indexes one document with a random id and a random single-field source.
 *
 * @return the index response (used by callers to read the assigned sequence number)
 */
private IndexResponse indexSingleDoc() {
    String docId = UUIDs.randomBase64UUID();
    String fieldName = randomAlphaOfLength(5);
    String fieldValue = randomAlphaOfLength(5);
    return client().prepareIndex(INDEX_NAME).setId(docId).setSource(fieldName, fieldValue).get();
}

/**
 * Indexes a random number of batches of documents, interleaving each batch with a
 * refresh or flush, and returns stats used to verify restore correctness:
 * total operations indexed, the max sequence number overall, and the operation
 * count / max sequence number covered by the last refresh-or-flush.
 *
 * Fix: the random iteration bound was evaluated inside the loop condition
 * ({@code i < randomIntBetween(1, 10)}), re-rolling the bound on every iteration
 * and skewing the iteration-count distribution. It is now drawn once up front.
 *
 * @return map keyed by the stats constants declared on this class
 */
private Map<String, Long> indexData() {
    long totalOperations = 0;
    long refreshedOrFlushedOperations = 0;
    long maxSeqNo = 0;
    long maxSeqNoRefreshedOrFlushed = 0;
    int numberOfIterations = randomIntBetween(1, 10);
    for (int i = 0; i < numberOfIterations; i++) {
        // Refresh/flush happens before indexing, so the "refreshed-or-flushed"
        // stats intentionally lag the totals by the final batch.
        if (randomBoolean()) {
            flush(INDEX_NAME);
        } else {
            refresh(INDEX_NAME);
        }
        maxSeqNoRefreshedOrFlushed = maxSeqNo;
        refreshedOrFlushedOperations = totalOperations;
        int numberOfOperations = randomIntBetween(20, 50);
        for (int j = 0; j < numberOfOperations; j++) {
            IndexResponse response = indexSingleDoc();
            maxSeqNo = response.getSeqNo();
        }
        totalOperations += numberOfOperations;
    }
    Map<String, Long> indexingStats = new HashMap<>();
    indexingStats.put(TOTAL_OPERATIONS, totalOperations);
    indexingStats.put(REFRESHED_OR_FLUSHED_OPERATIONS, refreshedOrFlushedOperations);
    indexingStats.put(MAX_SEQ_NO_TOTAL, maxSeqNo);
    indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED, maxSeqNoRefreshedOrFlushed);
    return indexingStats;
}

createIndex(INDEX_NAME);
/**
 * Verifies restored index contents against the stats captured during indexing.
 *
 * @param indexStats stats map produced by {@link #indexData()}
 * @param checkTotal when {@code true}, expect all operations back (translog restore);
 *                   when {@code false}, expect only refreshed/flushed operations
 *                   (segment-store-only restore)
 */
private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal) {
    // Pick which captured stats the restore is expected to match.
    String hitCountKey = checkTotal ? TOTAL_OPERATIONS : REFRESHED_OR_FLUSHED_OPERATIONS;
    String seqNoKey = checkTotal ? MAX_SEQ_NO_TOTAL : MAX_SEQ_NO_REFRESHED_OR_FLUSHED;
    ensureYellowAndNoInitializingShards(INDEX_NAME);
    ensureGreen(INDEX_NAME);
    long expectedHits = indexStats.get(hitCountKey);
    assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), expectedHits);
    // Indexing one more doc must continue the sequence-number stream from the restored max.
    IndexResponse response = indexSingleDoc();
    assertEquals(indexStats.get(seqNoKey) + 1, response.getSeqNo());
    refresh(INDEX_NAME);
    assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), expectedHits + 1);
}

public void testRemoteStoreRestoreFromRemoteSegmentStore() throws IOException {
internalCluster().startNodes(3);
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

Map<String, Long> indexStats = indexData();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we sure that remote segment store is up to date here ? Should we check the local fs , in case we don't have any stats.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We trigger explicit refresh/flush which calls RemoteStoreRefreshListener in the same flow. So, if remote store is available, the segments should be uploaded. Test failure means some problem with the code and we need to fix it.


internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());

verifyRestoredData(indexStats, false);
}

/**
 * Restore flow with the remote translog enabled: after killing the primary's node and
 * restoring from the remote store, every indexed operation — including un-flushed
 * ones replayed from the translog — must be recovered.
 */
public void testRemoteTranslogRestore() throws IOException {
    internalCluster().startNodes(3);
    createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
    ensureYellowAndNoInitializingShards(INDEX_NAME);
    ensureGreen(INDEX_NAME);

    Map<String, Long> indexingStats = indexData();

    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
    assertAcked(client().admin().indices().prepareClose(INDEX_NAME));

    RestoreRemoteStoreRequest restoreRequest = new RestoreRemoteStoreRequest().indices(INDEX_NAME);
    client().admin().cluster().restoreRemoteStore(restoreRequest, PlainActionFuture.newFuture());

    // checkTotal=true: the remote translog should bring back all operations.
    verifyRestoredData(indexingStats, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ private IndexMetadata.Builder updateInSyncAllocations(
if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) {
allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
} else {
assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource;
assert (recoverySource instanceof RecoverySource.SnapshotRecoverySource
|| recoverySource instanceof RecoverySource.RemoteStoreRecoverySource) : recoverySource;
allocationId = updates.initializedPrimary.allocationId().getId();
}
// forcing a stale primary resets the in-sync allocations to the singleton set with the stale id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,20 +179,11 @@ private boolean isRefreshAfterCommit() throws IOException {
}

String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException {
// We use lastRefreshedCheckpoint as local checkpoint for the SegmentInfosSnapshot. This is better than using
// getProcessedLocalCheckpoint() as processedCheckpoint can advance between reading the value and setting up
// in SegmentInfos.userData. This could lead to data loss as, during recovery, translog will be replayed based on
// LOCAL_CHECKPOINT_KEY.
// lastRefreshedCheckpoint is updated after refresh listeners are executed, this means, InternalEngine.lastRefreshedCheckpoint()
// will return checkpoint of last refresh but that does not impact the correctness as duplicate sequence numbers
// will not be replayed.
assert indexShard.getEngine() instanceof InternalEngine : "Expected shard with InternalEngine, got: "
+ indexShard.getEngine().getClass();
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
final long maxSeqNoFromSegmentInfos = indexShard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfosSnapshot);

Map<String, String> userData = segmentInfosSnapshot.getUserData();
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(lastRefreshedCheckpoint));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(lastRefreshedCheckpoint));
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNoFromSegmentInfos));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNoFromSegmentInfos));
segmentInfosSnapshot.setUserData(userData, false);

long commitGeneration = SegmentInfos.generationFromSegmentsFileName(latestSegmentsNFilename);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2443,4 +2443,10 @@ public void manageReplicaSettingForDefaultReplica(boolean apply) {
updateSettingsRequest.persistentSettings(settings);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

/**
 * Resolves the name of the node currently hosting the primary of shard 0
 * of the given index.
 *
 * @param indexName index whose primary shard's node name is wanted
 * @return the node name as registered in the cluster state
 */
protected String primaryNodeName(String indexName) {
    ClusterState state = client().admin().cluster().prepareState().get().getState();
    String primaryNodeId = state.getRoutingTable().index(indexName).shard(0).primaryShard().currentNodeId();
    return state.getRoutingNodes().node(primaryNodeId).node().getName();
}
}