Skip to content

Commit

Permalink
transfer content type
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasvb90 committed Jul 19, 2023
1 parent bbadfc1 commit c570d4d
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ interface Factory {
String name();

/**
* To Initialise a crypto context used in encryption. This might be needed to set the context before beginning
* To Initialise a crypto context used in encryption. This is needed to set the context before beginning
* encryption.
*
* @return crypto context instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.TransferContentType;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.threadpool.Scheduler;
Expand Down Expand Up @@ -428,7 +429,7 @@ private void uploadNewSegments(Collection<String> localSegmentsPostRefresh, Acti
batchUploadListener.onFailure(ex);
});
statsListener.beforeUpload(src);
remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener);
remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener, TransferContentType.DATA);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.index.translog.transfer.TransferContentType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.crypto.CryptoManager;
Expand Down Expand Up @@ -389,7 +390,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
* @param context IOContext to be used to open IndexInput of file during remote upload
* @param listener Listener to handle upload callback events
*/
public void copyFrom(Directory from, String src, IOContext context, ActionListener<Void> listener) {
public void copyFrom(Directory from, String src, IOContext context, ActionListener<Void> listener, TransferContentType transferContentType) {
if (remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer) {
try {
String remoteFilename = getNewRemoteSegmentFilename(src);
Expand All @@ -399,7 +400,7 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen
}
} else {
try {
copyFrom(from, src, src, context);
copyFrom(from, src, src, context, getChecksumOfLocalFile(from, src), transferContentType);
listener.onResponse(null);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", src), e);
Expand Down Expand Up @@ -534,7 +535,7 @@ String getMetadataFileForCommit(long primaryTerm, long generation) throws IOExce
return metadataFiles.get(0);
}

public void copyFrom(Directory from, String src, String dest, IOContext context, String checksum) throws IOException {
public void copyFrom(Directory from, String src, String dest, IOContext context, String checksum, TransferContentType transferContentType) throws IOException {
String remoteFilename;
remoteFilename = getNewRemoteSegmentFilename(dest);
remoteDataDirectory.copyFrom(from, src, remoteFilename, context);
Expand Down Expand Up @@ -580,7 +581,7 @@ private InputStream maybeCreateEncryptedRemoteInputStream(String file) throws IO
*/
@Override
public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
copyFrom(from, src, dest, context, getChecksumOfLocalFile(from, src));
copyFrom(from, src, dest, context, getChecksumOfLocalFile(from, src), TransferContentType.DATA);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.index.translog.transfer.TransferContentType;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -553,7 +554,7 @@ public void onResponse(Void unused) {
@Override
public void onFailure(Exception e) {}
};
remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener);
remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener, TransferContentType.DATA);
assertTrue(latch.await(5000, TimeUnit.SECONDS));
assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));
storeDirectory.close();
Expand Down Expand Up @@ -590,7 +591,7 @@ public void onFailure(Exception e) {
latch.countDown();
}
};
remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener);
remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener, TransferContentType.DATA);
assertTrue(latch.await(5000, TimeUnit.SECONDS));
assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));

Expand Down

0 comments on commit c570d4d

Please sign in to comment.