diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index a1b40cfc66061..a8ecbba2cca56 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -8,28 +8,41 @@ package org.opensearch.remotestore; +import org.junit.After; +import org.junit.Before; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; +import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreIT extends OpenSearchIntegTestCase { private static final String REPOSITORY_NAME = "test-remore-store-repo"; private static final String INDEX_NAME = "remote-store-test-idx-1"; - protected static final int SHARD_COUNT = 1; - protected static final int REPLICA_COUNT = 1; + private static final String TOTAL_OPERATIONS = "total-operations"; + private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations"; + private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total"; + private static final String MAX_SEQ_NO_REFRESHED_OR_FLUSHED = "max-seq-no-refreshed-or-flushed"; @Override protected Collection> nodePlugins() { @@ -38,10 +51,15 @@ protected Collection> nodePlugins() { @Override public Settings indexSettings() { + return remoteStoreIndexSettings(0); + } + + private Settings remoteStoreIndexSettings(int numberOfReplicas) { return Settings.builder() .put(super.indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put("index.refresh_interval", "300s") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) @@ -49,6 +67,14 @@ public Settings indexSettings() { .build(); } + private Settings remoteTranslogIndexSettings(int numberOfReplicas) { + return Settings.builder() + .put(remoteStoreIndexSettings(numberOfReplicas)) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME) + .build(); + } + @Override protected boolean addMockInternalEngine() { return false; @@ -63,18 +89,95 @@ protected Settings featureFlagSettings() { .build(); } - // This is a dummy test to check if create index flow is working as expected. - // As index creation is pre-requisite for each integration test in this class, once we add more integ tests, - // we can remove this test. - public void testIndexCreation() { - internalCluster().startNode(); - + @Before + public void setup() { Path absolutePath = randomRepoPath().toAbsolutePath(); assertAcked( clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) ); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + + private IndexResponse indexSingleDoc() { + return client().prepareIndex(INDEX_NAME) + .setId(UUIDs.randomBase64UUID()) + .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) + .get(); + } + + private Map indexData() { + long totalOperations = 0; + long refreshedOrFlushedOperations = 0; + long maxSeqNo = 0; + long maxSeqNoRefreshedOrFlushed = 0; + for (int i = 0; i < randomIntBetween(1, 10); i++) { + if (randomBoolean()) { + flush(INDEX_NAME); + } else { + refresh(INDEX_NAME); + } + maxSeqNoRefreshedOrFlushed = maxSeqNo; + refreshedOrFlushedOperations = totalOperations; + int numberOfOperations = randomIntBetween(20, 50); + for (int j = 0; j < numberOfOperations; j++) { + IndexResponse response = indexSingleDoc(); + maxSeqNo = response.getSeqNo(); + } + totalOperations += numberOfOperations; + } + Map indexingStats = new HashMap<>(); + indexingStats.put(TOTAL_OPERATIONS, totalOperations); + indexingStats.put(REFRESHED_OR_FLUSHED_OPERATIONS, refreshedOrFlushedOperations); + indexingStats.put(MAX_SEQ_NO_TOTAL, maxSeqNo); + indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED, maxSeqNoRefreshedOrFlushed); + return indexingStats; + } - createIndex(INDEX_NAME); + private void verifyRestoredData(Map indexStats, boolean checkTotal) { + String statsGranularity = checkTotal ? TOTAL_OPERATIONS : REFRESHED_OR_FLUSHED_OPERATIONS; + String maxSeqNoGranularity = checkTotal ? MAX_SEQ_NO_TOTAL : MAX_SEQ_NO_REFRESHED_OR_FLUSHED; ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity)); + IndexResponse response = indexSingleDoc(); + assertEquals(indexStats.get(maxSeqNoGranularity) + 1, response.getSeqNo()); + refresh(INDEX_NAME); + assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity) + 1); + } + + public void testRemoteStoreRestoreFromRemoteSegmentStore() throws IOException { + internalCluster().startNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + Map indexStats = indexData(); + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); + + client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture()); + + verifyRestoredData(indexStats, false); + } + + public void testRemoteTranslogRestore() throws IOException { + internalCluster().startNodes(3); + createIndex(INDEX_NAME, remoteTranslogIndexSettings(0)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + Map indexStats = indexData(); + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); + + client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture()); + + verifyRestoredData(indexStats, true); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java index 4f74304471991..b14f0f8eb3c35 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -202,7 +202,8 @@ private IndexMetadata.Builder updateInSyncAllocations( if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) { allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID; } else { - assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource; + assert (recoverySource instanceof RecoverySource.SnapshotRecoverySource + || recoverySource instanceof RecoverySource.RemoteStoreRecoverySource) : recoverySource; allocationId = updates.initializedPrimary.allocationId().getId(); } // forcing a stale primary resets the in-sync allocations to the singleton set with the stale id diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 3af663d7b41f9..efd2686b41a20 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -179,20 +179,11 @@ private boolean isRefreshAfterCommit() throws IOException { } String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException { - // We use lastRefreshedCheckpoint as local checkpoint for the SegmentInfosSnapshot. This is better than using - // getProcessedLocalCheckpoint() as processedCheckpoint can advance between reading the value and setting up - // in SegmentInfos.userData. This could lead to data loss as, during recovery, translog will be replayed based on - // LOCAL_CHECKPOINT_KEY. - // lastRefreshedCheckpoint is updated after refresh listeners are executed, this means, InternalEngine.lastRefreshedCheckpoint() - // will return checkpoint of last refresh but that does not impact the correctness as duplicate sequence numbers - // will not be replayed. - assert indexShard.getEngine() instanceof InternalEngine : "Expected shard with InternalEngine, got: " - + indexShard.getEngine().getClass(); - final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); + final long maxSeqNoFromSegmentInfos = indexShard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfosSnapshot); Map userData = segmentInfosSnapshot.getUserData(); - userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(lastRefreshedCheckpoint)); - userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(lastRefreshedCheckpoint)); + userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNoFromSegmentInfos)); + userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNoFromSegmentInfos)); segmentInfosSnapshot.setUserData(userData, false); long commitGeneration = SegmentInfos.generationFromSegmentsFileName(latestSegmentsNFilename); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 86123012fee5d..11280ddfeadef 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2443,4 +2443,10 @@ public void manageReplicaSettingForDefaultReplica(boolean apply) { updateSettingsRequest.persistentSettings(settings); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); } + + protected String primaryNodeName(String indexName) { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + String nodeId = clusterState.getRoutingTable().index(indexName).shard(0).primaryShard().currentNodeId(); + return clusterState.getRoutingNodes().node(nodeId).node().getName(); + } }