[Snapshot Interop] Add Logic in Lock Manager to cleanup stale data po… #8472

Merged: merged 1 commit on Aug 3, 2023
@@ -159,7 +159,7 @@ public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_2_NAME));
}

public int getFileCount(Path path) throws Exception {
public static int getFileCount(Path path) throws Exception {
final AtomicInteger filesExisting = new AtomicInteger(0);
Files.walkFileTree(path, new SimpleFileVisitor<>() {
@Override
@@ -10,22 +10,28 @@

import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.is;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -240,12 +246,13 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));
final String testIndex = "index-test";
createIndexWithContent(testIndex);

final Path remoteStoreRepoPath = randomRepoPath();
createRepository(REMOTE_REPO_NAME, "fs", remoteStoreRepoPath);

final String testIndex = "index-test";
createIndexWithContent(testIndex);

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
@@ -289,6 +296,71 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME).length == 0);
}

public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);

internalCluster().startClusterManagerOnlyNode(remoteStoreClusterSettings(REMOTE_REPO_NAME));
internalCluster().startDataOnlyNode();
final Client clusterManagerClient = internalCluster().clusterManagerClient();
ensureStableCluster(2);

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));

final Path remoteStoreRepoPath = randomRepoPath();
createRepository(REMOTE_REPO_NAME, "fs", remoteStoreRepoPath);

final String testIndex = "index-test";
createIndexWithContent(testIndex);

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

String indexUUID = client().admin()
.indices()
.prepareGetSettings(remoteStoreEnabledIndexName)
.get()
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

logger.info("--> create two remote index shallow snapshots");
List<String> shallowCopySnapshots = createNSnapshots(snapshotRepoName, 2);

String[] lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME);
assert (lockFiles.length == 2) : "lock files are " + Arrays.toString(lockFiles);

// delete remote store index
assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

logger.info("--> delete snapshot 1");
AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshots.get(0))
.get();
assertAcked(deleteSnapshotResponse);

lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME, indexUUID);
assert (lockFiles.length == 1) : "lock files are " + Arrays.toString(lockFiles);

logger.info("--> delete snapshot 2");
deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshots.get(1))
.get();
assertAcked(deleteSnapshotResponse);

Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
// Cleanup is asynchronous; poll until the index path under the remote store repo is empty.
assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(indexPath), comparesEqualTo(0));
} catch (Exception e) {
// If the path has already been removed, getFileCount may throw; treat that as cleaned up as well.
}
}, 30, TimeUnit.SECONDS);
}

private List<String> createNSnapshots(String repoName, int count) {
final List<String> snapshotNames = new ArrayList<>(count);
final String prefix = "snap-" + UUIDs.randomBase64UUID(random()).toLowerCase(Locale.ROOT) + "-";
@@ -837,29 +837,36 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
}
}

public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
deleteStaleSegmentsAsync(lastNMetadataFilesToKeep, ActionListener.wrap(r -> {}, e -> {}));
}

/**
* Delete stale segment and metadata files asynchronously.
* This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int)} in an async manner.
* @param lastNMetadataFilesToKeep number of metadata files to keep
*/
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener<Void> listener) {
if (canDeleteStaleCommits.compareAndSet(true, false)) {
try {
threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
deleteStaleSegments(lastNMetadataFilesToKeep);
listener.onResponse(null);
} catch (Exception e) {
logger.info(
logger.error(
"Exception while deleting stale commits from remote segment store, will retry delete post next commit",
e
);
listener.onFailure(e);
} finally {
canDeleteStaleCommits.set(true);
}
});
} catch (Exception e) {
logger.info("Exception occurred while scheduling deleteStaleCommits", e);
logger.error("Exception occurred while scheduling deleteStaleCommits", e);
canDeleteStaleCommits.set(true);
listener.onFailure(e);
}
}
}
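For context, here is a minimal caller sketch of the new listener-based overload (a sketch only: `remoteDirectory`, `logger`, and the surrounding imports are assumed, and the count of 10 is illustrative). It mirrors the `ActionListener.wrap` pattern used in `close()` below. Note that when a cleanup is already in flight, the `compareAndSet` guard skips the work and the listener is never invoked.

```java
// Sketch: prune all but the latest 10 metadata files on the REMOTE_PURGE pool,
// then react to completion or failure through the listener.
remoteDirectory.deleteStaleSegmentsAsync(
    10,
    ActionListener.wrap(
        response -> logger.info("Stale segment cleanup completed"),
        exception -> logger.error("Stale segment cleanup failed", exception)
    )
);
```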
@@ -891,7 +898,6 @@ private boolean deleteIfEmpty() throws IOException {
}

public void close() throws IOException {
deleteStaleSegmentsAsync(0);
deleteIfEmpty();
deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")));
}
}
@@ -13,8 +13,8 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
@@ -59,7 +59,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh

RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data");
RemoteDirectory metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata");
RemoteStoreMetadataLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager(
RemoteStoreLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager(
repositoriesService.get(),
repositoryName,
indexUUID,
@@ -33,7 +33,7 @@
this.repositoriesService = repositoriesService;
}

public RemoteStoreMetadataLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException {
public RemoteStoreLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException {
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId);
}
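For reference, a short sketch of how a caller consumes the now interface-typed result (assumptions: a `RemoteStoreLockManagerFactory` named `lockManagerFactory`, repository/index/shard identifiers, and a snapshot UUID are in scope; both `newLockManager` and `release` may throw `IOException`). The `BlobStoreRepository` changes further down in this diff follow the same pattern.

```java
// Sketch: obtain a lock manager for one shard and release the lock held by a snapshot.
RemoteStoreLockManager lockManager = lockManagerFactory.newLockManager(remoteStoreRepoName, indexUUID, shardId);
lockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build());
```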

@@ -58,6 +58,12 @@
}
}

// TODO: remove this once we add poller in place to trigger remote store cleanup
// see: https://github.com/opensearch-project/OpenSearch/issues/8469
public Supplier<RepositoriesService> getRepositoriesService() {
return repositoriesService;
}

private static RemoteBufferedOutputDirectory createRemoteBufferedOutputDirectory(
Repository repository,
BlobPath commonBlobPath,
@@ -113,11 +113,12 @@
import org.opensearch.index.snapshots.blobstore.RateLimitingInputStream;
import org.opensearch.index.snapshots.blobstore.SlicedInputStream;
import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.IndexId;
@@ -616,7 +617,7 @@
RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = (RemoteStoreShardShallowCopySnapshot) indexShardSnapshot;
String indexUUID = remStoreBasedShardMetadata.getIndexUUID();
String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository();
RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager(
RemoteStoreLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepository,
indexUUID,
String.valueOf(shardId.shardId())
@@ -1072,11 +1073,24 @@
// Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
// releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
// next delete operation for releasing this lock file
RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
.newLockManager(remoteStoreRepoForIndex, indexUUID, shardId);
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepoForIndex,
indexUUID,
shardId
);
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
);
if (!isIndexPresent(clusterService, indexUUID)) {
// This is a temporary solution where snapshot deletion triggers remote store side
// cleanup if the index is already deleted. We will add a poller in the future to
// take care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(remoteStoreRepoForIndex, indexUUID, shardId).close();
}
}
}
}
@@ -1487,6 +1501,15 @@
}
}

private static boolean isIndexPresent(ClusterService clusterService, String indexUUID) {
for (final IndexMetadata indexMetadata : clusterService.state().metadata().getIndices().values()) {
if (indexUUID.equals(indexMetadata.getIndexUUID())) {
return true;
}
}
return false;
}

private void executeOneStaleIndexDelete(
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
@@ -1519,11 +1542,21 @@
// Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure
// while releasing the lock file, we would still have the corresponding shallow-snap-UUID file
// and that would be used during next delete operation for releasing this stale lock file
RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
.newLockManager(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey());
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
);
if (!isIndexPresent(clusterService, indexUUID)) {
// This is a temporary solution where snapshot deletion triggers remote store side
// cleanup if the index is already deleted. We will add a poller in the future to
// take care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey()).close();
}
}
}
}
@@ -166,8 +166,6 @@ public void testRetrieveSnapshots() throws Exception {
assertThat(snapshotIds, equalTo(originalSnapshots));
}

// Validate Scenario remoteStoreShallowCopy Snapshot -> remoteStoreShallowCopy Snapshot
// -> remoteStoreShallowCopy Snapshot -> normal snapshot
public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {
final BlobStoreRepository repository = setupRepo();
final long pendingGeneration = repository.metadata.pendingGeneration();
@@ -557,6 +557,11 @@ protected String[] getLockFilesInRemoteStore(String remoteStoreIndex, String rem
.prepareGetSettings(remoteStoreIndex)
.get()
.getSetting(remoteStoreIndex, IndexMetadata.SETTING_INDEX_UUID);
return getLockFilesInRemoteStore(remoteStoreIndex, remoteStoreRepositoryName, indexUUID);
}

protected String[] getLockFilesInRemoteStore(String remoteStoreIndex, String remoteStoreRepositoryName, String indexUUID)
throws IOException {
final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(remoteStoreRepositoryName);
BlobPath shardLevelBlobPath = remoteStoreRepository.basePath().add(indexUUID).add("0").add("segments").add("lock_files");