Integrate translog cleanup with snapshot deletion and fix primary term deletion logic

Signed-off-by: Sachin Kale <kalsac@amazon.com>
Sachin Kale committed Sep 4, 2024
1 parent e087272 commit 8a29eb1
Showing 6 changed files with 309 additions and 39 deletions.
@@ -116,4 +116,8 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
}
}

public Supplier<RepositoriesService> getRepositoriesService() {
return this.repositoriesService;
}

}
@@ -8,7 +8,9 @@

package org.opensearch.index.translog;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.collect.Tuple;
@@ -33,6 +35,9 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
@@ -52,10 +57,13 @@
*/
public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {

private static Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslog.class);
private final Logger logger;
private final Map<Long, String> metadataFilePinnedTimestampMap;
// For metadata files with no min generation in the name, we cache generation data to avoid multiple reads.
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;
private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);

public RemoteFsTimestampAwareTranslog(
TranslogConfig config,
@@ -86,6 +94,7 @@ public RemoteFsTimestampAwareTranslog(
logger = Loggers.getLogger(getClass(), shardId);
this.metadataFilePinnedTimestampMap = new HashMap<>();
this.oldFormatMetadataFileGenerationMap = new HashMap<>();
this.oldFormatMetadataFilePrimaryTermMap = new HashMap<>();
}

@Override
@@ -165,7 +174,11 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
return;
}

- List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles);
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(
metadataFiles,
metadataFilePinnedTimestampMap,
logger
);

// If index is not deleted, make sure to keep latest metadata file
if (indexDeleted == false) {
@@ -209,7 +222,7 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);

// Delete stale primary terms
- deleteStaleRemotePrimaryTerms(metadataFiles);
deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted);
} else {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
}
@@ -259,8 +272,16 @@ protected Set<Long> getGenerationsToBeDeleted(
return generationsToBeDeleted;
}

// Visible for testing
protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles) {
return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger);
}

// Visible for testing
protected static List<String> getMetadataFilesToBeDeleted(
List<String> metadataFiles,
Map<Long, String> metadataFilePinnedTimestampMap,
Logger logger
) {
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();

// Keep files since last successful run of scheduler
@@ -351,27 +372,167 @@ protected Tuple<Long, Long> getMinMaxTranslogGenerationFromMetadataFile(
}
}

private void deleteStaleRemotePrimaryTerms(List<String> metadataFiles) {
deleteStaleRemotePrimaryTerms(
metadataFiles,
translogTransferManager,
oldFormatMetadataFilePrimaryTermMap,
minPrimaryTermInRemote,
logger
);
}

/**
* This method must be called only after there are valid generations to delete in trimUnreferencedReaders, as it
* implicitly ensures that the minimum primary term in the latest translog metadata in the remote store is the
* current primary term.
* <br>
* This will also delete all stale translog metadata files from the remote store, except the latest one as determined
* by the metadata file comparator.
*/
- private void deleteStaleRemotePrimaryTerms(List<String> metadataFiles) {
private static void deleteStaleRemotePrimaryTerms(
List<String> metadataFiles,
TranslogTransferManager translogTransferManager,
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap,
AtomicLong minPrimaryTermInRemote,
Logger logger
) {
// The deletion of older translog files in the remote store is best-effort, so there may be older files that are
// no longer needed and should be cleaned up. Here, we delete all files that belong to older primary terms.
- if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
- if (metadataFiles.isEmpty()) {
- logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms");
- return;
- }
- Optional<Long> minPrimaryTerm = metadataFiles.stream()
- .map(file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[1]))
- .min(Long::compareTo);
- // First we delete all stale primary terms folders from remote store
- long minimumReferencedPrimaryTerm = minPrimaryTerm.get() - 1;
if (metadataFiles.isEmpty()) {
logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms");
return;
}
Optional<Long> minPrimaryTermFromMetadataFiles = metadataFiles.stream().map(file -> {
try {
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1();
} catch (IOException e) {
return Long.MAX_VALUE;
}
}).min(Long::compareTo);
// First we delete all stale primary terms folders from remote store
long minimumReferencedPrimaryTerm = minPrimaryTermFromMetadataFiles.get() - 1;
Long minPrimaryTerm = getMinPrimaryTermInRemote(minPrimaryTermInRemote, translogTransferManager, logger);
if (minimumReferencedPrimaryTerm > minPrimaryTerm) {
translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
minPrimaryTermInRemote.set(minimumReferencedPrimaryTerm);
} else {
logger.debug(
"Skipping primary term cleanup. minimumReferencedPrimaryTerm = {}, minPrimaryTermInRemote = {}",
minimumReferencedPrimaryTerm,
minPrimaryTermInRemote
);
}
}
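
To make the retention arithmetic above concrete, here is a minimal, self-contained sketch in plain Java with invented values (not OpenSearch code). The smallest primary term referenced by the surviving metadata files, minus one, becomes the cutoff passed to deletePrimaryTermsAsync, and remote terms strictly below that cutoff are deleted; note that one term below the minimum is retained as well.

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class PrimaryTermWatermarkSketch {
    public static void main(String[] args) {
        // Min primary term referenced by each metadata file that survives cleanup.
        List<Long> minTermsFromMetadataFiles = List.of(5L, 7L, 9L);
        // Primary-term folders currently present under the remote translog path.
        Set<Long> primaryTermsInRemote = Set.of(2L, 3L, 4L, 5L, 7L, 9L);

        // Mirrors: long minimumReferencedPrimaryTerm = minPrimaryTermFromMetadataFiles.get() - 1;
        long cutoff = minTermsFromMetadataFiles.stream().min(Long::compareTo).orElseThrow() - 1; // 4

        // Mirrors the filter in deletePrimaryTermsAsync: delete terms strictly below the cutoff.
        Set<Long> toDelete = primaryTermsInRemote.stream()
            .filter(term -> term < cutoff)
            .collect(Collectors.toSet()); // {2, 3}

        System.out.println("cutoff = " + cutoff + ", toDelete = " + toDelete);
    }
}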

private static Long getMinPrimaryTermInRemote(
AtomicLong minPrimaryTermInRemote,
TranslogTransferManager translogTransferManager,
Logger logger
) {
if (minPrimaryTermInRemote.get() == Long.MAX_VALUE) {
CountDownLatch latch = new CountDownLatch(1);
translogTransferManager.listPrimaryTermsInRemoteAsync(new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Set<Long> primaryTermsInRemote) {
Optional<Long> minPrimaryTerm = primaryTermsInRemote.stream().min(Long::compareTo);
minPrimaryTerm.ifPresent(minPrimaryTermInRemote::set);
}

@Override
public void onFailure(Exception e) {
logger.error("Exception while fetching min primary term from remote translog", e);
}
}, latch));

try {
if (latch.await(5, TimeUnit.MINUTES) == false) {
logger.error("Timeout while fetching min primary term from remote translog");
}
} catch (InterruptedException e) {
logger.error("Exception while fetching min primary term from remote translog", e);
}
}
return minPrimaryTermInRemote.get();
}

protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
String metadataFile,
TranslogTransferManager translogTransferManager,
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap
) throws IOException {
Tuple<Long, Long> minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile);
if (minMaxPrimaryTermFromFileName != null) {
return minMaxPrimaryTermFromFileName;
} else {
if (oldFormatMetadataFilePrimaryTermMap.containsKey(metadataFile)) {
return oldFormatMetadataFilePrimaryTermMap.get(metadataFile);
} else {
TranslogTransferMetadata metadata = translogTransferManager.readMetadata(metadataFile);
long maxPrimaryTerm = TranslogTransferMetadata.getPrimaryTermFromFileName(metadataFile);
long minPrimaryTerm = -1;
if (metadata.getGenerationToPrimaryTermMapper() != null
&& metadata.getGenerationToPrimaryTermMapper().values().isEmpty() == false) {
Optional<Long> primaryTerm = metadata.getGenerationToPrimaryTermMapper()
.values()
.stream()
.map(s -> Long.parseLong(s))
.min(Long::compareTo);
if (primaryTerm.isPresent()) {
minPrimaryTerm = primaryTerm.get();
}
}
Tuple<Long, Long> minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTerm, maxPrimaryTerm);
oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple);
return minMaxPrimaryTermTuple;
}
}
}

public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException {
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

try {
if (metadataFiles.isEmpty()) {
staticLogger.debug("No stale translog metadata files found");
return;
}
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger);
if (metadataFilesToBeDeleted.isEmpty()) {
staticLogger.debug("No metadata files to delete");
return;
}
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);

// For all the files that we are keeping, fetch min and max generations
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
// Delete stale primary terms
() -> deleteStaleRemotePrimaryTerms(
metadataFilesNotToBeDeleted,
translogTransferManager,
new HashMap<>(),
new AtomicLong(Long.MAX_VALUE),
staticLogger
)
);
} catch (Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
}

@Override
public void onFailure(Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
};
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}
}
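
The static cleanup(TranslogTransferManager) entry point above is what allows snapshot deletion to drive translog cleanup, per the commit title; the actual wiring lives in the changed files not rendered on this page. The following is only a hedged sketch of what a call site could look like: the class name, the hook method, and the logger are illustrative assumptions, not code from this commit.

import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.translog.RemoteFsTimestampAwareTranslog;
import org.opensearch.index.translog.transfer.TranslogTransferManager;

class SnapshotDeletionTranslogHook {
    private static final Logger logger = LogManager.getLogger(SnapshotDeletionTranslogHook.class);

    // Hypothetical hook invoked after a snapshot is deleted and pinned timestamps are updated.
    void onSnapshotDeleted(TranslogTransferManager translogTransferManager) {
        try {
            // Lists remote translog metadata files, deletes the stale ones, and then prunes
            // primary-term folders that are no longer referenced by the surviving metadata.
            RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager);
        } catch (IOException e) {
            logger.error("Translog cleanup after snapshot deletion failed", e);
        }
    }
}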
@@ -520,6 +520,23 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runna
*/
public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) {
logger.info("Deleting primary terms from remote store lesser than {}", minPrimaryTermToKeep);
listPrimaryTermsInRemoteAsync(new ActionListener<>() {
@Override
public void onResponse(Set<Long> primaryTermsInRemote) {
Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
.filter(term -> term < minPrimaryTermToKeep)
.collect(Collectors.toSet());
primaryTermsToDelete.forEach(term -> deletePrimaryTermAsync(term));
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while deleting primary terms from remote store", e);
}
});
}

public void listPrimaryTermsInRemoteAsync(ActionListener<Set<Long>> listener) {
transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() {
@Override
public void onResponse(Set<String> folders) {
@@ -532,15 +549,13 @@ public void onResponse(Set<String> folders) {
}
return false;
}).map(Long::parseLong).collect(Collectors.toSet());
- Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
- .filter(term -> term < minPrimaryTermToKeep)
- .collect(Collectors.toSet());
- primaryTermsToDelete.forEach(term -> deletePrimaryTermAsync(term));
listener.onResponse(primaryTermsInRemote);
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while getting primary terms from remote store", e);
listener.onFailure(e);
}
});
}
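
Since listPrimaryTermsInRemoteAsync is now a public, listener-based API (used above by deletePrimaryTermsAsync and by getMinPrimaryTermInRemote in RemoteFsTimestampAwareTranslog), a caller supplies an ActionListener<Set<Long>>. A minimal usage sketch follows; the wrapper class is invented and the ActionListener import path is an assumption for this branch.

import java.util.Set;

import org.apache.logging.log4j.Logger;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.translog.transfer.TranslogTransferManager;

class PrimaryTermListingExample {
    static void logPrimaryTerms(TranslogTransferManager translogTransferManager, Logger logger) {
        translogTransferManager.listPrimaryTermsInRemoteAsync(new ActionListener<>() {
            @Override
            public void onResponse(Set<Long> primaryTermsInRemote) {
                logger.debug("Primary terms present in remote translog: {}", primaryTermsInRemote);
            }

            @Override
            public void onFailure(Exception e) {
                logger.error("Failed to list primary terms in remote translog", e);
            }
        });
    }
}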
@@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* The metadata associated with every transfer {@link TransferSnapshot}. The metadata is uploaded at the end of the
@@ -108,11 +109,28 @@ public String getFileName() {
RemoteStoreUtils.invertLong(createdAt),
String.valueOf(Objects.hash(nodeId)),
RemoteStoreUtils.invertLong(minTranslogGeneration),
String.valueOf(getMinPrimaryTermReferred()),
String.valueOf(CURRENT_VERSION)
)
);
}

private long getMinPrimaryTermReferred() {
if (generationToPrimaryTermMapper.get() == null || generationToPrimaryTermMapper.get().values().isEmpty()) {
return -1;
}
Optional<Long> minPrimaryTerm = generationToPrimaryTermMapper.get()
.values()
.stream()
.map(s -> Long.parseLong(s))
.min(Long::compareTo);
if (minPrimaryTerm.isPresent()) {
return minPrimaryTerm.get();
} else {
return -1;
}
}

public static Tuple<Tuple<Long, Long>, String> getNodeIdByPrimaryTermAndGeneration(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
if (tokens.length < 6) {
@@ -143,15 +161,43 @@ public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String f
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
try {
// instead of direct index, we go backwards to avoid running into same separator in nodeId
- String minGeneration = tokens[tokens.length - 2];
String minGeneration = tokens[tokens.length - 3];
String maxGeneration = tokens[2];
return new Tuple<>(RemoteStoreUtils.invertLong(minGeneration), RemoteStoreUtils.invertLong(maxGeneration));
- } catch (NumberFormatException e) {
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting min and max translog generation from: {}", filename), e);
return null;
}
}

public static Tuple<Long, Long> getMinMaxPrimaryTermFromFilename(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
if (tokens.length < 7) {
// For versions < 2.17, we don't have min primary term.
return null;
}
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
try {
// instead of direct index, we go backwards to avoid running into same separator in nodeId
String minPrimaryTerm = tokens[tokens.length - 2];
String maxPrimaryTerm = tokens[1];
return new Tuple<>(Long.parseLong(minPrimaryTerm), RemoteStoreUtils.invertLong(maxPrimaryTerm));
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting min and max primary term from: {}", filename), e);
return null;
}
}

public static long getPrimaryTermFromFileName(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
try {
return RemoteStoreUtils.invertLong(tokens[1]);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting max primary term from: {}", filename), e);
return -1;
}
}
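
Piecing the parsers together, a new-format metadata filename appears to carry its fields as METADATA_SEPARATOR-joined tokens: max primary term and max generation near the front (inverted via RemoteStoreUtils.invertLong so that newer files sort first lexicographically), with min generation, min primary term, and version at the tail, which is why the code indexes backwards from the end. The sketch below round-trips such a name; the leading "metadata" token and the version value are assumptions inferred from the index positions used above, not confirmed by this diff.

// Inferred layout (assumption):
// metadata__<invPT>__<invGen>__<invCreatedAt>__<nodeIdHash>__<invMinGen>__<minPT>__<version>
String filename = String.join(
    "__", // METADATA_SEPARATOR
    "metadata",
    RemoteStoreUtils.invertLong(12), // max primary term, tokens[1]
    RemoteStoreUtils.invertLong(42), // max generation, tokens[2]
    RemoteStoreUtils.invertLong(System.currentTimeMillis()), // created-at timestamp
    String.valueOf(Objects.hash("node-1")), // node-id hash
    RemoteStoreUtils.invertLong(40), // min translog generation, tokens[length - 3]
    String.valueOf(7L), // min referenced primary term (plain, not inverted), tokens[length - 2]
    "2" // metadata version (assumed value)
);
Tuple<Long, Long> minMaxPrimaryTerm = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(filename); // (7, 12)
Tuple<Long, Long> minMaxGeneration = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(filename); // (40, 42)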

@Override
public int hashCode() {
return Objects.hash(primaryTerm, generation);
