Skip to content

Commit

Permalink
Self review
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed Jul 19, 2023
1 parent 50c3a6c commit c7f56fc
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 45 deletions.
39 changes: 0 additions & 39 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4612,45 +4612,6 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog(), logger);
}

/**
* Segment Replication method
*
* Downloads specified segments from remote store
* @param filesToFetch Files to download from remote store
*
*/
public List<StoreFileMetadata> syncSegmentsFromRemoteSegmentStore(List<StoreFileMetadata> filesToFetch) throws IOException {
assert indexSettings.isSegRepEnabled() && indexSettings.isRemoteStoreEnabled();
logger.trace("Downloading segments files from remote store {}", filesToFetch);
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteDirectory();
RemoteSegmentMetadata remoteSegmentMetadata = remoteSegmentStoreDirectory.init();
List<StoreFileMetadata> downloadedSegments = new ArrayList<>();
if (remoteSegmentMetadata != null) {
try {
store.incRef();
remoteStore.incRef();
final Directory storeDirectory = store.directory();
String segmentNFile = null;
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
logger.info("--> Copying file {}", file);
storeDirectory.copyFrom(remoteSegmentStoreDirectory, file, file, IOContext.DEFAULT);
downloadedSegments.add(fileMetadata);
if (file.startsWith(IndexFileNames.SEGMENTS)) {
assert segmentNFile == null : "There should be only one SegmentInfosSnapshot file";
segmentNFile = file;
}
}
storeDirectory.sync(downloadedSegments.stream().map(metadata -> metadata.name()).collect(Collectors.toList()));
} finally {
store.decRef();
remoteStore.decRef();
logger.trace("Downloaded segments from remote store {}", downloadedSegments);
}
}
return downloadedSegments;
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.index.shard.IndexShard;
Expand All @@ -21,6 +24,7 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -92,9 +96,36 @@ public void getSegmentFiles(
ActionListener<GetSegmentFilesResponse> listener
) {
try {
List<StoreFileMetadata> downloadedFiles = indexShard.syncSegmentsFromRemoteSegmentStore(filesToFetch);
assert downloadedFiles.size() == filesToFetch.size() && downloadedFiles.containsAll(filesToFetch);
listener.onResponse(new GetSegmentFilesResponse(downloadedFiles));
logger.trace("Downloading segments files from remote store {}", filesToFetch);
FilterDirectory remoteStoreDirectory = (FilterDirectory) indexShard.remoteStore().directory();
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory
.getDelegate();
RemoteSegmentMetadata remoteSegmentMetadata = remoteSegmentStoreDirectory.init();
List<StoreFileMetadata> downloadedSegments = new ArrayList<>();
if (remoteSegmentMetadata != null) {
try {
indexShard.store().incRef();
indexShard.remoteStore().incRef();
final Directory storeDirectory = indexShard.store().directory();
String segmentNFile = null;
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
storeDirectory.copyFrom(remoteSegmentStoreDirectory, file, file, IOContext.DEFAULT);
downloadedSegments.add(fileMetadata);
if (file.startsWith(IndexFileNames.SEGMENTS)) {
assert segmentNFile == null : "There should be only one SegmentInfosSnapshot file";
segmentNFile = file;
}
}
storeDirectory.sync(downloadedSegments.stream().map(metadata -> metadata.name()).collect(Collectors.toList()));
} finally {
indexShard.store().decRef();
indexShard.remoteStore().decRef();
logger.trace("Downloaded segments from remote store {}", downloadedSegments);
}
}
listener.onResponse(new GetSegmentFilesResponse(downloadedSegments));
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,7 @@ public void testGetSegmentFiles() throws ExecutionException, InterruptedExceptio

public void testGetSegmentFilesFailure() throws IOException {
final ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
Mockito.doThrow(new RuntimeException("testing"))
.when(mockShard)
.syncSegmentsFromRemoteSegmentStore(Mockito.any());
Mockito.doThrow(new RuntimeException("testing")).when(mockShard).store();
assertThrows(ExecutionException.class, () -> {
final PlainActionFuture<GetSegmentFilesResponse> res = PlainActionFuture.newFuture();
replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, Collections.emptyList(), mockShard, res);
Expand Down

0 comments on commit c7f56fc

Please sign in to comment.