From 54ce4237e01b55f7b861e5b3cdf4d36999b671f9 Mon Sep 17 00:00:00 2001 From: Ashish Date: Fri, 3 Feb 2023 09:56:12 +0530 Subject: [PATCH] Purge remote translog basis the latest metadata for remote-backed indexes (#6086) * Deleting remote translog considering latest remote metadata Co-authored-by: Gaurav Bafna Signed-off-by: Ashish Singh --- .../index/translog/RemoteFsTranslog.java | 84 ++++++++++++++-- .../transfer/BlobStoreTransferService.java | 29 ++++++ .../transfer/FileTransferTracker.java | 12 ++- .../translog/transfer/TransferService.java | 12 +++ .../transfer/TranslogTransferManager.java | 72 ++++++++++++-- .../listener/FileTransferListener.java | 1 - .../index/translog/RemoteFSTranslogTests.java | 69 +++++++------ .../transfer/FileTransferTrackerTests.java | 22 +++++ .../TranslogTransferManagerTests.java | 97 +++++++++++++++---- 9 files changed, 331 insertions(+), 67 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 0cfaa5234c1fe..a3a6eba39e126 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -8,6 +8,9 @@ package org.opensearch.index.translog; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.ActionListener; +import org.opensearch.common.SetOnce; import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; @@ -26,12 +29,15 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; +import java.util.stream.Collectors; +import java.util.stream.LongStream; /** * A Translog implementation which syncs local FS with a remote store @@ -51,6 +57,12 @@ public class RemoteFsTranslog extends Translog { private volatile long minSeqNoToKeep; + // min generation referred by last uploaded translog + private volatile long minRemoteGenReferenced; + + // clean up translog folder uploaded by previous primaries once + private final SetOnce olderPrimaryCleaned = new SetOnce<>(); + public RemoteFsTranslog( TranslogConfig config, String translogUUID, @@ -230,6 +242,7 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti transferReleasable.close(); closeFilesIfNoPendingRetentionLocks(); maxRemoteTranslogGenerationUploaded = generation; + minRemoteGenReferenced = getMinFileGeneration(); logger.trace("uploaded translog for {} {} ", primaryTerm, generation); } @@ -327,13 +340,72 @@ protected void setMinSeqNoToKeep(long seqNo) { this.minSeqNoToKeep = seqNo; } - @Override - void deleteReaderFiles(TranslogReader reader) { + private void deleteRemoteGeneration(Set generations) { try { - translogTransferManager.deleteTranslog(primaryTermSupplier.getAsLong(), reader.generation); - } catch (IOException ignored) { - logger.error("Exception {} while deleting generation {}", ignored, reader.generation); + translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generations); + } catch (IOException e) { + logger.error(() -> new ParameterizedMessage("Exception occurred while deleting generation {}", generations), e); + } + } + + @Override + public void trimUnreferencedReaders() throws IOException { + // clean up local translog files and updates readers + super.trimUnreferencedReaders(); + + // cleans up remote translog files not referenced in latest uploaded metadata. + // This enables us to restore translog from the metadata in case of failover or relocation. + Set generationsToDelete = new HashSet<>(); + for (long generation = minRemoteGenReferenced - 1; generation >= 0; generation--) { + if (fileTransferTracker.uploaded(Translog.getFilename(generation)) == false) { + break; + } + generationsToDelete.add(generation); + } + if (generationsToDelete.isEmpty() == false) { + deleteRemoteGeneration(generationsToDelete); + deleteOlderPrimaryTranslogFilesFromRemoteStore(); + } + } + + /** + * This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures + * implicitly that minimum primary term in latest translog metadata in remote store is the current primary term. + */ + private void deleteOlderPrimaryTranslogFilesFromRemoteStore() { + // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there + // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part + // of older primary term. + if (olderPrimaryCleaned.trySet(Boolean.TRUE)) { + logger.info("Cleaning up translog uploaded by previous primaries"); + long minPrimaryTermInMetadata = current.getPrimaryTerm(); + Set primaryTermsInRemote; + try { + primaryTermsInRemote = translogTransferManager.listPrimaryTerms(); + } catch (IOException e) { + logger.error("Exception occurred while getting primary terms from remote store", e); + // If there are exceptions encountered, then we try to delete all older primary terms lesser than the + // minimum referenced primary term in remote translog metadata. + primaryTermsInRemote = LongStream.range(0, current.getPrimaryTerm()).boxed().collect(Collectors.toSet()); + } + // Delete all primary terms that are no more referenced by the metadata file and exists in the + Set primaryTermsToDelete = primaryTermsInRemote.stream() + .filter(term -> term < minPrimaryTermInMetadata) + .collect(Collectors.toSet()); + primaryTermsToDelete.forEach(term -> translogTransferManager.deleteTranslogAsync(term, new ActionListener<>() { + @Override + public void onResponse(Void response) { + // NO-OP + } + + @Override + public void onFailure(Exception e) { + logger.error( + () -> new ParameterizedMessage("Exception occurred while deleting older translog files for primary_term={}", term), + e + ); + } + })); } - super.deleteReaderFiles(reader); } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 78a26baa052ef..08a98a491a035 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -81,8 +81,37 @@ public void deleteBlobs(Iterable path, List fileNames) throws IO blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames); } + @Override + public void deleteBlobsAsync(Iterable path, List fileNames, ActionListener listener) { + executorService.execute(() -> { + try { + blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames); + listener.onResponse(null); + } catch (IOException e) { + listener.onFailure(e); + } + }); + } + + @Override + public void deleteAsync(Iterable path, ActionListener listener) { + executorService.execute(() -> { + try { + blobStore.blobContainer((BlobPath) path).delete(); + listener.onResponse(null); + } catch (IOException e) { + listener.onFailure(e); + } + }); + } + @Override public Set listAll(Iterable path) throws IOException { return blobStore.blobContainer((BlobPath) path).listBlobs().keySet(); } + + @Override + public Set listFolders(Iterable path) throws IOException { + return blobStore.blobContainer((BlobPath) path).children().keySet(); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index 5338142afed33..1909164bd821a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -13,6 +13,7 @@ import org.opensearch.index.translog.transfer.listener.FileTransferListener; import java.util.HashSet; +import java.util.List; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -55,9 +56,14 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { add(fileSnapshot.getName(), TransferState.FAILED); } - @Override - public void onDelete(String name) { - fileTransferTracker.remove(name); + public void delete(List names) { + for (String name : names) { + fileTransferTracker.remove(name); + } + } + + public boolean uploaded(String file) { + return fileTransferTracker.get(file) == TransferState.SUCCESS; } public Set exclusionFilter(Set original) { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 5745d0838efb3..5ba15ad01d44e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -45,6 +45,10 @@ void uploadBlobAsync( void deleteBlobs(Iterable path, List fileNames) throws IOException; + void deleteBlobsAsync(Iterable path, List fileNames, ActionListener listener); + + void deleteAsync(Iterable path, ActionListener listener); + /** * Lists the files * @param path : the path to list @@ -53,6 +57,14 @@ void uploadBlobAsync( */ Set listAll(Iterable path) throws IOException; + /** + * Lists the folders inside the path. + * @param path : the path + * @return list of folders inside the path + * @throws IOException the exception while listing folders inside the path + */ + Set listFolders(Iterable path) throws IOException; + /** * * @param path the remote path from where download should be made diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 35ccb4ccf17db..48331f6528606 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -195,14 +195,68 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) ); } - public void deleteTranslog(long primaryTerm, long generation) throws IOException { - String ckpFileName = Translog.getCommitCheckpointFileName(generation); - String translogFilename = Translog.getFilename(generation); - // ToDo - Take care of metadata file cleanup - // https://github.com/opensearch-project/OpenSearch/issues/5677 - fileTransferTracker.onDelete(ckpFileName); - fileTransferTracker.onDelete(translogFilename); - List files = List.of(ckpFileName, translogFilename); - transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files); + /** + * This method handles deletion of multiple generations for a single primary term. + * TODO: Take care of metadata file cleanup. Github Issue #5677 + * + * @param primaryTerm primary term + * @param generations set of generation + */ + public void deleteTranslogAsync(long primaryTerm, Set generations) throws IOException { + if (generations.isEmpty()) { + return; + } + List files = new ArrayList<>(); + generations.forEach(generation -> { + String ckpFileName = Translog.getCommitCheckpointFileName(generation); + String translogFilename = Translog.getFilename(generation); + files.addAll(List.of(ckpFileName, translogFilename)); + }); + transferService.deleteBlobsAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files, new ActionListener<>() { + @Override + public void onResponse(Void unused) { + fileTransferTracker.delete(files); + } + + @Override + public void onFailure(Exception e) { + logger.error( + () -> new ParameterizedMessage( + "Exception occurred while deleting translog for primary_term={} generations={}", + primaryTerm, + generations + ), + e + ); + } + }); + } + + /** + * Handles deletion of translog files for a particular primary term. + * + * @param primaryTerm primary term + * @param listener listener for response and failure + */ + public void deleteTranslogAsync(long primaryTerm, ActionListener listener) { + transferService.deleteAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), listener); + } + + /** + * Lists all primary terms existing on remote store. + * + * @return the list of primary terms. + * @throws IOException is thrown if it can read the data. + */ + public Set listPrimaryTerms() throws IOException { + return transferService.listFolders(remoteBaseTransferPath).stream().filter(s -> { + try { + Long.parseLong(s); + return true; + } catch (Exception ignored) { + // NO-OP + } + return false; + }).map(Long::parseLong).collect(Collectors.toSet()); } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java index c489e4b9a5809..af78cb50b63c6 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java @@ -30,5 +30,4 @@ public interface FileTransferListener { */ void onFailure(TransferFileSnapshot fileSnapshot, Exception e); - void onDelete(String name); } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 0e728ca3f1d4e..cb3affb71b3dc 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -465,7 +465,7 @@ public void testRangeSnapshot() throws Exception { } } - public void testSimpleOperationsUpload() throws IOException { + public void testSimpleOperationsUpload() throws Exception { ArrayList ops = new ArrayList<>(); try (Translog.Snapshot snapshot = translog.newSnapshot()) { assertThat(snapshot, SnapshotMatchers.size(0)); @@ -477,18 +477,18 @@ public void testSimpleOperationsUpload() throws IOException { assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - assertEquals(translog.allUploaded().size(), 2); + assertEquals(4, translog.allUploaded().size()); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); - assertEquals(translog.allUploaded().size(), 4); + assertEquals(6, translog.allUploaded().size()); translog.rollGeneration(); - assertEquals(translog.allUploaded().size(), 4); + assertEquals(6, translog.allUploaded().size()); Set mdFiles = blobStoreTransferService.listAll( repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata") ); - assertEquals(mdFiles.size(), 2); + assertEquals(2, mdFiles.size()); logger.info("All md files {}", mdFiles); Set tlogFiles = blobStoreTransferService.listAll( @@ -529,33 +529,48 @@ public void testSimpleOperationsUpload() throws IOException { translog.deletionPolicy.setLocalCheckpointOfSafeCommit(0); // simulating the remote segment upload . translog.setMinSeqNoToKeep(0); - // This should not trim anything + // This should not trim anything from local translog.trimUnreferencedReaders(); - assertEquals(translog.allUploaded().size(), 4); - assertEquals( - blobStoreTransferService.listAll( - repository.basePath() - .add(shardId.getIndex().getUUID()) - .add(String.valueOf(shardId.id())) - .add(String.valueOf(primaryTerm.get())) - ).size(), - 4 - ); + assertEquals(2, translog.readers.size()); + assertBusy(() -> { + assertEquals(4, translog.allUploaded().size()); + assertEquals( + 4, + blobStoreTransferService.listAll( + repository.basePath() + .add(shardId.getIndex().getUUID()) + .add(String.valueOf(shardId.id())) + .add(String.valueOf(primaryTerm.get())) + ).size() + ); + }); - // This should trim tlog-2.* files as it contains seq no 0 + // This should trim tlog-2 from local + // This should not trim tlog-2.* files from remote as we not uploading any more translog to remote translog.setMinSeqNoToKeep(1); + translog.deletionPolicy.setLocalCheckpointOfSafeCommit(1); translog.trimUnreferencedReaders(); - assertEquals(translog.allUploaded().size(), 2); - assertEquals( - blobStoreTransferService.listAll( - repository.basePath() - .add(shardId.getIndex().getUUID()) - .add(String.valueOf(shardId.id())) - .add(String.valueOf(primaryTerm.get())) - ).size(), - 2 - ); + assertEquals(1, translog.readers.size()); + assertBusy(() -> { + assertEquals(4, translog.allUploaded().size()); + assertEquals( + 4, + blobStoreTransferService.listAll( + repository.basePath() + .add(shardId.getIndex().getUUID()) + .add(String.valueOf(shardId.id())) + .add(String.valueOf(primaryTerm.get())) + ).size() + ); + }); + // this should now trim as tlog-2 files from remote, but not tlog-3 and tlog-4 + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); + translog.deletionPolicy.setLocalCheckpointOfSafeCommit(1); + translog.setMinSeqNoToKeep(2); + translog.trimUnreferencedReaders(); + assertEquals(1, translog.readers.size()); + assertBusy(() -> assertEquals(4, translog.allUploaded().size())); } private Long populateTranslogOps(boolean withMissingOps) throws IOException { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java index c6b4579f5ddd1..be14e4a7bd380 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java @@ -15,6 +15,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.List; public class FileTransferTrackerTests extends OpenSearchTestCase { @@ -74,4 +75,25 @@ public void testOnFailure() throws IOException { } } + public void testUploaded() throws IOException { + fileTransferTracker = new FileTransferTracker(shardId); + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + try ( + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + testFile, + randomNonNegativeLong() + ); + + ) { + fileTransferTracker.onSuccess(transferFileSnapshot); + String fileName = String.valueOf(testFile.getFileName()); + assertTrue(fileTransferTracker.uploaded(fileName)); + assertFalse(fileTransferTracker.uploaded("random-name")); + + fileTransferTracker.delete(List.of(fileName)); + assertFalse(fileTransferTracker.uploaded(fileName)); + } + } + } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 0677879549905..0abbfcd3eb69c 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -12,31 +12,35 @@ import org.apache.lucene.util.Constants; import org.mockito.Mockito; import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; -import org.opensearch.test.OpenSearchTestCase; import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; -import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; +import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems("*") public class TranslogTransferManagerTests extends OpenSearchTestCase { @@ -74,23 +78,25 @@ public void testTransferSnapshot() throws IOException { return null; }).when(transferService).uploadBlobAsync(any(TransferFileSnapshot.class), any(BlobPath.class), any(ActionListener.class)); + FileTransferTracker fileTransferTracker = new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { + @Override + public void onSuccess(TransferFileSnapshot fileSnapshot) { + fileTransferSucceeded.incrementAndGet(); + super.onSuccess(fileSnapshot); + } + + @Override + public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { + fileTransferFailed.incrementAndGet(); + super.onFailure(fileSnapshot, e); + } + + }; + TranslogTransferManager translogTransferManager = new TranslogTransferManager( transferService, remoteBaseTransferPath, - new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { - @Override - public void onSuccess(TransferFileSnapshot fileSnapshot) { - fileTransferSucceeded.incrementAndGet(); - } - - @Override - public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { - fileTransferFailed.incrementAndGet(); - } - - @Override - public void onDelete(String name) {} - } + fileTransferTracker ); assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @@ -108,6 +114,7 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { assertEquals(0, fileTransferFailed.get()); assertEquals(1, translogTransferSucceeded.get()); assertEquals(0, translogTransferFailed.get()); + assertEquals(4, fileTransferTracker.allUploaded().size()); } private TransferSnapshot createTransferSnapshot() { @@ -295,6 +302,54 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException { // Since the tracker already holds the files with success state, adding them with success state is allowed tracker.add(translogFile, true); tracker.add(checkpointFile, true); + } + + public void testDeleteTranslogSuccess() throws Exception { + FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); + BlobStore blobStore = mock(BlobStore.class); + BlobContainer blobContainer = mock(BlobContainer.class); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService( + blobStore, + OpenSearchExecutors.newDirectExecutorService() + ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + blobStoreTransferService, + remoteBaseTransferPath, + tracker + ); + String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; + tracker.add(translogFile, true); + tracker.add(checkpointFile, true); + assertEquals(2, tracker.allUploaded().size()); + + List files = List.of(checkpointFile, translogFile); + translogTransferManager.deleteTranslogAsync(primaryTerm, Set.of(19L)); + assertBusy(() -> assertEquals(0, tracker.allUploaded().size())); + verify(blobContainer).deleteBlobsIgnoringIfNotExists(eq(files)); + } + + public void testDeleteTranslogFailure() throws Exception { + FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); + BlobStore blobStore = mock(BlobStore.class); + BlobContainer blobContainer = mock(BlobContainer.class); + doAnswer(invocation -> { throw new IOException("test exception"); }).when(blobStore).blobContainer(any(BlobPath.class)); + // when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService( + blobStore, + OpenSearchExecutors.newDirectExecutorService() + ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + blobStoreTransferService, + remoteBaseTransferPath, + tracker + ); + String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; + tracker.add(translogFile, true); + tracker.add(checkpointFile, true); + assertEquals(2, tracker.allUploaded().size()); + translogTransferManager.deleteTranslogAsync(primaryTerm, Set.of(19L)); + assertEquals(2, tracker.allUploaded().size()); } }