Skip to content

Commit

Permalink
Purge remote translog based on the latest metadata for remote-backed ind…
Browse files Browse the repository at this point in the history
…exes (#6086)

* Deleting remote translog considering latest remote metadata

Co-authored-by: Gaurav Bafna <gbbafna@amazon.com>
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 and gbbafna authored Feb 3, 2023
1 parent 2f9a475 commit 54ce423
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.index.translog;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
Expand All @@ -26,12 +29,15 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/**
* A Translog implementation which syncs local FS with a remote store
Expand All @@ -51,6 +57,12 @@ public class RemoteFsTranslog extends Translog {

private volatile long minSeqNoToKeep;

// min generation referred by last uploaded translog
private volatile long minRemoteGenReferenced;

// clean up translog folder uploaded by previous primaries once
private final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

public RemoteFsTranslog(
TranslogConfig config,
String translogUUID,
Expand Down Expand Up @@ -230,6 +242,7 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti
transferReleasable.close();
closeFilesIfNoPendingRetentionLocks();
maxRemoteTranslogGenerationUploaded = generation;
minRemoteGenReferenced = getMinFileGeneration();
logger.trace("uploaded translog for {} {} ", primaryTerm, generation);
}

Expand Down Expand Up @@ -327,13 +340,72 @@ protected void setMinSeqNoToKeep(long seqNo) {
this.minSeqNoToKeep = seqNo;
}

@Override
void deleteReaderFiles(TranslogReader reader) {
private void deleteRemoteGeneration(Set<Long> generations) {
try {
translogTransferManager.deleteTranslog(primaryTermSupplier.getAsLong(), reader.generation);
} catch (IOException ignored) {
logger.error("Exception {} while deleting generation {}", ignored, reader.generation);
translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generations);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception occurred while deleting generation {}", generations), e);
}
}

@Override
public void trimUnreferencedReaders() throws IOException {
// clean up local translog files and updates readers
super.trimUnreferencedReaders();

// cleans up remote translog files not referenced in latest uploaded metadata.
// This enables us to restore translog from the metadata in case of failover or relocation.
Set<Long> generationsToDelete = new HashSet<>();
for (long generation = minRemoteGenReferenced - 1; generation >= 0; generation--) {
if (fileTransferTracker.uploaded(Translog.getFilename(generation)) == false) {
break;
}
generationsToDelete.add(generation);
}
if (generationsToDelete.isEmpty() == false) {
deleteRemoteGeneration(generationsToDelete);
deleteOlderPrimaryTranslogFilesFromRemoteStore();
}
}

/**
* This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures
* implicitly that minimum primary term in latest translog metadata in remote store is the current primary term.
*/
private void deleteOlderPrimaryTranslogFilesFromRemoteStore() {
// The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there
// are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part
// of older primary term.
if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
logger.info("Cleaning up translog uploaded by previous primaries");
long minPrimaryTermInMetadata = current.getPrimaryTerm();
Set<Long> primaryTermsInRemote;
try {
primaryTermsInRemote = translogTransferManager.listPrimaryTerms();
} catch (IOException e) {
logger.error("Exception occurred while getting primary terms from remote store", e);
// If there are exceptions encountered, then we try to delete all older primary terms lesser than the
// minimum referenced primary term in remote translog metadata.
primaryTermsInRemote = LongStream.range(0, current.getPrimaryTerm()).boxed().collect(Collectors.toSet());
}
// Delete all primary terms that are no more referenced by the metadata file and exists in the
Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
.filter(term -> term < minPrimaryTermInMetadata)
.collect(Collectors.toSet());
primaryTermsToDelete.forEach(term -> translogTransferManager.deleteTranslogAsync(term, new ActionListener<>() {
@Override
public void onResponse(Void response) {
// NO-OP
}

@Override
public void onFailure(Exception e) {
logger.error(
() -> new ParameterizedMessage("Exception occurred while deleting older translog files for primary_term={}", term),
e
);
}
}));
}
super.deleteReaderFiles(reader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,37 @@ public void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IO
blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);
}

@Override
public void deleteBlobsAsync(Iterable<String> path, List<String> fileNames, ActionListener<Void> listener) {
executorService.execute(() -> {
try {
blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);
listener.onResponse(null);
} catch (IOException e) {
listener.onFailure(e);
}
});
}

