Skip to content

Commit

Permalink
[Remote Store] Avoid repeated delete calls for the stale segment files (
Browse files Browse the repository at this point in the history
opensearch-project#11532)

* Avoid repeated delete calls for the stale segment files

Signed-off-by: Sachin Kale <kalsac@amazon.com>

* Address PR comments

Signed-off-by: Sachin Kale <kalsac@amazon.com>

---------

Signed-off-by: Sachin Kale <kalsac@amazon.com>
Co-authored-by: Sachin Kale <kalsac@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
2 people authored and shiv0408 committed Apr 25, 2024
1 parent aca2cc6 commit e183f11
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -781,38 +780,41 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
segmentMetadataMap.values().stream().map(metadata -> metadata.uploadedFilename).collect(Collectors.toSet())
);
}
Set<String> deletedSegmentFiles = new HashSet<>();
for (String metadataFile : metadataFilesToBeDeleted) {
Map<String, UploadedSegmentMetadata> staleSegmentFilesMetadataMap = readMetadataFile(metadataFile).getMetadata();
Set<String> staleSegmentRemoteFilenames = staleSegmentFilesMetadataMap.values()
.stream()
.map(metadata -> metadata.uploadedFilename)
.collect(Collectors.toSet());
AtomicBoolean deletionSuccessful = new AtomicBoolean(true);
List<String> nonActiveDeletedSegmentFiles = new ArrayList<>();
staleSegmentRemoteFilenames.stream().filter(file -> !activeSegmentRemoteFilenames.contains(file)).forEach(file -> {
try {
remoteDataDirectory.deleteFile(file);
nonActiveDeletedSegmentFiles.add(file);
if (!activeSegmentFilesMetadataMap.containsKey(getLocalSegmentFilename(file))) {
segmentsUploadedToRemoteStore.remove(getLocalSegmentFilename(file));
staleSegmentRemoteFilenames.stream()
.filter(file -> activeSegmentRemoteFilenames.contains(file) == false)
.filter(file -> deletedSegmentFiles.contains(file) == false)
.forEach(file -> {
try {
remoteDataDirectory.deleteFile(file);
deletedSegmentFiles.add(file);
if (!activeSegmentFilesMetadataMap.containsKey(getLocalSegmentFilename(file))) {
segmentsUploadedToRemoteStore.remove(getLocalSegmentFilename(file));
}
} catch (NoSuchFileException e) {
logger.info("Segment file {} corresponding to metadata file {} does not exist in remote", file, metadataFile);
} catch (IOException e) {
deletionSuccessful.set(false);
logger.warn(
"Exception while deleting segment file {} corresponding to metadata file {}. Deletion will be re-tried",
file,
metadataFile
);
}
} catch (NoSuchFileException e) {
logger.info("Segment file {} corresponding to metadata file {} does not exist in remote", file, metadataFile);
} catch (IOException e) {
deletionSuccessful.set(false);
logger.info(
"Exception while deleting segment file {} corresponding to metadata file {}. Deletion will be re-tried",
file,
metadataFile
);
}
});
logger.debug("nonActiveDeletedSegmentFiles={}", nonActiveDeletedSegmentFiles);
});
if (deletionSuccessful.get()) {
logger.debug("Deleting stale metadata file {} from remote segment store", metadataFile);
remoteMetadataDirectory.deleteFile(metadataFile);
}
}
logger.debug("deletedSegmentFiles={}", deletedSegmentFiles);
}

public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -122,6 +123,14 @@ public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase {
1,
"node-1"
);
private final String metadataFilename4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
10,
36,
34,
1,
1,
"node-1"
);

@Before
public void setup() throws IOException {
Expand Down Expand Up @@ -979,6 +988,51 @@ public void testDeleteStaleCommitsActualDelete() throws Exception {
verify(remoteMetadataDirectory).deleteFile(metadataFilename3);
}

public void testDeleteStaleCommitsDeleteDedup() throws Exception {
Map<String, Map<String, String>> metadataFilenameContentMapping = new HashMap<>(populateMetadata());
metadataFilenameContentMapping.put(metadataFilename4, metadataFilenameContentMapping.get(metadataFilename3));

when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(new ArrayList<>(List.of(metadataFilename, metadataFilename2, metadataFilename3, metadataFilename4)));

when(remoteMetadataDirectory.getBlobStream(metadataFilename4)).thenAnswer(
I -> createMetadataFileBytes(
metadataFilenameContentMapping.get(metadataFilename4),
indexShard.getLatestReplicationCheckpoint(),
segmentInfos
)
);

remoteSegmentStoreDirectory.init();

// popluateMetadata() adds stub to return 4 metadata files
// We are passing lastNMetadataFilesToKeep=2 here so that oldest 2 metadata files will be deleted
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);

Set<String> staleSegmentFiles = new HashSet<>();
for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) {
staleSegmentFiles.add(metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]);
}
for (String metadata : metadataFilenameContentMapping.get(metadataFilename4).values()) {
staleSegmentFiles.add(metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]);
}
staleSegmentFiles.forEach(file -> {
try {
// Even with the same files in 2 stale metadata files, delete should be called only once.
verify(remoteDataDirectory, times(1)).deleteFile(file);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true)));
verify(remoteMetadataDirectory).deleteFile(metadataFilename3);
verify(remoteMetadataDirectory).deleteFile(metadataFilename4);
}

public void testDeleteStaleCommitsActualDeleteIOException() throws Exception {
Map<String, Map<String, String>> metadataFilenameContentMapping = populateMetadata();
remoteSegmentStoreDirectory.init();
Expand Down

0 comments on commit e183f11

Please sign in to comment.