Skip to content

Commit

Permalink
Implement block level fetch for Composite Directory
Browse files Browse the repository at this point in the history
Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
  • Loading branch information
rayshrey committed Mar 20, 2024
1 parent f1cd4e4 commit d507327
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.opensearch.index.store.remote.file.OnDemandCompositeBlockIndexInput;
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.filetracker.FileState;
Expand All @@ -29,12 +30,14 @@ public class CompositeDirectory extends FilterDirectory {
private final FSDirectory localDirectory;
private final RemoteStoreFileTrackerAdapter remoteStoreFileTrackerAdapter;
private final FileCache fileCache;
private final FSDirectory localCacheDirectory;

public CompositeDirectory(FSDirectory localDirectory, FileCache fileCache) {
public CompositeDirectory(FSDirectory localDirectory, FSDirectory localCacheDirectory, FileCache fileCache) {
super(localDirectory);
this.localDirectory = localDirectory;
this.fileCache = fileCache;
this.remoteStoreFileTrackerAdapter = new CompositeDirectoryRemoteStoreFileTrackerAdapter(fileCache);
this.localCacheDirectory = localCacheDirectory;
}

public void setRemoteDirectory(Directory remoteDirectory) {
Expand Down Expand Up @@ -102,7 +105,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
break;

case REMOTE_ONLY:
// TODO - return an implementation of OnDemandBlockIndexInput where the fetchBlock method is implemented
indexInput = new OnDemandCompositeBlockIndexInput(remoteStoreFileTrackerAdapter, name, localCacheDirectory);
break;
}
return indexInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Supplier;

public class CompositeDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
private static String CACHE_LOCATION = "remote_cache";

private final Supplier<RepositoriesService> repositoriesService;
private final FileCache remoteStoreFileCache;
Expand All @@ -36,8 +38,9 @@ public CompositeDirectoryFactory(Supplier<RepositoriesService> repositoriesServi

@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
final Path location = shardPath.resolveIndex();
final FSDirectory primaryDirectory = FSDirectory.open(location);
return new CompositeDirectory(primaryDirectory, remoteStoreFileCache);
final FSDirectory primaryDirectory = FSDirectory.open(shardPath.resolveIndex());
final FSDirectory localCacheDirectory = FSDirectory.open(Files.createDirectories(shardPath.getDataPath().resolve(CACHE_LOCATION)));
localCacheDirectory.syncMetaData();
return new CompositeDirectory(primaryDirectory, localCacheDirectory, remoteStoreFileCache);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.BlobFetchRequest;
import org.opensearch.index.store.remote.utils.TransferManager;
import org.opensearch.index.store.remote.utils.filetracker.FileState;
import org.opensearch.index.store.remote.utils.filetracker.FileTrackingInfo;
import org.opensearch.index.store.remote.utils.filetracker.FileType;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -36,10 +38,21 @@ public void setRemoteDirectory(Directory remoteDirectory) {
this.remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectory;
}

public String getUploadedFileName(String name) {
return remoteDirectory.getExistingRemoteFilename(name);
}

public long getFileLength(String name) {
try {
return remoteDirectory.fileLength(name);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) {
// TODO - This function will fetch the requested data from blobContainer
return null;
public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException {
return new TransferManager(remoteDirectory.getDataDirectoryBlobContainer(), fileCache).fetchBlob(blobFetchRequest);
}

public void trackFile(String name, FileState fileState, FileType fileType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.util.Version;
import org.opensearch.common.UUIDs;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.logging.Loggers;
Expand Down Expand Up @@ -134,6 +135,10 @@ public RemoteSegmentStoreDirectory(
init();
}

public BlobContainer getDataDirectoryBlobContainer() {
return remoteDataDirectory.getBlobContainer();
}

/**
* Initializes the cache which keeps track of all the segment files uploaded to the remote segment store.
* As this cache is specific to an instance of RemoteSegmentStoreDirectory, it is possible that cache becomes stale
Expand Down Expand Up @@ -696,7 +701,7 @@ private String getChecksumOfLocalFile(Directory directory, String file) throws I
}
}

private String getExistingRemoteFilename(String localFilename) {
public String getExistingRemoteFilename(String localFilename) {
if (segmentsUploadedToRemoteStore.containsKey(localFilename)) {
return segmentsUploadedToRemoteStore.get(localFilename).uploadedFilename;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import org.opensearch.index.store.remote.utils.filetracker.FileState;
import org.opensearch.index.store.remote.utils.filetracker.FileType;

import java.io.IOException;

public interface RemoteStoreFileTrackerAdapter {
IndexInput fetchBlob(BlobFetchRequest blobFetchRequest);
IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException;

void trackFile(String name, FileState fileState, FileType fileType);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote.file;

import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IndexInput;
import org.opensearch.index.store.CompositeDirectoryRemoteStoreFileTrackerAdapter;
import org.opensearch.index.store.RemoteStoreFileTrackerAdapter;
import org.opensearch.index.store.remote.utils.BlobFetchRequest;

import java.io.IOException;

public class OnDemandCompositeBlockIndexInput extends OnDemandBlockIndexInput {

private final RemoteStoreFileTrackerAdapter remoteStoreFileTrackerAdapter;
private final String fileName;
private final Long originalFileSize;
private final FSDirectory directory;

public OnDemandCompositeBlockIndexInput(RemoteStoreFileTrackerAdapter remoteStoreFileTrackerAdapter, String fileName, FSDirectory directory) {
this(
OnDemandBlockIndexInput.builder().
resourceDescription("OnDemandCompositeBlockIndexInput").
isClone(false).
offset(0L).
length(getFileLength(remoteStoreFileTrackerAdapter, fileName)),
remoteStoreFileTrackerAdapter,
fileName,
directory);
}

public OnDemandCompositeBlockIndexInput(Builder builder, RemoteStoreFileTrackerAdapter remoteStoreFileTrackerAdapter, String fileName, FSDirectory directory) {
super(builder);
this.remoteStoreFileTrackerAdapter = remoteStoreFileTrackerAdapter;
this.directory = null;
this.fileName = fileName;
originalFileSize = getFileLength(remoteStoreFileTrackerAdapter, fileName);
}

@Override
protected OnDemandCompositeBlockIndexInput buildSlice(String sliceDescription, long offset, long length) {
return new OnDemandCompositeBlockIndexInput(
OnDemandBlockIndexInput.builder().
blockSizeShift(blockSizeShift).
isClone(true).
offset(this.offset + offset).
length(length).
resourceDescription(sliceDescription),
remoteStoreFileTrackerAdapter,
fileName,
directory
);
}

@Override
protected IndexInput fetchBlock(int blockId) throws IOException {
final String uploadedFileName = ((CompositeDirectoryRemoteStoreFileTrackerAdapter)remoteStoreFileTrackerAdapter).getUploadedFileName(fileName);
final String blockFileName = uploadedFileName + "." + blockId;
final long blockStart = getBlockStart(blockId);
final long length = getActualBlockSize(blockId);

BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder()
.position(blockStart)
.length(length)
.blobName(uploadedFileName)
.directory(directory)
.fileName(blockFileName)
.build();
return remoteStoreFileTrackerAdapter.fetchBlob(blobFetchRequest);
}

@Override
public OnDemandBlockIndexInput clone() {
OnDemandCompositeBlockIndexInput clone = buildSlice("clone", 0L, this.length);
// ensures that clones may be positioned at the same point as the blocked file they were cloned from
clone.cloneBlock(this);
return clone;
}

private long getActualBlockSize(int blockId) {
return (blockId != getBlock(originalFileSize - 1)) ? blockSize : getBlockOffset(originalFileSize - 1) + 1;
}

private static long getFileLength(RemoteStoreFileTrackerAdapter remoteStoreFileTrackerAdapter, String fileName) {
return ((CompositeDirectoryRemoteStoreFileTrackerAdapter)remoteStoreFileTrackerAdapter).getFileLength(fileName);
}
}

0 comments on commit d507327

Please sign in to comment.