From c7f56fcc34b6b935badb8b636d1ba7db081c4da0 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 18 Jul 2023 17:32:41 -0700 Subject: [PATCH] Self review Signed-off-by: Suraj Singh --- .../opensearch/index/shard/IndexShard.java | 39 ------------------- .../RemoteStoreReplicationSource.java | 37 ++++++++++++++++-- .../RemoteStoreReplicationSourceTests.java | 4 +- 3 files changed, 35 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index c1aaf65db94c3..a2312d4e9c48a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4612,45 +4612,6 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog(), logger); } - /** - * Segment Replication method - * - * Downloads specified segments from remote store - * @param filesToFetch Files to download from remote store - * - */ - public List syncSegmentsFromRemoteSegmentStore(List filesToFetch) throws IOException { - assert indexSettings.isSegRepEnabled() && indexSettings.isRemoteStoreEnabled(); - logger.trace("Downloading segments files from remote store {}", filesToFetch); - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteDirectory(); - RemoteSegmentMetadata remoteSegmentMetadata = remoteSegmentStoreDirectory.init(); - List downloadedSegments = new ArrayList<>(); - if (remoteSegmentMetadata != null) { - try { - store.incRef(); - remoteStore.incRef(); - final Directory storeDirectory = store.directory(); - String segmentNFile = null; - for (StoreFileMetadata fileMetadata : filesToFetch) { - String file = fileMetadata.name(); - logger.info("--> Copying file {}", file); - storeDirectory.copyFrom(remoteSegmentStoreDirectory, file, file, IOContext.DEFAULT); - downloadedSegments.add(fileMetadata); - if (file.startsWith(IndexFileNames.SEGMENTS)) { - assert segmentNFile == null : "There should be only one SegmentInfosSnapshot file"; - segmentNFile = file; - } - } - storeDirectory.sync(downloadedSegments.stream().map(metadata -> metadata.name()).collect(Collectors.toList())); - } finally { - store.decRef(); - remoteStore.decRef(); - logger.trace("Downloaded segments from remote store {}", downloadedSegments); - } - } - return downloadedSegments; - } - /** * Downloads segments from remote segment store. * @param overrideLocal flag to override local segment files with those in remote store diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index 36b58de318959..a97cec2c904e2 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -10,7 +10,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Version; import org.opensearch.action.ActionListener; import org.opensearch.index.shard.IndexShard; @@ -21,6 +24,7 @@ import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -92,9 +96,36 @@ public void getSegmentFiles( ActionListener listener ) { try { - List downloadedFiles = indexShard.syncSegmentsFromRemoteSegmentStore(filesToFetch); - assert downloadedFiles.size() == filesToFetch.size() && downloadedFiles.containsAll(filesToFetch); - listener.onResponse(new GetSegmentFilesResponse(downloadedFiles)); + logger.trace("Downloading segments files from remote store {}", filesToFetch); + FilterDirectory remoteStoreDirectory = (FilterDirectory) indexShard.remoteStore().directory(); + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory + .getDelegate(); + RemoteSegmentMetadata remoteSegmentMetadata = remoteSegmentStoreDirectory.init(); + List downloadedSegments = new ArrayList<>(); + if (remoteSegmentMetadata != null) { + try { + indexShard.store().incRef(); + indexShard.remoteStore().incRef(); + final Directory storeDirectory = indexShard.store().directory(); + String segmentNFile = null; + for (StoreFileMetadata fileMetadata : filesToFetch) { + String file = fileMetadata.name(); + storeDirectory.copyFrom(remoteSegmentStoreDirectory, file, file, IOContext.DEFAULT); + downloadedSegments.add(fileMetadata); + if (file.startsWith(IndexFileNames.SEGMENTS)) { + assert segmentNFile == null : "There should be only one SegmentInfosSnapshot file"; + segmentNFile = file; + } + } + storeDirectory.sync(downloadedSegments.stream().map(metadata -> metadata.name()).collect(Collectors.toList())); + } finally { + indexShard.store().decRef(); + indexShard.remoteStore().decRef(); + logger.trace("Downloaded segments from remote store {}", downloadedSegments); + } + } + listener.onResponse(new GetSegmentFilesResponse(downloadedSegments)); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java index f3134d5be4024..0e3d69171c1e7 100644 --- a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java @@ -168,9 +168,7 @@ public void testGetSegmentFiles() throws ExecutionException, InterruptedExceptio public void testGetSegmentFilesFailure() throws IOException { final ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); - Mockito.doThrow(new RuntimeException("testing")) - .when(mockShard) - .syncSegmentsFromRemoteSegmentStore(Mockito.any()); + Mockito.doThrow(new RuntimeException("testing")).when(mockShard).store(); assertThrows(ExecutionException.class, () -> { final PlainActionFuture res = PlainActionFuture.newFuture(); replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, Collections.emptyList(), mockShard, res);