Skip to content

Commit

Permalink
Extract common UUID parsing method
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Ross <andrross@amazon.com>
  • Loading branch information
andrross committed Feb 15, 2024
1 parent 20aaa8d commit d05e5fa
Showing 1 changed file with 43 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,8 @@ private static boolean isIndexPresent(ClusterService clusterService, String inde
return false;
}



private void executeOneStaleIndexDelete(
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
Expand All @@ -1586,47 +1588,40 @@ private void executeOneStaleIndexDelete(
try {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
if (remoteStoreLockManagerFactory != null) {
Map<String, BlobContainer> shardBlobs = indexEntry.getValue().children();
if (!shardBlobs.isEmpty()) {
for (Map.Entry<String, BlobContainer> shardBlob : shardBlobs.entrySet()) {
Map<String, BlobMetadata> shardLevelBlobs = shardBlob.getValue().listBlobs();
for (Map.Entry<String, BlobMetadata> shardLevelBlob : shardLevelBlobs.entrySet()) {
String blob = shardLevelBlob.getKey();
if (blob.startsWith(SHALLOW_SNAPSHOT_PREFIX) && blob.endsWith(".dat")) {
String snapshotUUID = blob.substring(
SHALLOW_SNAPSHOT_PREFIX.length(),
blob.length() - ".dat".length()
final Map<String, BlobContainer> shardBlobs = indexEntry.getValue().children();
for (Map.Entry<String, BlobContainer> shardBlob : shardBlobs.entrySet()) {
for (String blob : shardBlob.getValue().listBlobs().keySet()) {
final Optional<String> snapshotUUID = extractShallowSnapshotUUID(blob);
if (snapshotUUID.isPresent()) {
RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
shardBlob.getValue(),
snapshotUUID.get(),
namedXContentRegistry
);
RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
shardBlob.getValue(),
snapshotUUID,
namedXContentRegistry
);
String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
// Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure
// while releasing the lock file, we would still have the corresponding shallow-snap-UUID file
// and that would be used during next delete operation for releasing this stale lock file
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
.newLockManager(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey());
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
);
if (!isIndexPresent(clusterService, indexUUID)) {
// this is a temporary solution where snapshot deletion triggers remote store side
// cleanup if index is already deleted. We will add a poller in future to take
// care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardBlob.getKey()))
).close();
}
String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
// Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure
// while releasing the lock file, we would still have the corresponding shallow-snap-UUID file
// and that would be used during next delete operation for releasing this stale lock file
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
.newLockManager(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey());
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID.get()).build()
);
if (!isIndexPresent(clusterService, indexUUID)) {
// this is a temporary solution where snapshot deletion triggers remote store side
// cleanup if index is already deleted. We will add a poller in future to take
// care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardBlob.getKey()))
).close();
}
}
}
Expand Down Expand Up @@ -3365,12 +3360,7 @@ private static List<String> unusedBlobs(
blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())
) == false)
|| (remoteStoreLockManagerFactory != null
? (blob.startsWith(SHALLOW_SNAPSHOT_PREFIX)
&& blob.endsWith(".dat")
&& survivingSnapshotUUIDs.contains(
blob.substring(SHALLOW_SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())
) == false)
: false)
&& extractShallowSnapshotUUID(blob).map(survivingSnapshotUUIDs::contains).orElse(false))
|| (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
|| FsBlobContainer.isTempBlobName(blob)
)
Expand Down Expand Up @@ -3512,6 +3502,13 @@ private static void failStoreIfCorrupted(Store store, Exception e) {
}
}

private static Optional<String> extractShallowSnapshotUUID(String blobName) {
if (blobName.startsWith(SHALLOW_SNAPSHOT_PREFIX)) {
return Optional.of(blobName.substring(SHALLOW_SNAPSHOT_PREFIX.length(), blobName.length() - ".dat".length()));
}
return Optional.empty();
}

/**
* The result of removing a snapshot from a shard folder in the repository.
*/
Expand Down

0 comments on commit d05e5fa

Please sign in to comment.