Skip to content

Commit

Permalink
Consider primary term while deleting generations
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <sachinpkale@gmail.com>
  • Loading branch information
sachinpkale committed Sep 18, 2024
1 parent 024a004 commit 18e7975
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -199,20 +200,23 @@ public void onResponse(List<BlobMetadata> blobMetadata) {

logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

Set<Long> generationsToBeDeleted = getGenerationsToBeDeleted(
Map<Long, Set<Long>> primaryTermGenerationsToBeDeletedMap = getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
indexDeleted ? Long.MAX_VALUE : minRemoteGenReferenced
);

logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
if (generationsToBeDeleted.isEmpty() == false) {
maxDeletedGenerationOnRemote = generationsToBeDeleted.stream().max(Long::compareTo).get();
logger.debug(() -> "generationsToBeDeleted = " + primaryTermGenerationsToBeDeletedMap);
if (primaryTermGenerationsToBeDeletedMap.isEmpty() == false) {
Optional<Long> maxGeneration = primaryTermGenerationsToBeDeletedMap.values()
.stream()
.flatMap(Collection::stream)
.max(Long::compare);
maxGeneration.ifPresent(aLong -> maxDeletedGenerationOnRemote = aLong);

// Delete stale generations
translogTransferManager.deleteGenerationAsync(
primaryTermSupplier.getAsLong(),
generationsToBeDeleted,
primaryTermGenerationsToBeDeletedMap,
remoteGenerationDeletionPermits::release
);
} else {
Expand Down Expand Up @@ -248,31 +252,79 @@ public void onFailure(Exception e) {
}

// Visible for testing
protected Set<Long> getGenerationsToBeDeleted(
protected Map<Long, Set<Long>> getGenerationsToBeDeleted(
List<String> metadataFilesNotToBeDeleted,
List<String> metadataFilesToBeDeleted,
long minRemoteGenReferenced
) throws IOException {
Set<Long> generationsFromMetadataFilesToBeDeleted = new HashSet<>();
Map<Long, Set<String>> generationsFromMetadataFilesToBeDeleted = new HashMap<>();
for (String mdFile : metadataFilesToBeDeleted) {
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager);
generationsFromMetadataFilesToBeDeleted.addAll(
LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList())
);
for (long generation : LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList())) {
if (generationsFromMetadataFilesToBeDeleted.containsKey(generation) == false) {
generationsFromMetadataFilesToBeDeleted.put(generation, new HashSet<>());
}
generationsFromMetadataFilesToBeDeleted.get(generation).add(mdFile);
}
}

for (String mdFile : metadataFilesNotToBeDeleted) {
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager);
List<Long> generations = LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList());
for (Long generation : generations) {
if (generationsFromMetadataFilesToBeDeleted.containsKey(generation)) {
generationsFromMetadataFilesToBeDeleted.get(generation).add(mdFile);
}
}
}

Map<String, Tuple<Long, Long>> metadataFileNotToBeDeletedGenerationMap = getGenerationForMetadataFiles(metadataFilesNotToBeDeleted);
TreeSet<Tuple<Long, Long>> pinnedGenerations = getOrderedPinnedMetadataGenerations(metadataFileNotToBeDeletedGenerationMap);
Set<Long> generationsToBeDeleted = new HashSet<>();
for (long generation : generationsFromMetadataFilesToBeDeleted) {
Map<Long, Set<Long>> generationsToBeDeletedToPrimaryTermRangeMap = new HashMap<>();
for (long generation : generationsFromMetadataFilesToBeDeleted.keySet()) {
// Check if the generation is not referred by metadata file matching pinned timestamps
// The check with minRemoteGenReferenced is redundant but kept as to make sure we don't delete generations
// that are not persisted in remote segment store yet.
if (generation < minRemoteGenReferenced && isGenerationPinned(generation, pinnedGenerations) == false) {
generationsToBeDeleted.add(generation);
generationsToBeDeletedToPrimaryTermRangeMap.put(
generation,
getPrimaryTermRange(generationsFromMetadataFilesToBeDeleted.get(generation), translogTransferManager)
);
}
}
return getPrimaryTermToGenerationsMap(generationsToBeDeletedToPrimaryTermRangeMap);
}