@Override
public void deleteAsync(Iterable<String> path, ActionListener<Void> listener) {
executorService.execute(() -> {
try {
blobStore.blobContainer((BlobPath) path).delete();
listener.onResponse(null);
} catch (IOException e) {
listener.onFailure(e);
}
});
}

@Override
public Set<String> listAll(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).listBlobs().keySet();
}

@Override
public Set<String> listFolders(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).children().keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.index.translog.transfer.listener.FileTransferListener;

import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -55,9 +56,14 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
add(fileSnapshot.getName(), TransferState.FAILED);
}

@Override
public void onDelete(String name) {
fileTransferTracker.remove(name);
public void delete(List<String> names) {
for (String name : names) {
fileTransferTracker.remove(name);
}
}

public boolean uploaded(String file) {
return fileTransferTracker.get(file) == TransferState.SUCCESS;
}

public Set<TransferFileSnapshot> exclusionFilter(Set<TransferFileSnapshot> original) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ void uploadBlobAsync(

void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException;

    /**
     * Deletes the given list of files under the given path asynchronously, notifying the listener on
     * completion or failure.
     * @param path : the path where the files to delete reside
     * @param fileNames : names of the files to delete
     * @param listener : listener invoked with {@code null} on success or the failure exception
     */
    void deleteBlobsAsync(Iterable<String> path, List<String> fileNames, ActionListener<Void> listener);

    /**
     * Deletes the given path and its contents asynchronously, notifying the listener on completion
     * or failure.
     * @param path : the path to delete
     * @param listener : listener invoked with {@code null} on success or the failure exception
     */
    void deleteAsync(Iterable<String> path, ActionListener<Void> listener);

/**
* Lists the files
* @param path : the path to list
Expand All @@ -53,6 +57,14 @@ void uploadBlobAsync(
*/
Set<String> listAll(Iterable<String> path) throws IOException;

/**
* Lists the folders inside the path.
* @param path : the path
* @return list of folders inside the path
* @throws IOException the exception while listing folders inside the path
*/
Set<String> listFolders(Iterable<String> path) throws IOException;

/**
*
* @param path the remote path from where download should be made
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,68 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
);
}

public void deleteTranslog(long primaryTerm, long generation) throws IOException {
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFilename = Translog.getFilename(generation);
// ToDo - Take care of metadata file cleanup
// https://github.com/opensearch-project/OpenSearch/issues/5677
fileTransferTracker.onDelete(ckpFileName);
fileTransferTracker.onDelete(translogFilename);
List<String> files = List.of(ckpFileName, translogFilename);
transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files);
/**
* This method handles deletion of multiple generations for a single primary term.
* TODO: Take care of metadata file cleanup. <a href="https://github.com/opensearch-project/OpenSearch/issues/5677">Github Issue #5677</a>
*
* @param primaryTerm primary term
* @param generations set of generation
*/
public void deleteTranslogAsync(long primaryTerm, Set<Long> generations) throws IOException {
if (generations.isEmpty()) {
return;
}
List<String> files = new ArrayList<>();
generations.forEach(generation -> {
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFilename = Translog.getFilename(generation);
files.addAll(List.of(ckpFileName, translogFilename));
});
transferService.deleteBlobsAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files, new ActionListener<>() {
@Override
public void onResponse(Void unused) {
fileTransferTracker.delete(files);
}

@Override
public void onFailure(Exception e) {
logger.error(
() -> new ParameterizedMessage(
"Exception occurred while deleting translog for primary_term={} generations={}",
primaryTerm,
generations
),
e
);
}
});
}

/**
* Handles deletion of translog files for a particular primary term.
*
* @param primaryTerm primary term
* @param listener listener for response and failure
*/
public void deleteTranslogAsync(long primaryTerm, ActionListener<Void> listener) {
transferService.deleteAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), listener);
}

/**
* Lists all primary terms existing on remote store.
*
* @return the list of primary terms.
* @throws IOException is thrown if it can read the data.
*/
public Set<Long> listPrimaryTerms() throws IOException {
return transferService.listFolders(remoteBaseTransferPath).stream().filter(s -> {
try {
Long.parseLong(s);
return true;
} catch (Exception ignored) {
// NO-OP
}
return false;
}).map(Long::parseLong).collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,4 @@ public interface FileTransferListener {
*/
void onFailure(TransferFileSnapshot fileSnapshot, Exception e);

void onDelete(String name);
}
Loading

0 comments on commit 54ce423

Please sign in to comment.