Skip to content

Commit

Permalink
Consider primary term while deleting generations
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <sachinpkale@gmail.com>
  • Loading branch information
sachinpkale committed Sep 18, 2024
1 parent 024a004 commit 2f8e2af
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots(
List<Path> segmentsPostDeletionOfSnapshot1 = Files.list(segmentsPath).collect(Collectors.toList());
assertTrue(segmentsPostDeletionOfSnapshot1.size() < segmentsPostSnapshot2.size());
}, 60, TimeUnit.SECONDS);
// To uncomment following, we need to handle deletion of generations in translog cleanup flow
// List<Path> translogPostDeletionOfSnapshot1 = Files.list(translogPath).collect(Collectors.toList());
// Delete is async. Give time for it
// assertBusy(() -> assertEquals(translogPostSnapshot2.size() - translogPostSnapshot1.size(),
// translogPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS);

assertBusy(() -> {
List<Path> translogPostDeletionOfSnapshot1 = Files.list(translogPath).collect(Collectors.toList());
assertTrue(translogPostDeletionOfSnapshot1.size() < translogPostSnapshot2.size());
}, 60, TimeUnit.SECONDS);
}

private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -199,20 +200,23 @@ public void onResponse(List<BlobMetadata> blobMetadata) {

logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

Set<Long> generationsToBeDeleted = getGenerationsToBeDeleted(
Map<Long, Set<Long>> primaryTermGenerationsToBeDeletedMap = getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
indexDeleted ? Long.MAX_VALUE : minRemoteGenReferenced
);

logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
if (generationsToBeDeleted.isEmpty() == false) {
maxDeletedGenerationOnRemote = generationsToBeDeleted.stream().max(Long::compareTo).get();
logger.debug(() -> "generationsToBeDeleted = " + primaryTermGenerationsToBeDeletedMap);
if (primaryTermGenerationsToBeDeletedMap.isEmpty() == false) {
Optional<Long> maxGeneration = primaryTermGenerationsToBeDeletedMap.values()
.stream()
.flatMap(Collection::stream)
.max(Long::compare);
maxGeneration.ifPresent(aLong -> maxDeletedGenerationOnRemote = aLong);

// Delete stale generations
translogTransferManager.deleteGenerationAsync(
primaryTermSupplier.getAsLong(),
generationsToBeDeleted,
primaryTermGenerationsToBeDeletedMap,
remoteGenerationDeletionPermits::release
);
} else {
Expand Down Expand Up @@ -247,32 +251,115 @@ public void onFailure(Exception e) {
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}

protected Map<Long, Set<Long>> getGenerationsToBeDeleted(
List<String> metadataFilesNotToBeDeleted,
List<String> metadataFilesToBeDeleted,
long minRemoteGenReferenced
) throws IOException {
return getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
translogTransferManager,
oldFormatMetadataFileGenerationMap,
oldFormatMetadataFilePrimaryTermMap,
minRemoteGenReferenced
);
}

// Visible for testing
protected Set<Long> getGenerationsToBeDeleted(
protected static Map<Long, Set<Long>> getGenerationsToBeDeleted(
List<String> metadataFilesNotToBeDeleted,
List<String> metadataFilesToBeDeleted,
TranslogTransferManager translogTransferManager,
Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap,
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap,
long minRemoteGenReferenced
) throws IOException {
Set<Long> generationsFromMetadataFilesToBeDeleted = new HashSet<>();
Map<Long, Set<String>> generationsFromMetadataFilesToBeDeleted = new HashMap<>();
for (String mdFile : metadataFilesToBeDeleted) {
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager);
generationsFromMetadataFilesToBeDeleted.addAll(
LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList())
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(
mdFile,
translogTransferManager,
oldFormatMetadataFileGenerationMap
);
for (long generation : LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList())) {
if (generationsFromMetadataFilesToBeDeleted.containsKey(generation) == false) {
generationsFromMetadataFilesToBeDeleted.put(generation, new HashSet<>());
}
generationsFromMetadataFilesToBeDeleted.get(generation).add(mdFile);
}
}

Map<String, Tuple<Long, Long>> metadataFileNotToBeDeletedGenerationMap = new HashMap<>();
for (String mdFile : metadataFilesNotToBeDeleted) {
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(
mdFile,
translogTransferManager,
oldFormatMetadataFileGenerationMap
);
metadataFileNotToBeDeletedGenerationMap.put(mdFile, minMaxGen);
List<Long> generations = LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList());
for (Long generation : generations) {
if (generationsFromMetadataFilesToBeDeleted.containsKey(generation)) {
generationsFromMetadataFilesToBeDeleted.get(generation).add(mdFile);
}
}
}

Map<String, Tuple<Long, Long>> metadataFileNotToBeDeletedGenerationMap = getGenerationForMetadataFiles(metadataFilesNotToBeDeleted);
TreeSet<Tuple<Long, Long>> pinnedGenerations = getOrderedPinnedMetadataGenerations(metadataFileNotToBeDeletedGenerationMap);
Set<Long> generationsToBeDeleted = new HashSet<>();
for (long generation : generationsFromMetadataFilesToBeDeleted) {
Map<Long, Set<Long>> generationsToBeDeletedToPrimaryTermRangeMap = new HashMap<>();
for (long generation : generationsFromMetadataFilesToBeDeleted.keySet()) {
// Check if the generation is not referred by metadata file matching pinned timestamps
// The check with minRemoteGenReferenced is redundant but kept as to make sure we don't delete generations
// that are not persisted in remote segment store yet.
if (generation < minRemoteGenReferenced && isGenerationPinned(generation, pinnedGenerations) == false) {
generationsToBeDeleted.add(generation);
generationsToBeDeletedToPrimaryTermRangeMap.put(
generation,
getPrimaryTermRange(
generationsFromMetadataFilesToBeDeleted.get(generation),
translogTransferManager,
oldFormatMetadataFilePrimaryTermMap
)
);
}
}
return getPrimaryTermToGenerationsMap(generationsToBeDeletedToPrimaryTermRangeMap);
}

protected static Map<Long, Set<Long>> getPrimaryTermToGenerationsMap(Map<Long, Set<Long>> generationsToBeDeletedToPrimaryTermRangeMap) {
Map<Long, Set<Long>> primaryTermToGenerationsMap = new HashMap<>();
for (Map.Entry<Long, Set<Long>> entry : generationsToBeDeletedToPrimaryTermRangeMap.entrySet()) {
for (Long primaryTerm : entry.getValue()) {
if (primaryTermToGenerationsMap.containsKey(primaryTerm) == false) {
primaryTermToGenerationsMap.put(primaryTerm, new HashSet<>());
}
primaryTermToGenerationsMap.get(primaryTerm).add(entry.getKey());
}
}
return generationsToBeDeleted;
return primaryTermToGenerationsMap;
}

protected static Set<Long> getPrimaryTermRange(
Set<String> metadataFiles,
TranslogTransferManager translogTransferManager,
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap
) throws IOException {
Tuple<Long, Long> primaryTermRange = new Tuple<>(Long.MIN_VALUE, Long.MAX_VALUE);
for (String metadataFile : metadataFiles) {
Tuple<Long, Long> primaryTermRangeForMdFile = getMinMaxPrimaryTermFromMetadataFile(
metadataFile,
translogTransferManager,
oldFormatMetadataFilePrimaryTermMap
);
primaryTermRange = new Tuple<>(
Math.max(primaryTermRange.v1(), primaryTermRangeForMdFile.v1()),
Math.min(primaryTermRange.v2(), primaryTermRangeForMdFile.v2())
);
if (primaryTermRange.v1().equals(primaryTermRange.v2())) {
break;
}
}
return LongStream.rangeClosed(primaryTermRange.v1(), primaryTermRange.v2()).boxed().collect(Collectors.toSet());
}

protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles, boolean indexDeleted) {
Expand Down Expand Up @@ -348,7 +435,7 @@ protected static List<String> getMetadataFilesToBeDeleted(
}

// Visible for testing
protected boolean isGenerationPinned(long generation, TreeSet<Tuple<Long, Long>> pinnedGenerations) {
protected static boolean isGenerationPinned(long generation, TreeSet<Tuple<Long, Long>> pinnedGenerations) {
Tuple<Long, Long> ceilingGenerationRange = pinnedGenerations.ceiling(new Tuple<>(generation, generation));
if (ceilingGenerationRange != null && generation >= ceilingGenerationRange.v1() && generation <= ceilingGenerationRange.v2()) {
return true;
Expand All @@ -360,7 +447,9 @@ protected boolean isGenerationPinned(long generation, TreeSet<Tuple<Long, Long>>
return false;
}

private TreeSet<Tuple<Long, Long>> getOrderedPinnedMetadataGenerations(Map<String, Tuple<Long, Long>> metadataFileGenerationMap) {
private static TreeSet<Tuple<Long, Long>> getOrderedPinnedMetadataGenerations(
Map<String, Tuple<Long, Long>> metadataFileGenerationMap
) {
TreeSet<Tuple<Long, Long>> pinnedGenerations = new TreeSet<>((o1, o2) -> {
if (Objects.equals(o1.v1(), o2.v1()) == false) {
return o1.v1().compareTo(o2.v1());
Expand All @@ -373,18 +462,10 @@ private TreeSet<Tuple<Long, Long>> getOrderedPinnedMetadataGenerations(Map<Strin
}

// Visible for testing
protected Map<String, Tuple<Long, Long>> getGenerationForMetadataFiles(List<String> metadataFiles) throws IOException {
Map<String, Tuple<Long, Long>> metadataFileGenerationMap = new HashMap<>();
for (String metadataFile : metadataFiles) {
metadataFileGenerationMap.put(metadataFile, getMinMaxTranslogGenerationFromMetadataFile(metadataFile, translogTransferManager));
}
return metadataFileGenerationMap;
}

// Visible for testing
protected Tuple<Long, Long> getMinMaxTranslogGenerationFromMetadataFile(
protected static Tuple<Long, Long> getMinMaxTranslogGenerationFromMetadataFile(
String metadataFile,
TranslogTransferManager translogTransferManager
TranslogTransferManager translogTransferManager,
Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap
) throws IOException {
Tuple<Long, Long> minMaxGenerationFromFileName = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(metadataFile);
if (minMaxGenerationFromFileName != null) {
Expand Down Expand Up @@ -541,6 +622,16 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

Map<Long, Set<Long>> primaryTermGenerationsToBeDeletedMap = getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
translogTransferManager,
new HashMap<>(),
new HashMap<>(),
Long.MAX_VALUE
);
translogTransferManager.deleteGenerationAsync(primaryTermGenerationsToBeDeletedMap, () -> {});

// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
Expand All @@ -39,6 +40,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -496,6 +498,12 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runnable onCompletion) {
List<String> translogFiles = getTranslogFilesFromGenerations(generations);
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
}

private List<String> getTranslogFilesFromGenerations(Set<Long> generations) {
List<String> translogFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
Expand All @@ -507,8 +515,47 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runna
translogFiles.add(translogFileName);
}
});
return translogFiles;
}

public void deleteGenerationAsync(Map<Long, Set<Long>> primaryTermToGenerationsMap, Runnable onCompletion) {
GroupedActionListener<Void> groupedActionListener = new GroupedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Collection<Void> unused) {
logger.trace(() -> "Deleted translogs for primaryTermToGenerationsMap=" + primaryTermToGenerationsMap);
onCompletion.run();
}

@Override
public void onFailure(Exception e) {
onCompletion.run();
logger.error(
() -> new ParameterizedMessage(
"Exception occurred while deleting translog for primaryTermToGenerationsMap={}",
primaryTermToGenerationsMap
),
e
);
}
}, primaryTermToGenerationsMap.size());

// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
deleteTranslogFilesAsync(primaryTermToGenerationsMap, groupedActionListener);
}

private void deleteTranslogFilesAsync(Map<Long, Set<Long>> primaryTermToGenerationsMap, ActionListener<Void> actionListener) {
for (Long primaryTerm : primaryTermToGenerationsMap.keySet()) {
try {
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
getTranslogFilesFromGenerations(primaryTermToGenerationsMap.get(primaryTerm)),
actionListener
);
} catch (Exception e) {
actionListener.onFailure(e);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,7 @@ private long getMinPrimaryTermReferred() {
if (generationToPrimaryTermMapper.get() == null || generationToPrimaryTermMapper.get().values().isEmpty()) {
return -1;
}
Optional<Long> minPrimaryTerm = generationToPrimaryTermMapper.get()
.values()
.stream()
.map(s -> Long.parseLong(s))
.min(Long::compareTo);
Optional<Long> minPrimaryTerm = generationToPrimaryTermMapper.get().values().stream().map(Long::parseLong).min(Long::compareTo);
if (minPrimaryTerm.isPresent()) {
return minPrimaryTerm.get();
} else {
Expand Down
Loading

0 comments on commit 2f8e2af

Please sign in to comment.