/**
 * Inverts a {@code generation -> primary terms} mapping into a {@code primary term -> generations} mapping.
 * <p>
 * Every generation is added to the set of each primary term listed for it. Generations whose primary term
 * set is empty do not appear in the result.
 *
 * @param generationsToBeDeletedToPrimaryTermRangeMap generation mapped to the primary terms it may belong to
 * @return a new map from primary term to the generations listed under that primary term
 */
protected Map<Long, Set<Long>> getPrimaryTermToGenerationsMap(Map<Long, Set<Long>> generationsToBeDeletedToPrimaryTermRangeMap) {
    final Map<Long, Set<Long>> generationsByPrimaryTerm = new HashMap<>();
    generationsToBeDeletedToPrimaryTermRangeMap.forEach(
        (generation, primaryTerms) -> primaryTerms.forEach(
            // Create the bucket for a primary term the first time it is seen, then record the generation in it.
            term -> generationsByPrimaryTerm.computeIfAbsent(term, unused -> new HashSet<>()).add(generation)
        )
    );
    return generationsByPrimaryTerm;
}

/**
 * Computes the primary terms common to all of the given metadata files.
 * <p>
 * Each metadata file contributes a closed {@code [min, max]} primary term range, resolved through
 * {@code getMinMaxPrimaryTermFromMetadataFile}. The result is every primary term in the intersection of
 * those ranges.
 *
 * @param metadataFiles           names of the metadata files whose primary term ranges are intersected
 * @param translogTransferManager passed through to the per-file primary term lookup
 * @return the primary terms in the intersection; empty when {@code metadataFiles} is empty or when the
 *         per-file ranges do not overlap
 * @throws IOException if the per-file primary term lookup fails
 */
protected Set<Long> getPrimaryTermRange(Set<String> metadataFiles, TranslogTransferManager translogTransferManager) throws IOException {
    if (metadataFiles.isEmpty()) {
        // With no file to narrow it, the range would stay at [Long.MIN_VALUE, Long.MAX_VALUE] and the
        // enumeration below would try to materialise every long value.
        return new HashSet<>();
    }

    long minPrimaryTerm = Long.MIN_VALUE;
    long maxPrimaryTerm = Long.MAX_VALUE;
    for (String metadataFile : metadataFiles) {
        Tuple<Long, Long> primaryTermRangeForMdFile = getMinMaxPrimaryTermFromMetadataFile(
            metadataFile,
            translogTransferManager,
            oldFormatMetadataFilePrimaryTermMap
        );
        // Intersect: raise the lower bound and lower the upper bound.
        minPrimaryTerm = Math.max(minPrimaryTerm, primaryTermRangeForMdFile.v1());
        maxPrimaryTerm = Math.min(maxPrimaryTerm, primaryTermRangeForMdFile.v2());
        if (minPrimaryTerm == maxPrimaryTerm) {
            // The range has collapsed to one primary term; the remaining files are not consulted.
            break;
        }
    }
    // rangeClosed yields nothing when min > max, so disjoint ranges produce an empty set.
    return LongStream.rangeClosed(minPrimaryTerm, maxPrimaryTerm).boxed().collect(Collectors.toSet());
}

protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles, boolean indexDeleted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
Expand All @@ -39,6 +40,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -496,6 +498,12 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runnable onCompletion) {
    // Resolve the generations to their translog file names and hand them straight to the
    // asynchronous delete of the translog and checkpoint files for this primary term.
    deleteTranslogFilesAsync(primaryTerm, getTranslogFilesFromGenerations(generations), onCompletion);
}

private List<String> getTranslogFilesFromGenerations(Set<Long> generations) {
List<String> translogFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
Expand All @@ -507,8 +515,47 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runna
translogFiles.add(translogFileName);
}
});
return translogFiles;
}

public void deleteGenerationAsync(Map<Long, Set<Long>> primaryTermToGenerationsMap, Runnable onCompletion) {
GroupedActionListener<Void> groupedActionListener = new GroupedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Collection<Void> unused) {
logger.trace(() -> "Deleted translogs for primaryTermToGenerationsMap=" + primaryTermToGenerationsMap);
onCompletion.run();
}

