From 3c14fd33c9fcd97bb7527b711bd3cf3fe7498a11 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 12 Jul 2023 00:41:20 +0530 Subject: [PATCH] [Backport 2.x] Add Changes in Snapshot Delete Flow for remote store interoperability. (#7497) (#8627) * Add Changes in Snapshot Delete Flow for remote store interoperability. (#7497) Signed-off-by: Bansi Kasundra * Spotless fixes Signed-off-by: Sachin Kale --------- Signed-off-by: Bansi Kasundra Signed-off-by: Sachin Kale Co-authored-by: Bansi Kasundra <66969140+kasundra07@users.noreply.github.com> Co-authored-by: Sachin Kale --- .../snapshots/DeleteSnapshotIT.java | 305 ++++++++++++++++++ .../TransportCleanupRepositoryAction.java | 5 + .../opensearch/repositories/Repository.java | 19 ++ .../blobstore/BlobStoreRepository.java | 299 +++++++++++++---- .../snapshots/SnapshotsService.java | 24 +- 5 files changed, 591 insertions(+), 61 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java new file mode 100644 index 0000000000000..2688449294f3d --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java @@ -0,0 +1,305 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.snapshots; + +import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.opensearch.client.Client; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.test.FeatureFlagSetter; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.is; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class DeleteSnapshotIT extends AbstractSnapshotIntegTestCase { + + public void testDeleteSnapshot() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "fs", snapshotRepoPath); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); + + final String snapshot = "snapshot"; + createFullSnapshot(snapshotRepoName, snapshot); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0); + assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 1); + + assertAcked(startDeleteSnapshot(snapshotRepoName, snapshot).get()); + assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 0); + } + + public void testDeleteShallowCopySnapshot() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + + final String snapshotRepoName = "snapshot-repo-name"; + createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy()); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); + + final String shallowSnapshot = "shallow-snapshot"; + createFullSnapshot(snapshotRepoName, shallowSnapshot); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 1); + assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 1); + + assertAcked(startDeleteSnapshot(snapshotRepoName, shallowSnapshot).get()); + assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 0); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0); + } + + // Deleting multiple shallow copy snapshots as part of single delete call with repo having only shallow copy snapshots. + public void testDeleteMultipleShallowCopySnapshotsCase1() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + final Client clusterManagerClient = internalCluster().clusterManagerClient(); + ensureStableCluster(2); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath)); + final String testIndex = "index-test"; + createIndexWithContent(testIndex); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); + + // Creating some shallow copy snapshots + int totalShallowCopySnapshotsCount = randomIntBetween(4, 10); + List shallowCopySnapshots = createNSnapshots(snapshotRepoName, totalShallowCopySnapshotsCount); + List snapshotsToBeDeleted = shallowCopySnapshots.subList(0, randomIntBetween(2, totalShallowCopySnapshotsCount)); + int tobeDeletedSnapshotsCount = snapshotsToBeDeleted.size(); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount); + assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalShallowCopySnapshotsCount); + // Deleting subset of shallow copy snapshots + assertAcked( + clusterManagerClient.admin() + .cluster() + .prepareDeleteSnapshot(snapshotRepoName, snapshotsToBeDeleted.toArray(new String[0])) + .get() + ); + assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalShallowCopySnapshotsCount - tobeDeletedSnapshotsCount); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount + - tobeDeletedSnapshotsCount); + } + + // Deleting multiple shallow copy snapshots as part of single delete call with both partial and full copy snapshot present in the repo + // And then deleting multiple full copy snapshots as part of single delete call with both partial and shallow copy snapshots present in + // the repo + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8610") + public void testDeleteMultipleShallowCopySnapshotsCase2() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + + internalCluster().startClusterManagerOnlyNode(); + final String dataNode = internalCluster().startDataOnlyNode(); + ensureStableCluster(2); + final String clusterManagerNode = internalCluster().getClusterManagerName(); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath)); + final String testIndex = "index-test"; + createIndexWithContent(testIndex); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); + + // Creating a partial shallow copy snapshot + final String snapshot = "snapshot"; + blockNodeWithIndex(snapshotRepoName, testIndex); + blockDataNode(snapshotRepoName, dataNode); + + final Client clusterManagerClient = internalCluster().clusterManagerClient(); + final ActionFuture snapshotFuture = clusterManagerClient.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshot) + .setWaitForCompletion(true) + .execute(); + + awaitNumberOfSnapshotsInProgress(1); + waitForBlock(dataNode, snapshotRepoName, TimeValue.timeValueSeconds(30L)); + internalCluster().restartNode(dataNode); + assertThat(snapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL)); + + unblockAllDataNodes(snapshotRepoName); + + ensureStableCluster(2, clusterManagerNode); + + // Creating some shallow copy snapshots + int totalShallowCopySnapshotsCount = randomIntBetween(4, 10); + List shallowCopySnapshots = createNSnapshots(snapshotRepoName, totalShallowCopySnapshotsCount); + List shallowCopySnapshotsToBeDeleted = shallowCopySnapshots.subList(0, randomIntBetween(2, totalShallowCopySnapshotsCount)); + int tobeDeletedShallowCopySnapshotsCount = shallowCopySnapshotsToBeDeleted.size(); + totalShallowCopySnapshotsCount += 1; // Adding partial shallow snapshot here + // Updating the snapshot repository flag to disable shallow snapshots + createRepository(snapshotRepoName, "mock", snapshotRepoPath); + // Creating some full copy snapshots + int totalFullCopySnapshotsCount = randomIntBetween(4, 10); + List fullCopySnapshots = createNSnapshots(snapshotRepoName, totalFullCopySnapshotsCount); + List fullCopySnapshotsToBeDeleted = fullCopySnapshots.subList(0, randomIntBetween(2, totalFullCopySnapshotsCount)); + int tobeDeletedFullCopySnapshotsCount = fullCopySnapshotsToBeDeleted.size(); + + int totalSnapshotsCount = totalFullCopySnapshotsCount + totalShallowCopySnapshotsCount; + + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount); + assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalSnapshotsCount); + // Deleting subset of shallow copy snapshots + assertAcked( + clusterManagerClient.admin() + .cluster() + .prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshotsToBeDeleted.toArray(new String[0])) + .get() + ); + totalSnapshotsCount -= tobeDeletedShallowCopySnapshotsCount; + totalShallowCopySnapshotsCount -= tobeDeletedShallowCopySnapshotsCount; + assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalSnapshotsCount); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount); + + // Deleting subset of full copy snapshots + assertAcked( + clusterManagerClient.admin() + .cluster() + .prepareDeleteSnapshot(snapshotRepoName, fullCopySnapshotsToBeDeleted.toArray(new String[0])) + .get() + ); + totalSnapshotsCount -= tobeDeletedFullCopySnapshotsCount; + assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalSnapshotsCount); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount); + } + + // Deleting subset of shallow and full copy snapshots as part of single delete call and then deleting all snapshots in the repo. + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8610") + public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + final Client clusterManagerClient = internalCluster().clusterManagerClient(); + ensureStableCluster(2); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath)); + final String testIndex = "index-test"; + createIndexWithContent(testIndex); + + final Path remoteStoreRepoPath = randomRepoPath(); + final String remoteStoreRepoName = "remote-store-repo-name"; + createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); + + // Creating some shallow copy snapshots + int totalShallowCopySnapshotsCount = randomIntBetween(4, 10); + List shallowCopySnapshots = createNSnapshots(snapshotRepoName, totalShallowCopySnapshotsCount); + List shallowCopySnapshotsToBeDeleted = shallowCopySnapshots.subList(0, randomIntBetween(2, totalShallowCopySnapshotsCount)); + int tobeDeletedShallowCopySnapshotsCount = shallowCopySnapshotsToBeDeleted.size(); + // Updating the snapshot repository flag to disable shallow snapshots + createRepository(snapshotRepoName, "mock", snapshotRepoPath); + // Creating some full copy snapshots + int totalFullCopySnapshotsCount = randomIntBetween(4, 10); + List fullCopySnapshots = createNSnapshots(snapshotRepoName, totalFullCopySnapshotsCount); + List fullCopySnapshotsToBeDeleted = fullCopySnapshots.subList(0, randomIntBetween(2, totalFullCopySnapshotsCount)); + int tobeDeletedFullCopySnapshotsCount = fullCopySnapshotsToBeDeleted.size(); + + int totalSnapshotsCount = totalFullCopySnapshotsCount + totalShallowCopySnapshotsCount; + + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount); + assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalSnapshotsCount); + // Deleting subset of shallow copy snapshots and full copy snapshots + assertAcked( + clusterManagerClient.admin() + .cluster() + .prepareDeleteSnapshot( + snapshotRepoName, + Stream.concat(shallowCopySnapshotsToBeDeleted.stream(), fullCopySnapshotsToBeDeleted.stream()).toArray(String[]::new) + ) + .get() + ); + totalSnapshotsCount -= (tobeDeletedShallowCopySnapshotsCount + tobeDeletedFullCopySnapshotsCount); + totalShallowCopySnapshotsCount -= tobeDeletedShallowCopySnapshotsCount; + assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == totalSnapshotsCount); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == totalShallowCopySnapshotsCount); + + // Deleting all the remaining snapshots + assertAcked(clusterManagerClient.admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "*").get()); + assert (getRepositoryData(snapshotRepoName).getSnapshotIds().size() == 0); + assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 0); + } + + private List createNSnapshots(String repoName, int count) { + final List snapshotNames = new ArrayList<>(count); + final String prefix = "snap-" + UUIDs.randomBase64UUID(random()).toLowerCase(Locale.ROOT) + "-"; + for (int i = 0; i < count; i++) { + final String name = prefix + i; + createFullSnapshot(repoName, name); + snapshotNames.add(name); + } + logger.info("--> created {} in [{}]", snapshotNames, repoName); + return snapshotNames; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java index a3804db687a2d..6a7839aa3c562 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -54,6 +54,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryCleanupResult; @@ -97,6 +98,8 @@ public final class TransportCleanupRepositoryAction extends TransportClusterMana private final SnapshotsService snapshotsService; + private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory; + @Override protected String executor() { return ThreadPool.Names.SAME; @@ -123,6 +126,7 @@ public TransportCleanupRepositoryAction( ); this.repositoriesService = repositoriesService; this.snapshotsService = snapshotsService; + this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService); // We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover. // This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent // operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes. @@ -281,6 +285,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS l -> blobStoreRepository.cleanup( repositoryStateId, snapshotsService.minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null), + remoteStoreLockManagerFactory, ActionListener.wrap(result -> after(null, result), e -> after(e, null)) ) ) diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 81e842217ad9a..76bad890ddebb 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -183,6 +183,25 @@ void deleteSnapshots( ActionListener listener ); + /** + * Deletes snapshots and releases respective lock files from remote store repository. + * + * @param snapshotIds snapshot ids + * @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param remoteStoreLockManagerFactory RemoteStoreLockManagerFactory to be used for cleaning up remote store lock files + * @param listener completion listener + */ + default void deleteSnapshotsAndReleaseLockFiles( + Collection snapshotIds, + long repositoryStateId, + Version repositoryMetaVersion, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, + ActionListener listener + ) { + throw new UnsupportedOperationException(); + } + /** * Returns snapshot throttle time in nanoseconds */ diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 93e7d50461cb9..225e4a1c8e1c9 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -822,11 +822,11 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } } - @Override - public void deleteSnapshots( + public void deleteSnapshotsAndReleaseLockFiles( Collection snapshotIds, long repositoryStateId, Version repositoryMetaVersion, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { if (isReadOnly()) { @@ -847,6 +847,7 @@ protected void doRun() throws Exception { rootBlobs, repositoryData, repositoryMetaVersion, + remoteStoreLockManagerFactory, listener ); } @@ -859,6 +860,22 @@ public void onFailure(Exception e) { } } + @Override + public void deleteSnapshots( + Collection snapshotIds, + long repositoryStateId, + Version repositoryMetaVersion, + ActionListener listener + ) { + deleteSnapshotsAndReleaseLockFiles( + snapshotIds, + repositoryStateId, + repositoryMetaVersion, + null, // Passing null since no remote store lock files need to be cleaned up. + listener + ); + } + /** * Loads {@link RepositoryData} ensuring that it is consistent with the given {@code rootBlobs} as well of the assumed generation. * @@ -908,16 +925,18 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map snapshotIds, @@ -926,13 +945,19 @@ private void doDeleteShardSnapshots( Map rootBlobs, RepositoryData repositoryData, Version repoMetaVersion, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { - if (SnapshotsService.useShardGenerations(repoMetaVersion)) { // First write the new shard state metadata (with the removed snapshot) and compute deletion targets final StepListener> writeShardMetaDataAndComputeDeletesStep = new StepListener<>(); - writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, true, writeShardMetaDataAndComputeDeletesStep); + writeUpdatedShardMetaDataAndComputeDeletes( + snapshotIds, + repositoryData, + true, + remoteStoreLockManagerFactory, + writeShardMetaDataAndComputeDeletesStep + ); // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows: // 1. Remove the snapshots from the list of existing snapshots // 2. Update the index shard generations of all updated shard folders @@ -962,11 +987,19 @@ private void doDeleteShardSnapshots( ActionListener.wrap(() -> listener.onResponse(updatedRepoData)), 2 ); - cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); + cleanupUnlinkedRootAndIndicesBlobs( + snapshotIds, + foundIndices, + rootBlobs, + updatedRepoData, + remoteStoreLockManagerFactory, + afterCleanupsListener + ); asyncCleanupUnlinkedShardLevelBlobs( repositoryData, snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), + remoteStoreLockManagerFactory, afterCleanupsListener ); }, listener::onFailure); @@ -979,11 +1012,30 @@ private void doDeleteShardSnapshots( ActionListener.wrap(() -> listener.onResponse(newRepoData)), 2 ); - cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, newRepoData, afterCleanupsListener); + cleanupUnlinkedRootAndIndicesBlobs( + snapshotIds, + foundIndices, + rootBlobs, + newRepoData, + remoteStoreLockManagerFactory, + afterCleanupsListener + ); final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>(); - writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep); + writeUpdatedShardMetaDataAndComputeDeletes( + snapshotIds, + repositoryData, + false, + remoteStoreLockManagerFactory, + writeMetaAndComputeDeletesStep + ); writeMetaAndComputeDeletesStep.whenComplete( - deleteResults -> asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, deleteResults, afterCleanupsListener), + deleteResults -> asyncCleanupUnlinkedShardLevelBlobs( + repositoryData, + snapshotIds, + deleteResults, + remoteStoreLockManagerFactory, + afterCleanupsListener + ), afterCleanupsListener::onFailure ); }, listener::onFailure)); @@ -995,15 +1047,24 @@ private void cleanupUnlinkedRootAndIndicesBlobs( Map foundIndices, Map rootBlobs, RepositoryData updatedRepoData, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { - cleanupStaleBlobs(deletedSnapshots, foundIndices, rootBlobs, updatedRepoData, ActionListener.map(listener, ignored -> null)); + cleanupStaleBlobs( + deletedSnapshots, + foundIndices, + rootBlobs, + updatedRepoData, + remoteStoreLockManagerFactory, + ActionListener.map(listener, ignored -> null) + ); } private void asyncCleanupUnlinkedShardLevelBlobs( RepositoryData oldRepositoryData, Collection snapshotIds, Collection deleteResults, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { final List filesToDelete = resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults); @@ -1027,7 +1088,7 @@ private void asyncCleanupUnlinkedShardLevelBlobs( // Start as many workers as fit into the snapshot pool at once at the most final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleFilesToDeleteInBatch.size()); for (int i = 0; i < workers; ++i) { - executeStaleShardDelete(staleFilesToDeleteInBatch, groupedListener); + executeStaleShardDelete(staleFilesToDeleteInBatch, remoteStoreLockManagerFactory, groupedListener); } } catch (Exception e) { @@ -1040,12 +1101,49 @@ private void asyncCleanupUnlinkedShardLevelBlobs( } } - private void executeStaleShardDelete(BlockingQueue> staleFilesToDeleteInBatch, GroupedActionListener listener) - throws InterruptedException { + // When remoteStoreLockManagerFactory is non-null, while deleting the files, lock files are also released before deletion of respective + // shallow-snap-UUID files. And if it is null, we just delete the stale shard blobs. + private void executeStaleShardDelete( + BlockingQueue> staleFilesToDeleteInBatch, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, + GroupedActionListener listener + ) throws InterruptedException { List filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS); if (filesToDelete != null) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { try { + if (remoteStoreLockManagerFactory != null) { + for (String fileToDelete : filesToDelete) { + if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) { + String[] fileToDeletePath = fileToDelete.split("/"); + String indexId = fileToDeletePath[1]; + String shardId = fileToDeletePath[2]; + String shallowSnapBlob = fileToDeletePath[3]; + String snapshotUUID = shallowSnapBlob.substring( + SHALLOW_SNAPSHOT_PREFIX.length(), + shallowSnapBlob.length() - ".dat".length() + ); + BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId)); + RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = + REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read( + shardContainer, + snapshotUUID, + namedXContentRegistry + ); + String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID(); + String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository(); + // Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while + // releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during + // next delete operation for releasing this lock file + RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory + .newLockManager(remoteStoreRepoForIndex, indexUUID, shardId); + remoteStoreMetadataLockManager.release( + FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build() + ); + } + } + } + // Deleting the shard blobs deleteFromContainer(blobContainer(), filesToDelete); l.onResponse(null); } catch (Exception e) { @@ -1059,7 +1157,7 @@ private void executeStaleShardDelete(BlockingQueue> staleFilesToDel ); l.onFailure(e); } - executeStaleShardDelete(staleFilesToDeleteInBatch, listener); + executeStaleShardDelete(staleFilesToDeleteInBatch, remoteStoreLockManagerFactory, listener); })); } } @@ -1069,6 +1167,7 @@ private void writeUpdatedShardMetaDataAndComputeDeletes( Collection snapshotIds, RepositoryData oldRepositoryData, boolean useUUIDs, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener> onAllShardsCompleted ) { @@ -1141,17 +1240,30 @@ protected void doRun() throws Exception { final Set blobs = shardContainer.listBlobs().keySet(); final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots; final long newGen; - if (useUUIDs) { - newGen = -1L; - blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots( - blobs, - shardContainer, - oldRepositoryData.shardGenerations().getShardGen(indexId, finalShardId) - ).v1(); + + // Index-N file would be present if snapshots other than shallow snapshots are present for this shard + if (blobs.stream() + .filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)) + .collect(Collectors.toSet()) + .size() > 0) { + if (useUUIDs) { + newGen = -1L; + blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots( + blobs, + shardContainer, + oldRepositoryData.shardGenerations().getShardGen(indexId, finalShardId) + ).v1(); + } else { + Tuple tuple = buildBlobStoreIndexShardSnapshots( + blobs, + shardContainer + ); + newGen = tuple.v2() + 1; + blobStoreIndexShardSnapshots = tuple.v1(); + } } else { - Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer); - newGen = tuple.v2() + 1; - blobStoreIndexShardSnapshots = tuple.v1(); + newGen = -1L; + blobStoreIndexShardSnapshots = BlobStoreIndexShardSnapshots.EMPTY; } allShardsListener.onResponse( deleteFromShardSnapshotMeta( @@ -1162,7 +1274,8 @@ protected void doRun() throws Exception { shardContainer, blobs, blobStoreIndexShardSnapshots, - newGen + newGen, + remoteStoreLockManagerFactory ) ); } @@ -1213,20 +1326,23 @@ private List resolveFilesToDelete( /** * Cleans up stale blobs directly under the repository root as well as all indices paths that aren't referenced by any existing * snapshots. This method is only to be called directly after a new {@link RepositoryData} was written to the repository and with - * parameters {@code foundIndices}, {@code rootBlobs} + * parameters {@code foundIndices}, {@code rootBlobs}. If remoteStoreLockManagerFactory is not null, remote store lock files are + * released when deleting the respective shallow-snap-UUID blobs. * - * @param deletedSnapshots if this method is called as part of a delete operation, the snapshot ids just deleted or empty if called as - * part of a repository cleanup - * @param foundIndices all indices blob containers found in the repository before {@code newRepoData} was written - * @param rootBlobs all blobs found directly under the repository root - * @param newRepoData new repository data that was just written - * @param listener listener to invoke with the combined {@link DeleteResult} of all blobs removed in this operation + * @param deletedSnapshots if this method is called as part of a delete operation, the snapshot ids just deleted or empty if called as + * part of a repository cleanup + * @param foundIndices all indices blob containers found in the repository before {@code newRepoData} was written + * @param rootBlobs all blobs found directly under the repository root + * @param newRepoData new repository data that was just written + * @param remoteStoreLockManagerFactory RemoteStoreLockManagerFactory to be used for cleaning up remote store lock files. + * @param listener listener to invoke with the combined {@link DeleteResult} of all blobs removed in this operation */ private void cleanupStaleBlobs( Collection deletedSnapshots, Map foundIndices, Map rootBlobs, RepositoryData newRepoData, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { final GroupedActionListener groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> { @@ -1252,24 +1368,31 @@ private void cleanupStaleBlobs( if (foundIndices.keySet().equals(survivingIndexIds)) { groupedListener.onResponse(DeleteResult.ZERO); } else { - cleanupStaleIndices(foundIndices, survivingIndexIds, groupedListener); + cleanupStaleIndices(foundIndices, survivingIndexIds, remoteStoreLockManagerFactory, groupedListener); } } /** * Runs cleanup actions on the repository. Increments the repository state id by one before executing any modifications on the - * repository. + * repository. If remoteStoreLockManagerFactory is not null, remote store lock files are released when deleting the respective + * shallow-snap-UUID blobs. * TODO: Add shard level cleanups * TODO: Add unreferenced index metadata cleanup *
    *
  • Deleting stale indices {@link #cleanupStaleIndices}
  • *
  • Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}
  • *
- * @param repositoryStateId Current repository state id - * @param repositoryMetaVersion version of the updated repository metadata to write - * @param listener Listener to complete when done + * @param repositoryStateId Current repository state id + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param remoteStoreLockManagerFactory RemoteStoreLockManagerFactory to be used for cleaning up remote store lock files. + * @param listener Listener to complete when done */ - public void cleanup(long repositoryStateId, Version repositoryMetaVersion, ActionListener listener) { + public void cleanup( + long repositoryStateId, + Version repositoryMetaVersion, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, + ActionListener listener + ) { try { if (isReadOnly()) { throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository"); @@ -1299,6 +1422,7 @@ public void cleanup(long repositoryStateId, Version repositoryMetaVersion, Actio foundIndices, rootBlobs, repositoryData, + remoteStoreLockManagerFactory, ActionListener.map(listener, RepositoryCleanupResult::new) ), listener::onFailure @@ -1390,6 +1514,7 @@ private List cleanupStaleRootFiles( private void cleanupStaleIndices( Map foundIndices, Set survivingIndexIds, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, GroupedActionListener listener ) { final GroupedActionListener groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> { @@ -1414,7 +1539,7 @@ private void cleanupStaleIndices( foundIndices.size() - survivingIndexIds.size() ); for (int i = 0; i < workers; ++i) { - executeOneStaleIndexDelete(staleIndicesToDelete, groupedListener); + executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, groupedListener); } } catch (Exception e) { // TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream. @@ -1427,6 +1552,7 @@ private void cleanupStaleIndices( private void executeOneStaleIndexDelete( BlockingQueue> staleIndicesToDelete, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, GroupedActionListener listener ) throws InterruptedException { Map.Entry indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS); @@ -1436,6 +1562,37 @@ private void executeOneStaleIndexDelete( DeleteResult deleteResult = DeleteResult.ZERO; try { logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId); + if (remoteStoreLockManagerFactory != null) { + Map shardBlobs = indexEntry.getValue().children(); + if (!shardBlobs.isEmpty()) { + for (Map.Entry shardBlob : shardBlobs.entrySet()) { + Map shardLevelBlobs = shardBlob.getValue().listBlobs(); + for (Map.Entry shardLevelBlob : shardLevelBlobs.entrySet()) { + String blob = shardLevelBlob.getKey(); + String snapshotUUID = blob.substring(SHALLOW_SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length()); + if (blob.startsWith(SHALLOW_SNAPSHOT_PREFIX) && blob.endsWith(".dat")) { + RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = + REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read( + shardBlob.getValue(), + snapshotUUID, + namedXContentRegistry + ); + String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID(); + String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository(); + // Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure + // while releasing the lock file, we would still have the corresponding shallow-snap-UUID file + // and that would be used during next delete operation for releasing this stale lock file + RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory + .newLockManager(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey()); + remoteStoreMetadataLockManager.release( + FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build() + ); + } + } + } + } + } + // Deleting the index folder deleteResult = indexEntry.getValue().delete(); logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); } catch (IOException e) { @@ -1453,7 +1610,7 @@ private void executeOneStaleIndexDelete( logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e); } - executeOneStaleIndexDelete(staleIndicesToDelete, listener); + executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, listener); return deleteResult; })); } @@ -3062,7 +3219,8 @@ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta( BlobContainer shardContainer, Set blobs, BlobStoreIndexShardSnapshots snapshots, - long indexGeneration + long indexGeneration, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory ) { // Build a list of snapshots that should be preserved List newSnapshotsList = new ArrayList<>(); @@ -3074,23 +3232,37 @@ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta( } String writtenGeneration = null; try { - if (newSnapshotsList.isEmpty()) { + // Using survivingSnapshots instead of newSnapshotsList as shallow snapshots can be present which won't be part of + // newSnapshotsList + if (survivingSnapshots.isEmpty()) { + // No shallow copy or full copy snapshot is surviving. return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId, ShardGenerations.DELETED_SHARD_GEN, blobs); } else { - final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); - if (indexGeneration < 0L) { - writtenGeneration = UUIDs.randomBase64UUID(); - INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compressor); + final BlobStoreIndexShardSnapshots updatedSnapshots; + // If we have surviving non shallow snapshots, update index- file. + if (newSnapshotsList.size() > 0) { + // Some full copy snapshots are surviving. + updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); + if (indexGeneration < 0L) { + writtenGeneration = UUIDs.randomBase64UUID(); + INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compressor); + } else { + writtenGeneration = String.valueOf(indexGeneration); + writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots); + } } else { - writtenGeneration = String.valueOf(indexGeneration); - writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots); + // Some shallow copy snapshots are surviving. In this case, since no full copy snapshots are present, we use + // EMPTY BlobStoreIndexShardSnapshots for updatedSnapshots which is used in unusedBlobs to compute stale files, + // and use DELETED_SHARD_GEN since index-N file would not be present anymore. + updatedSnapshots = BlobStoreIndexShardSnapshots.EMPTY; + writtenGeneration = ShardGenerations.DELETED_SHARD_GEN; } final Set survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet()); return new ShardSnapshotMetaDeleteResult( indexId, snapshotShardId, writtenGeneration, - unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots) + unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots, remoteStoreLockManagerFactory) ); } } catch (IOException e) { @@ -3124,11 +3296,13 @@ private void writeShardIndexBlobAtomic( } // Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all - // temporary blobs + // temporary blobs. If remoteStoreLockManagerFactory is non-null, the shallow-snap- files that do not belong to any of the + // surviving snapshots are also added for cleanup. private static List unusedBlobs( Set blobs, Set survivingSnapshotUUIDs, - BlobStoreIndexShardSnapshots updatedSnapshots + BlobStoreIndexShardSnapshots updatedSnapshots, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory ) { return blobs.stream() .filter( @@ -3138,6 +3312,13 @@ private static List unusedBlobs( && survivingSnapshotUUIDs.contains( blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length()) ) == false) + || (remoteStoreLockManagerFactory != null + ? (blob.startsWith(SHALLOW_SNAPSHOT_PREFIX) + && blob.endsWith(".dat") + && survivingSnapshotUUIDs.contains( + blob.substring(SHALLOW_SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length()) + ) == false) + : false) || (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null) || FsBlobContainer.isTempBlobName(blob) ) diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 091143d30a983..51b153ecf76a5 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -2798,16 +2798,36 @@ private void deleteSnapshotsFromRepository( assert currentlyFinalizing.contains(deleteEntry.repository()); final List snapshotIds = deleteEntry.getSnapshots(); assert deleteEntry.state() == SnapshotDeletionsInProgress.State.STARTED : "incorrect state for entry [" + deleteEntry + "]"; - repositoriesService.repository(deleteEntry.repository()) - .deleteSnapshots( + final Repository repository = repositoriesService.repository(deleteEntry.repository()); + + // TODO: Relying on repository flag to decide delete flow may lead to shallow snapshot blobs not being taken up for cleanup + // when the repository currently have the flag disabled and we try to delete the shallow snapshots taken prior to disabling + // the flag. This can be improved by having the info whether there ever were any shallow snapshot present in this repository + // or not in RepositoryData. + // SEE https://github.com/opensearch-project/OpenSearch/issues/8610 + final boolean cleanupRemoteStoreLockFiles = REMOTE_STORE_INDEX_SHALLOW_COPY.get(repository.getMetadata().settings()); + if (cleanupRemoteStoreLockFiles) { + repository.deleteSnapshotsAndReleaseLockFiles( snapshotIds, repositoryData.getGenId(), minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds), + remoteStoreLockManagerFactory, ActionListener.wrap(updatedRepoData -> { logger.info("snapshots {} deleted", snapshotIds); removeSnapshotDeletionFromClusterState(deleteEntry, null, updatedRepoData); }, ex -> removeSnapshotDeletionFromClusterState(deleteEntry, ex, repositoryData)) ); + } else { + repository.deleteSnapshots( + snapshotIds, + repositoryData.getGenId(), + minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds), + ActionListener.wrap(updatedRepoData -> { + logger.info("snapshots {} deleted", snapshotIds); + removeSnapshotDeletionFromClusterState(deleteEntry, null, updatedRepoData); + }, ex -> removeSnapshotDeletionFromClusterState(deleteEntry, ex, repositoryData)) + ); + } } }