@Override
public void onFailure(Exception e) {
onCompletion.run();
logger.error(
() -> new ParameterizedMessage(

Check warning on line 533 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java#L531-L533

Added lines #L531 - L533 were not covered by tests
"Exception occurred while deleting translog for primaryTermToGenerationsMap={}",
primaryTermToGenerationsMap
),
e
);
}

Check warning on line 539 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java#L539

Added line #L539 was not covered by tests
}, primaryTermToGenerationsMap.size());

// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
deleteTranslogFilesAsync(primaryTermToGenerationsMap, groupedActionListener);
}

private void deleteTranslogFilesAsync(Map<Long, Set<Long>> primaryTermToGenerationsMap, ActionListener<Void> actionListener) {
for (Long primaryTerm : primaryTermToGenerationsMap.keySet()) {
try {
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
getTranslogFilesFromGenerations(primaryTermToGenerationsMap.get(primaryTerm)),
actionListener
);
} catch (Exception e) {
actionListener.onFailure(e);

Check warning on line 556 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java#L555-L556

Added lines #L555 - L556 were not covered by tests
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,7 @@ private long getMinPrimaryTermReferred() {
if (generationToPrimaryTermMapper.get() == null || generationToPrimaryTermMapper.get().values().isEmpty()) {
return -1;
}
Optional<Long> minPrimaryTerm = generationToPrimaryTermMapper.get()
.values()
.stream()
.map(s -> Long.parseLong(s))
.min(Long::compareTo);
Optional<Long> minPrimaryTerm = generationToPrimaryTermMapper.get().values().stream().map(Long::parseLong).min(Long::compareTo);
if (minPrimaryTerm.isPresent()) {
return minPrimaryTerm.get();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.mockito.Mockito;

Expand Down Expand Up @@ -634,70 +632,70 @@ ChannelFactory getChannelFactory() {
}
}

public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() throws IOException {
List<String> metadataFilesNotToBeDeleted = new ArrayList<>();
List<String> metadataFilesToBeDeleted = List.of(
// 4 to 7
"metadata__9223372036854775806__9223372036854775800__9223370311919910398__31__9223372036854775803__1__1",
// 17 to 37
"metadata__9223372036854775806__9223372036854775770__9223370311919910398__31__9223372036854775790__1__1",
// 27 to 42
"metadata__9223372036854775806__9223372036854775765__9223370311919910403__31__9223372036854775780__1__1"
);
Set<Long> generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
Long.MAX_VALUE
);
Set<Long> md1Generations = LongStream.rangeClosed(4, 7).boxed().collect(Collectors.toSet());
Set<Long> md2Generations = LongStream.rangeClosed(17, 37).boxed().collect(Collectors.toSet());
Set<Long> md3Generations = LongStream.rangeClosed(27, 42).boxed().collect(Collectors.toSet());

assertTrue(generations.containsAll(md1Generations));
assertTrue(generations.containsAll(md2Generations));
assertTrue(generations.containsAll(md3Generations));

generations.removeAll(md1Generations);
generations.removeAll(md2Generations);
generations.removeAll(md3Generations);
assertTrue(generations.isEmpty());
}

public void testGetGenerationsToBeDeleted() throws IOException {
List<String> metadataFilesNotToBeDeleted = List.of(
// 1 to 4
"metadata__9223372036854775806__9223372036854775803__9223370311919910398__31__9223372036854775806__1__1",
// 26 to 30
"metadata__9223372036854775806__9223372036854775777__9223370311919910398__31__9223372036854775781__1__1",
// 42 to 100
"metadata__9223372036854775806__9223372036854775707__9223370311919910403__31__9223372036854775765__1__1"
);
List<String> metadataFilesToBeDeleted = List.of(
// 4 to 7
"metadata__9223372036854775806__9223372036854775800__9223370311919910398__31__9223372036854775803__1__1",
// 17 to 37
"metadata__9223372036854775806__9223372036854775770__9223370311919910398__31__9223372036854775790__1__1",
// 27 to 42
"metadata__9223372036854775806__9223372036854775765__9223370311919910403__31__9223372036854775780__1__1"
);
Set<Long> generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
Long.MAX_VALUE
);
Set<Long> md1Generations = LongStream.rangeClosed(5, 7).boxed().collect(Collectors.toSet());
Set<Long> md2Generations = LongStream.rangeClosed(17, 25).boxed().collect(Collectors.toSet());
Set<Long> md3Generations = LongStream.rangeClosed(31, 41).boxed().collect(Collectors.toSet());

assertTrue(generations.containsAll(md1Generations));
assertTrue(generations.containsAll(md2Generations));
assertTrue(generations.containsAll(md3Generations));

generations.removeAll(md1Generations);
generations.removeAll(md2Generations);
generations.removeAll(md3Generations);
assertTrue(generations.isEmpty());
}
// public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() throws IOException {
// List<String> metadataFilesNotToBeDeleted = new ArrayList<>();
// List<String> metadataFilesToBeDeleted = List.of(
// // 4 to 7
// "metadata__9223372036854775806__9223372036854775800__9223370311919910398__31__9223372036854775803__1__1",
// // 17 to 37
// "metadata__9223372036854775806__9223372036854775770__9223370311919910398__31__9223372036854775790__1__1",
// // 27 to 42
// "metadata__9223372036854775806__9223372036854775765__9223370311919910403__31__9223372036854775780__1__1"
// );
// Set<Long> generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted(
// metadataFilesNotToBeDeleted,
// metadataFilesToBeDeleted,
// Long.MAX_VALUE
// );
// Set<Long> md1Generations = LongStream.rangeClosed(4, 7).boxed().collect(Collectors.toSet());
// Set<Long> md2Generations = LongStream.rangeClosed(17, 37).boxed().collect(Collectors.toSet());
// Set<Long> md3Generations = LongStream.rangeClosed(27, 42).boxed().collect(Collectors.toSet());
//
// assertTrue(generations.containsAll(md1Generations));
// assertTrue(generations.containsAll(md2Generations));
// assertTrue(generations.containsAll(md3Generations));
//
// generations.removeAll(md1Generations);
// generations.removeAll(md2Generations);
// generations.removeAll(md3Generations);
// assertTrue(generations.isEmpty());
// }
//
// public void testGetGenerationsToBeDeleted() throws IOException {
// List<String> metadataFilesNotToBeDeleted = List.of(
// // 1 to 4
// "metadata__9223372036854775806__9223372036854775803__9223370311919910398__31__9223372036854775806__1__1",
// // 26 to 30
// "metadata__9223372036854775806__9223372036854775777__9223370311919910398__31__9223372036854775781__1__1",
// // 42 to 100
// "metadata__9223372036854775806__9223372036854775707__9223370311919910403__31__9223372036854775765__1__1"
// );
// List<String> metadataFilesToBeDeleted = List.of(
// // 4 to 7
// "metadata__9223372036854775806__9223372036854775800__9223370311919910398__31__9223372036854775803__1__1",
// // 17 to 37
// "metadata__9223372036854775806__9223372036854775770__9223370311919910398__31__9223372036854775790__1__1",
// // 27 to 42
// "metadata__9223372036854775806__9223372036854775765__9223370311919910403__31__9223372036854775780__1__1"
// );
// Set<Long> generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted(
// metadataFilesNotToBeDeleted,
// metadataFilesToBeDeleted,
// Long.MAX_VALUE
// );
// Set<Long> md1Generations = LongStream.rangeClosed(5, 7).boxed().collect(Collectors.toSet());
// Set<Long> md2Generations = LongStream.rangeClosed(17, 25).boxed().collect(Collectors.toSet());
// Set<Long> md3Generations = LongStream.rangeClosed(31, 41).boxed().collect(Collectors.toSet());
//
// assertTrue(generations.containsAll(md1Generations));
// assertTrue(generations.containsAll(md2Generations));
// assertTrue(generations.containsAll(md3Generations));
//
// generations.removeAll(md1Generations);
// generations.removeAll(md2Generations);
// generations.removeAll(md3Generations);
// assertTrue(generations.isEmpty());
// }

public void testGetMetadataFilesToBeDeletedNoExclusion() {
updatePinnedTimstampTask.run();
Expand Down

0 comments on commit 18e7975

Please sign in to comment.