diff --git a/server/src/main/java/org/opensearch/cluster/metadata/RepositoryMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/RepositoryMetadata.java index db4e5d8137a20..2d22498bfdabf 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/RepositoryMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/RepositoryMetadata.java @@ -61,6 +61,11 @@ public class RepositoryMetadata implements Writeable { */ private final long pendingGeneration; + /** + * Whether repository is encrypted + */ + private final Boolean encrypted; + /** * Constructs new repository metadata * @@ -69,14 +74,22 @@ public class RepositoryMetadata implements Writeable { * @param settings repository settings */ public RepositoryMetadata(String name, String type, Settings settings) { - this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN); + this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN, null); + } + + public RepositoryMetadata(String name, String type, Settings settings, Boolean encrypted) { + this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN, encrypted); } public RepositoryMetadata(RepositoryMetadata metadata, long generation, long pendingGeneration) { - this(metadata.name, metadata.type, metadata.settings, generation, pendingGeneration); + this(metadata.name, metadata.type, metadata.settings, generation, pendingGeneration, metadata.encrypted); } public RepositoryMetadata(String name, String type, Settings settings, long generation, long pendingGeneration) { + this(name, type, settings, generation, pendingGeneration, null); + } + + public RepositoryMetadata(String name, String type, Settings settings, long generation, long pendingGeneration, Boolean encrypted) { this.name = name; this.type = type; this.settings = settings; @@ -87,6 +100,7 @@ public RepositoryMetadata(String name, String type, Settings settings, long gene + "] must be greater or 
equal to generation [" + generation + "]"; + this.encrypted = encrypted; } /** @@ -116,6 +130,15 @@ public Settings settings() { return this.settings; } + /** + * Returns whether repository is encrypted + * + * @return whether repository is encrypted + */ + public Boolean encrypted() { + return encrypted; + } + /** * Returns the safe repository generation. {@link RepositoryData} for this generation is assumed to exist in the repository. * All operations on the repository must be based on the {@link RepositoryData} at this generation. @@ -146,6 +169,7 @@ public RepositoryMetadata(StreamInput in) throws IOException { settings = Settings.readSettingsFromStream(in); generation = in.readLong(); pendingGeneration = in.readLong(); + encrypted = null; } /** diff --git a/server/src/main/java/org/opensearch/common/Stream.java b/server/src/main/java/org/opensearch/common/Stream.java new file mode 100644 index 0000000000000..ab5666f5819fc --- /dev/null +++ b/server/src/main/java/org/opensearch/common/Stream.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.common; + +import java.io.InputStream; + +/** + * Model composed of an input stream, the total content length and offset + */ +public class Stream { + + private final InputStream inputStream; + private final long contentLength; + private final long offset; + + /** + * Construct a new stream object + * + * @param inputStream The input stream that is to be encapsulated + * @param contentLength The total content length that is to be read from the stream + * @param offset The offset pointer that this stream reads from in the file + */ + public Stream(InputStream inputStream, long contentLength, long offset) { + this.inputStream = inputStream; + this.contentLength = contentLength; + this.offset = offset; + } + + /** + * @return The input stream this object is reading from + */ + public InputStream getInputStream() { + return inputStream; + } + + /** + * @return The total length of the content that has to be read from this stream + */ + public long getContentLength() { + return contentLength; + } + + /** + * @return The offset pointer in the file that this stream is reading from + */ + public long getOffset() { + return offset; + } +} diff --git a/server/src/main/java/org/opensearch/crypto/CryptoClient.java b/server/src/main/java/org/opensearch/crypto/CryptoClient.java new file mode 100644 index 0000000000000..9a4b34b64756c --- /dev/null +++ b/server/src/main/java/org/opensearch/crypto/CryptoClient.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.crypto; + +import org.opensearch.common.Stream; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.RefCounted; + +import java.io.InputStream; + +/** + * Crypto plugin interface used for encryption and decryption. 
+ */ +public interface CryptoClient extends RefCounted { + + /** + * A factory interface for constructing crypto client. + * + */ + interface Factory { + + /** + * Constructs a crypto client used for encryption and decryption + * + * @param cryptoSettings Settings needed for creating crypto client. + * @param keyProviderName Name of the key provider. + * @return instance of CryptoClient + */ + CryptoClient create(Settings cryptoSettings, String keyProviderName); + } + + /** + * @return key provider type + */ + String type(); + + /** + * @return key provider name + */ + String name(); + + /** + * To Initialise a crypto context used in encryption. This might be needed to set the context before beginning + * encryption. + * + * @return crypto context instance + */ + Object initCryptoContext(); + + /** + * In scenarios where content is divided into multiple parts and streams are emitted against each part, + * it is sometimes required to adjust the size of a part. + * + * @param cryptoContextObj crypto context instance + * @param streamSize Size of the raw stream + * @return Adjusted size of the stream. + */ + long adjustStreamSize(Object cryptoContextObj, long streamSize); + + /** + * Wraps a raw InputStream with encrypting stream + * + * @param cryptoContext created earlier to set the crypto context. + * @param stream Raw InputStream to encrypt + * @return encrypting stream wrapped around raw InputStream. + */ + Stream createEncryptingStream(Object cryptoContext, Stream stream); + + /** + * Provides encrypted stream for a raw stream emitted for a part of content. + * + * @param cryptoContextObj crypto context instance. + * @param stream raw stream for which encrypted stream has to be created. + * @param totalStreams Number of streams being used for the entire content. + * @param streamIdx Index of the current stream. + * @return Encrypted stream for the provided raw stream. 
+ */ + Stream createEncryptingStreamOfPart(Object cryptoContextObj, Stream stream, int totalStreams, int streamIdx); + + /** + * This method accepts an encrypted stream and provides a decrypting wrapper. + * @param encryptingStream to be decrypted. + * @return Decrypting wrapper stream + */ + InputStream createDecryptingStream(InputStream encryptingStream); +} diff --git a/server/src/main/java/org/opensearch/crypto/package-info.java b/server/src/main/java/org/opensearch/crypto/package-info.java new file mode 100644 index 0000000000000..742960ac1cf97 --- /dev/null +++ b/server/src/main/java/org/opensearch/crypto/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package for crypto client abstractions and exceptions. + */ +package org.opensearch.crypto; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 8cd1a182fbdfe..6e3976c0f1456 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -100,6 +100,7 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.crypto.CryptoClient; import org.opensearch.gateway.WriteStateException; import org.opensearch.index.Index; import org.opensearch.index.IndexModule; @@ -2514,10 +2515,10 @@ public void recoverFromStore(ActionListener listener) { storeRecovery.recoverFromStore(this, listener); } - public void restoreFromRemoteStore(Repository repository, ActionListener listener) { + public void restoreFromRemoteStore(Repository repository, CryptoClient cryptoClient, ActionListener listener) { assert shardRouting.primary() : "recover 
from store only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - storeRecovery.recoverFromRemoteStore(this, repository, listener); + storeRecovery.recoverFromRemoteStore(this, repository, cryptoClient, listener); } public void restoreFromRepository(Repository repository, ActionListener listener) { @@ -3319,13 +3320,25 @@ public void startRecovery( break; case REMOTE_STORE: final Repository remoteTranslogRepo; + final CryptoClient cryptoClient; final String remoteTranslogRepoName = indexSettings.getRemoteStoreTranslogRepository(); if (remoteTranslogRepoName != null) { remoteTranslogRepo = repositoriesService.repository(remoteTranslogRepoName); + if (Boolean.TRUE.equals(remoteTranslogRepo.getMetadata().encrypted())) { + cryptoClient = repositoriesService.cryptoClient(remoteTranslogRepo.getMetadata()); + } else { + cryptoClient = null; + } } else { remoteTranslogRepo = null; + cryptoClient = null; } - executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(remoteTranslogRepo, l)); + executeRecovery( + "from remote store", + recoveryState, + recoveryListener, + l -> restoreFromRemoteStore(remoteTranslogRepo, cryptoClient, l) + ); break; case PEER: try { @@ -4456,12 +4469,14 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory : "Store.directory is not enclosing an instance of FilterDirectory"; FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); - final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate(); + final Directory remoteStoreDelegate = byteSizeCachingStoreDirectory.getDelegate(); // We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that // are uploaded to the remote segment store. 
- assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory"; - ((RemoteSegmentStoreDirectory) remoteDirectory).init(); - Map uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory) + assert remoteStoreDelegate instanceof RemoteSegmentStoreDirectory + : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory"; + RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteStoreDelegate; + remoteDirectory.init(); + Map uploadedSegments = remoteDirectory .getSegmentsUploadedToRemoteStore(); store.incRef(); remoteStore.incRef(); @@ -4490,7 +4505,11 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re if (localSegmentFiles.contains(file)) { storeDirectory.deleteFile(file); } - storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); + if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) { + storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); + } else { + remoteDirectory.downloadDataFile(storeDirectory, file, IOContext.DEFAULT); + } downloadedSegments.add(file); if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) { assert segmentInfosSnapshotFilename == null : "There should be only one SegmentInfosSnapshot file"; diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index efd2686b41a20..ffef4b05a5195 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -199,7 +199,7 @@ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos s // Visible for testing boolean uploadNewSegments(Collection localFiles) throws IOException { AtomicBoolean uploadSuccess = new AtomicBoolean(true); - localFiles.stream().filter(file -> 
!EXCLUDE_FILES.contains(file)).filter(file -> { + Collection filteredFiles = localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { try { return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file)); } catch (IOException e) { @@ -209,15 +209,15 @@ boolean uploadNewSegments(Collection localFiles) throws IOException { ); return true; } - }).forEach(file -> { - try { - remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); - } catch (IOException e) { - uploadSuccess.set(false); - // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) - logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); - } - }); + }).collect(Collectors.toList()); + + try { + remoteDirectory.copyFilesFrom(storeDirectory, filteredFiles, IOContext.DEFAULT); + } catch (Exception e) { + uploadSuccess.set(false); + // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) + logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", e), e); + } return uploadSuccess.get(); } diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 31a863129cc8c..d04986b10be98 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -53,6 +53,7 @@ import org.opensearch.common.lucene.Lucene; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.Index; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.EngineException; @@ -118,13 +119,18 @@ void recoverFromStore(final IndexShard indexShard, ActionListener liste } } - void recoverFromRemoteStore(final IndexShard indexShard, 
Repository repository, ActionListener listener) { + void recoverFromRemoteStore( + final IndexShard indexShard, + Repository repository, + CryptoClient cryptoClient, + ActionListener listener + ) { if (canRecover(indexShard)) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType; ActionListener.completeWith(recoveryListener(indexShard, listener), () -> { logger.debug("starting recovery from remote store ..."); - recoverFromRemoteStore(indexShard, repository); + recoverFromRemoteStore(indexShard, repository, cryptoClient); return true; }); } else { @@ -441,7 +447,8 @@ private ActionListener recoveryListener(IndexShard indexShard, ActionLi }); } - private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException { + private void recoverFromRemoteStore(IndexShard indexShard, Repository repository, CryptoClient cryptoClient) + throws IndexShardRecoveryException { final Store remoteStore = indexShard.remoteStore(); if (remoteStore == null) { throw new IndexShardRecoveryException( @@ -463,7 +470,7 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion); } if (repository != null) { - syncTranslogFilesFromRemoteTranslog(indexShard, repository); + syncTranslogFilesFromRemoteTranslog(indexShard, repository, cryptoClient); } else { bootstrap(indexShard, store); } @@ -482,12 +489,14 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository } } - private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository) throws IOException { + private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository, CryptoClient cryptoClient) + throws IOException { assert repository instanceof 
BlobStoreRepository : "repository should be instance of BlobStoreRepository"; BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager( blobStoreRepository, + cryptoClient, indexShard.getThreadPool(), shardId, fileTransferTracker diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index c385303813844..b369d9725ce5b 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -16,13 +16,19 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.Stream; import org.opensearch.common.UUIDs; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.lucene.store.InputStreamIndexInput; +import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.crypto.CryptoClient; import java.io.IOException; +import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.util.Collection; import java.util.Collections; @@ -72,6 +78,16 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory { */ private String commonFilenameSuffix; + /** + * To encrypt or decrypt segment data of an encrypted repo before transfer. 
+ */ + private final CryptoClient cryptoClient; + + /** + * Used in transferring segments data to and from remote store. + */ + private final BlobContainer dataBlobContainer; + /** * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time. @@ -87,10 +103,17 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory { private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class); - public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory) throws IOException { + public RemoteSegmentStoreDirectory( + RemoteDirectory remoteDataDirectory, + RemoteDirectory remoteMetadataDirectory, + CryptoClient cryptoClient, + BlobContainer dataBlobContainer + ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; + this.dataBlobContainer = dataBlobContainer; + this.cryptoClient = cryptoClient; init(); } @@ -317,19 +340,82 @@ public IndexInput openInput(String name, IOContext context) throws IOException { } } - public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException { + public void copyFilesFrom(Directory from, Collection files, IOContext context) throws Exception { + for (String src : files) { + uploadFile(from, src, src, context); + } + } + + private void uploadFile(Directory from, String src, String dest, IOContext context) throws IOException { + try ( + IndexInput indexInput = from.openInput(src, context); + InputStream inputStream = new InputStreamIndexInput(indexInput, indexInput.length()) + ) { + String remoteFileName = createRemoteFileName(dest, false); + long contentLength = indexInput.length(); + InputStream transferInputStream = inputStream; + if 
(cryptoClient != null) { + Object cryptoContext = cryptoClient.initCryptoContext(); + Stream stream = cryptoClient.createEncryptingStream(cryptoContext, new Stream(transferInputStream, indexInput.length(), 0)); + transferInputStream = stream.getInputStream(); + contentLength = stream.getContentLength(); + } + dataBlobContainer.writeBlob(remoteFileName, transferInputStream, contentLength, false); + postUpload(from, src, remoteFileName); + } + } + + private String createRemoteFileName(String dest, boolean useCommonSuffix) { String remoteFilename; if (useCommonSuffix) { remoteFilename = dest + SEGMENT_NAME_UUID_SEPARATOR + this.commonFilenameSuffix; } else { remoteFilename = getNewRemoteSegmentFilename(dest); } - remoteDataDirectory.copyFrom(from, src, remoteFilename, context); + + return remoteFilename; + } + + private void postUpload(Directory from, String src, String remoteFilename) throws IOException { String checksum = getChecksumOfLocalFile(from, src); UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src)); segmentsUploadedToRemoteStore.put(src, segmentMetadata); } + public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException { + String remoteFilename = createRemoteFileName(dest, useCommonSuffix); + remoteDataDirectory.copyFrom(from, src, remoteFilename, context); + postUpload(from, src, remoteFilename); + } + + /** + * This method shouldn't be used to download metadata files because it performs decryption if remote directory + * is an encrypted dir. Metadata files are never encrypted. + * @param to Directory to which file needs to be downloaded + * @param file Name by which new file needs to be downloaded in "to" directory. + * @param context File context + * @throws IOException when there is an issue in download. 
+ */ + public void downloadDataFile(Directory to, String file, IOContext context) throws IOException { + + try (IndexOutput indexOutput = to.createOutput(file, context); InputStream inputStream = createRemoteInputStream(file)) { + byte[] buffer = new byte[(int) ByteSizeUnit.KB.toBytes(10)]; + int len; + while ((len = inputStream.read(buffer, 0, buffer.length)) > 0) { + indexOutput.writeBytes(buffer, 0, len); + } + } + } + + private InputStream createRemoteInputStream(String file) throws IOException { + String remoteFilename = getExistingRemoteFilename(file); + InputStream inputStream = dataBlobContainer.readBlob(remoteFilename); + if (cryptoClient != null) { + inputStream = cryptoClient.createDecryptingStream(inputStream); + } + return inputStream; + } + /** * Copies an existing src file from directory from to a non-existent file dest in this directory. * Once the segment is uploaded to remote segment store, update the cache accordingly. diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index cb5548167a577..1d1f06332a8b2 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -11,6 +11,7 @@ import org.apache.lucene.store.Directory; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.IndexStorePlugin; @@ -45,18 +46,22 @@ public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throw .add(String.valueOf(path.getShardId().getId())) .add("segments"); - RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data"); - RemoteDirectory 
metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata"); - - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory); + BlobContainer dataBlobContainer = createBlobContainer(repository, commonBlobPath, "data"); + RemoteDirectory dataDirectory = new RemoteDirectory(dataBlobContainer); + BlobContainer metadataBlobContainer = createBlobContainer(repository, commonBlobPath, "metadata"); + RemoteDirectory metadataDirectory = new RemoteDirectory(metadataBlobContainer); + CryptoClient cryptoClient = null; + if (Boolean.TRUE.equals(repository.getMetadata().encrypted())) { + cryptoClient = repositoriesService.get().cryptoClient(repository.getMetadata()); + } + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, cryptoClient, dataBlobContainer); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } } - private RemoteDirectory createRemoteDirectory(Repository repository, BlobPath commonBlobPath, String extention) { + private BlobContainer createBlobContainer(Repository repository, BlobPath commonBlobPath, String extention) { BlobPath extendedPath = commonBlobPath.add(extention); - BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath); - return new RemoteDirectory(dataBlobContainer); + return ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath); } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index e439a56581c14..05b987240d7bd 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -8,6 +8,7 @@ package 
org.opensearch.index.translog; +import org.opensearch.crypto.CryptoClient; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryMissingException; @@ -28,6 +29,7 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory { private final Repository repository; + private final CryptoClient cryptoClient; private final ThreadPool threadPool; @@ -42,6 +44,11 @@ public RemoteBlobStoreInternalTranslogFactory( } catch (RepositoryMissingException ex) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", ex); } + if (Boolean.TRUE.equals(repository.getMetadata().encrypted())) { + this.cryptoClient = repositoriesServiceSupplier.get().cryptoClient(repository.getMetadata()); + } else { + this.cryptoClient = null; + } this.repository = repository; this.threadPool = threadPool; } @@ -68,6 +75,7 @@ public Translog newTranslog( persistedSequenceNumberConsumer, blobStoreRepository, threadPool, + cryptoClient, primaryModeSupplier ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 2230f13ad8c61..b35f8ff4170e9 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -14,6 +14,7 @@ import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.index.translog.transfer.FileTransferTracker; @@ -47,6 +48,7 @@ public class RemoteFsTranslog extends Translog { private final BlobStoreRepository blobStoreRepository; 
+ private final CryptoClient cryptoClient; private final TranslogTransferManager translogTransferManager; private final FileTransferTracker fileTransferTracker; private final BooleanSupplier primaryModeSupplier; @@ -74,13 +76,21 @@ public RemoteFsTranslog( LongConsumer persistedSequenceNumberConsumer, BlobStoreRepository blobStoreRepository, ThreadPool threadPool, + CryptoClient cryptoClient, BooleanSupplier primaryModeSupplier ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); this.blobStoreRepository = blobStoreRepository; + this.cryptoClient = cryptoClient; this.primaryModeSupplier = primaryModeSupplier; fileTransferTracker = new FileTransferTracker(shardId); - this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker); + this.translogTransferManager = buildTranslogTransferManager( + blobStoreRepository, + cryptoClient, + threadPool, + shardId, + fileTransferTracker + ); try { download(translogTransferManager, location); @@ -143,12 +153,14 @@ public static void download(TranslogTransferManager translogTransferManager, Pat public static TranslogTransferManager buildTranslogTransferManager( BlobStoreRepository blobStoreRepository, + CryptoClient cryptoClient, ThreadPool threadPool, ShardId shardId, FileTransferTracker fileTransferTracker ) { return new TranslogTransferManager( new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool), + cryptoClient, blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())), fileTransferTracker ); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 88fe816ccb462..3c680bc5a83d6 100644 --- 
a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -13,8 +13,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; +import org.opensearch.common.Stream; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.threadpool.ThreadPool; @@ -44,15 +46,24 @@ public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) { public void uploadBlobAsync( String threadpoolName, final TransferFileSnapshot fileSnapshot, + final CryptoClient cryptoClient, Iterable remoteTransferPath, - ActionListener listener + ActionListener listener, + TransferContentType transferContentType ) { assert remoteTransferPath instanceof BlobPath; BlobPath blobPath = (BlobPath) remoteTransferPath; threadPool.executor(threadpoolName).execute(ActionRunnable.wrap(listener, l -> { try (InputStream inputStream = fileSnapshot.inputStream()) { - blobStore.blobContainer(blobPath) - .writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); + InputStream finalInputStream = inputStream; + long contentLength = fileSnapshot.getContentLength(); + if (cryptoClient != null && transferContentType == TransferContentType.DATA) { + Object cryptoContext = cryptoClient.initCryptoContext(); + Stream stream = cryptoClient.createEncryptingStream(cryptoContext, new Stream(inputStream, contentLength, 0)); + finalInputStream = stream.getInputStream(); + contentLength = stream.getContentLength(); + } + blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), finalInputStream, contentLength, true); l.onResponse(fileSnapshot); } catch 
(Exception e) { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index 239ef7c3c9300..2930c3201c6e5 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -71,6 +71,10 @@ public InputStream inputStream() throws IOException { : new InputStreamIndexInput(new ByteArrayIndexInput(this.name, content), content.length); } + public void reset() throws IOException { + fileChannel.position(0); + } + @Override public int hashCode() { return Objects.hash(name, content, path); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferContentType.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferContentType.java new file mode 100644 index 0000000000000..a2f9d7d29c7bf --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferContentType.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +/** + * Enum to denote the type of data to be transferred. + * Used in cases where depending on data, transfer differs such as metadata is never encrypted + * whereas data is encrypted and transferred. 
+ */ +public enum TransferContentType { + METADATA, + DATA +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 6aca3055a3f53..6b5c8aab95ec0 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog.transfer; import org.opensearch.action.ActionListener; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import java.io.IOException; @@ -27,14 +28,18 @@ public interface TransferService { * Uploads the {@link TransferFileSnapshot} async, once the upload is complete the callback is invoked * @param threadpoolName threadpool type which will be used to upload blobs asynchronously * @param fileSnapshot the file snapshot to upload + * @param cryptoClient client for encrypting the file content of an encrypted repo before upload. * @param remotePath the remote path where upload should be made * @param listener the callback to be invoked once upload completes successfully/fails + * @param transferContentType type of content to be uploaded. 
*/ void uploadBlobAsync( String threadpoolName, final TransferFileSnapshot fileSnapshot, + final CryptoClient cryptoClient, Iterable remotePath, - ActionListener listener + ActionListener listener, + TransferContentType transferContentType ); /** diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 95536317bcc1b..43d23591b062f 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -16,6 +16,7 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import org.opensearch.threadpool.ThreadPool; @@ -51,6 +52,7 @@ public class TranslogTransferManager { private final BlobPath remoteBaseTransferPath; private final BlobPath remoteMetadataTransferPath; private final FileTransferTracker fileTransferTracker; + private final CryptoClient cryptoClient; private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; @@ -60,10 +62,12 @@ public class TranslogTransferManager { public TranslogTransferManager( TransferService transferService, + CryptoClient cryptoClient, BlobPath remoteBaseTransferPath, FileTransferTracker fileTransferTracker ) { this.transferService = transferService; + this.cryptoClient = cryptoClient; this.remoteBaseTransferPath = remoteBaseTransferPath; this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR); this.fileTransferTracker = fileTransferTracker; @@ -102,8 +106,10 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans fileSnapshot -> 
transferService.uploadBlobAsync( ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, + cryptoClient, remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), - latchedActionListener + latchedActionListener, + TransferContentType.DATA ) ); try { @@ -142,14 +148,15 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca ); // Download Checkpoint file from remote to local FS String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); - downloadToFS(ckpFileName, location, primaryTerm); + downloadToFS(ckpFileName, location, primaryTerm, TransferContentType.DATA); // Download translog file from remote to local FS String translogFilename = Translog.getFilename(Long.parseLong(generation)); - downloadToFS(translogFilename, location, primaryTerm); + downloadToFS(translogFilename, location, primaryTerm, TransferContentType.DATA); return true; } - private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException { + private void downloadToFS(String fileName, Path location, String primaryTerm, TransferContentType transferContentType) + throws IOException { Path filePath = location.resolve(fileName); // Here, we always override the existing file if present. 
// We need to change this logic when we introduce incremental download @@ -157,7 +164,11 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th Files.delete(filePath); } try (InputStream inputStream = transferService.downloadBlob(remoteBaseTransferPath.add(primaryTerm), fileName)) { - Files.copy(inputStream, filePath); + InputStream transferInputStream = inputStream; + if (transferContentType == TransferContentType.DATA && cryptoClient != null) { + transferInputStream = cryptoClient.createDecryptingStream(inputStream); + } + Files.copy(transferInputStream, filePath); } // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync fileTransferTracker.add(fileName, true); diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index c438abedfd37c..7fbfc1d0eef4c 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -64,6 +64,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.crypto.CryptoClient; import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -494,6 +495,16 @@ public Repository repository(String repositoryName) { throw new RepositoryMissingException(repositoryName); } + /** + * Returns registered crypto client + * @param repositoryMetadata repository metadata for which crypto client needs to be returned. 
+ * @return crypto client + */ + public CryptoClient cryptoClient(RepositoryMetadata repositoryMetadata) { + // TODO: Return registered crypto client + return null; + } + public List repositoriesStats() { List archivedRepoStats = repositoriesStatsArchive.getArchivedStats(); List activeRepoStats = getRepositoryStatsForActiveRepositories(); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index a2e1fd0f4570f..6d892d07c5201 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2873,7 +2873,7 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null)); final PlainActionFuture future = PlainActionFuture.newFuture(); - target.restoreFromRemoteStore(null, future); + target.restoreFromRemoteStore(null, null, future); target.remoteStore().decRef(); assertTrue(future.actionGet()); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 49a2d50dfae06..98fbc60e89bc7 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -19,10 +19,14 @@ import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.tests.util.LuceneTestCase; import org.junit.Before; +import org.mockito.stubbing.Answer; +import org.opensearch.common.Stream; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.BlobContainer; import 
org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.test.OpenSearchTestCase; @@ -35,7 +39,12 @@ import java.util.Map; import java.util.Set; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; + +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; @@ -45,19 +54,21 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doNothing; public class RemoteSegmentStoreDirectoryTests extends OpenSearchTestCase { private RemoteDirectory remoteDataDirectory; private RemoteDirectory remoteMetadataDirectory; private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; + private CryptoClient cryptoClient; @Before public void setup() throws IOException { remoteDataDirectory = mock(RemoteDirectory.class); remoteMetadataDirectory = mock(RemoteDirectory.class); - remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory); + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory, null, null); } public void testUploadedSegmentMetadataToString() { @@ -364,6 +375,52 @@ public void testCopyFrom() throws IOException { storeDirectory.close(); } + public void testCopyFilesFrom() throws Exception { + BlobContainer dataContainer = mock(BlobContainer.class); + doNothing().when(dataContainer).writeBlob(anyString(), any(), anyLong(), anyBoolean()); + CryptoClient cryptoClient = 
mock(CryptoClient.class); + when(cryptoClient.initCryptoContext()).thenReturn(mock(Object.class)); + doAnswer((Answer) invocation -> (Stream) invocation.getArgument(1)).when(cryptoClient) + .createEncryptingStream(any(), any(Stream.class)); + + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + remoteDataDirectory, + remoteMetadataDirectory, + cryptoClient, + dataContainer + ); + + List files = new ArrayList<>(); + remoteSegmentStoreDirectory.init(); + populateMetadata(); + Directory storeDirectory = LuceneTestCase.newDirectory(); + + String filename = "_100.si"; + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + files.add(filename); + + filename = "_200.si"; + indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World again!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + files.add(filename); + + storeDirectory.sync(files); + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, files, IOContext.DEFAULT); + + for (String fileName : files) { + assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(fileName)); + } + + verify(cryptoClient, times(files.size())).createEncryptingStream(any(), any(Stream.class)); + storeDirectory.close(); + } + public void testCopyFromException() throws IOException { String filename = "_100.si"; Directory storeDirectory = LuceneTestCase.newDirectory(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 0a6b6a95b74f9..51804e9938315 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ 
b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -169,6 +169,7 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin getPersistedSeqNoConsumer(), repository, threadPool, + null, primaryMode::get ); @@ -1264,6 +1265,7 @@ public int write(ByteBuffer src) throws IOException { persistedSeqNos::add, repository, threadPool, + null, () -> Boolean.TRUE ) { @Override diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index 196fbd58c2c20..ef5154bcd0755 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -8,12 +8,15 @@ package org.opensearch.index.translog.transfer; +import org.mockito.stubbing.Answer; import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.Stream; +import org.opensearch.crypto.CryptoClient; import org.opensearch.env.Environment; import org.opensearch.env.TestEnvironment; import org.opensearch.indices.recovery.RecoverySettings; @@ -33,6 +36,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + public class BlobStoreTransferServiceTests extends OpenSearchTestCase { private ThreadPool threadPool; @@ -74,6 +84,43 @@ 
public void testUploadBlobAsync() throws IOException, InterruptedException { transferService.uploadBlobAsync( ThreadPool.Names.TRANSLOG_TRANSFER, transferFileSnapshot, + null, + repository.basePath(), + new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + assert succeeded.compareAndSet(false, true); + assertEquals(transferFileSnapshot.getPrimaryTerm(), fileSnapshot.getPrimaryTerm()); + assertEquals(transferFileSnapshot.getName(), fileSnapshot.getName()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Failed to perform uploadBlobAsync", e); + } + }, latch), + TransferContentType.DATA + ); + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + assertTrue(succeeded.get()); + } + + public void testUploadBlobAsyncWithEncryptionEnabled() throws IOException, InterruptedException { + BlobStoreRepository repository = createRepository(true); + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + AtomicBoolean succeeded = new AtomicBoolean(false); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + CountDownLatch latch = new CountDownLatch(1); + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); + CryptoClient cryptoClient = mock(CryptoClient.class); + when(cryptoClient.initCryptoContext()).thenReturn(mock(Object.class)); + doAnswer((Answer) invocation -> (Stream) invocation.getArgument(1)).when(cryptoClient) + .createEncryptingStream(any(), any(Stream.class)); + transferService.uploadBlobAsync( + ThreadPool.Names.TRANSLOG_TRANSFER, + transferFileSnapshot, + cryptoClient, repository.basePath(), new LatchedActionListener<>(new ActionListener<>() { @Override @@ -87,10 +134,12 @@ public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { public 
void onFailure(Exception e) { throw new AssertionError("Failed to perform uploadBlobAsync", e); } - }, latch) + }, latch), + TransferContentType.DATA ); assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); assertTrue(succeeded.get()); + verify(cryptoClient, times(1)).createEncryptingStream(any(), any(Stream.class)); } @Override @@ -100,10 +149,14 @@ public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } - /** Create a {@link Repository} with a random name **/ private BlobStoreRepository createRepository() { + return createRepository(null); + } + + /** Create a {@link Repository} with a random name **/ + private BlobStoreRepository createRepository(Boolean encrypted) { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); - RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); + RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings, encrypted); final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); final FsRepository repository = new FsRepository( repositoryMetadata, diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 0a21be99bc06b..18974a12e3760 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -10,11 +10,13 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import org.opensearch.action.ActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import 
org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.util.set.Sets; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; @@ -27,6 +29,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -35,11 +38,12 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import static org.mockito.Mockito.any; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -84,7 +88,14 @@ public void testTransferSnapshot() throws IOException { listener.onResponse((TransferFileSnapshot) invocationOnMock.getArguments()[1]); return null; }).when(transferService) - .uploadBlobAsync(any(String.class), any(TransferFileSnapshot.class), any(BlobPath.class), any(ActionListener.class)); + .uploadBlobAsync( + any(String.class), + any(TransferFileSnapshot.class), + any(CryptoClient.class), + any(BlobPath.class), + any(ActionListener.class), + any(TransferContentType.class) + ); FileTransferTracker fileTransferTracker = new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { @Override @@ -103,6 +114,7 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { TranslogTransferManager translogTransferManager = new TranslogTransferManager( transferService, + null, remoteBaseTransferPath, fileTransferTracker ); @@ -177,14 +189,14 @@ public TranslogTransferMetadata getTranslogTransferMetadata() { } public void 
testReadMetadataNoFile() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, null); + TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, null, remoteBaseTransferPath, null); when(transferService.listAll(remoteBaseTransferPath)).thenReturn(Sets.newHashSet()); assertNull(translogTransferManager.readMetadata()); } public void testReadMetadataSingleFile() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, null); + TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, null, remoteBaseTransferPath, null); // BlobPath does not have equals method, so we can't use the instance directly in when when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234")); @@ -198,7 +210,7 @@ public void testReadMetadataSingleFile() throws IOException { } public void testReadMetadataMultipleFiles() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, null); + TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, null, remoteBaseTransferPath, null); when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234", "12__235", "12__233")); @@ -211,7 +223,7 @@ public void testReadMetadataMultipleFiles() throws IOException { } public void testReadMetadataException() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, null); + TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, null, remoteBaseTransferPath, null); when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234", "12__235", "12__233")); @@ 
-229,6 +241,7 @@ public void testDownloadTranslog() throws IOException { Path location = createTempDir(); TranslogTransferManager translogTransferManager = new TranslogTransferManager( transferService, + null, remoteBaseTransferPath, new FileTransferTracker(new ShardId("index", "indexUuid", 0)) ); @@ -248,13 +261,46 @@ public void testDownloadTranslog() throws IOException { assertTrue(Files.exists(location.resolve("translog-23.ckp"))); } + public void testDownloadAndDecryptTranslog() throws IOException { + Path location = createTempDir(); + CryptoClient cryptoClient = mock(CryptoClient.class); + doAnswer((Answer) invocation -> (InputStream) invocation.getArgument(0)).when(cryptoClient) + .createDecryptingStream(any(InputStream.class)); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + transferService, + cryptoClient, + remoteBaseTransferPath, + new FileTransferTracker(new ShardId("index", "indexUuid", 0)) + ); + + when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenReturn( + new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) + ); + + when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.ckp"))).thenReturn( + new ByteArrayInputStream("Hello Checkpoint".getBytes(StandardCharsets.UTF_8)) + ); + + assertFalse(Files.exists(location.resolve("translog-23.tlog"))); + assertFalse(Files.exists(location.resolve("translog-23.ckp"))); + translogTransferManager.downloadTranslog("12", "23", location); + assertTrue(Files.exists(location.resolve("translog-23.tlog"))); + assertTrue(Files.exists(location.resolve("translog-23.ckp"))); + verify(cryptoClient, times(2)).createDecryptingStream(any()); + } + public void testDownloadTranslogAlreadyExists() throws IOException { FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); Path location = createTempDir(); Files.createFile(location.resolve("translog-23.tlog")); 
Files.createFile(location.resolve("translog-23.ckp")); - TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, tracker); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + transferService, + null, + remoteBaseTransferPath, + tracker + ); when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenReturn( new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) @@ -278,7 +324,12 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException { Files.createFile(location.resolve(translogFile)); Files.createFile(location.resolve(checkpointFile)); - TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, tracker); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + transferService, + null, + remoteBaseTransferPath, + tracker + ); when(transferService.downloadBlob(any(BlobPath.class), eq(translogFile))).thenReturn( new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) @@ -311,6 +362,7 @@ public void testDeleteTranslogSuccess() throws Exception { BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService(blobStore, threadPool); TranslogTransferManager translogTransferManager = new TranslogTransferManager( blobStoreTransferService, + null, remoteBaseTransferPath, tracker ); @@ -334,6 +386,7 @@ public void testDeleteTranslogFailure() throws Exception { BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService(blobStore, threadPool); TranslogTransferManager translogTransferManager = new TranslogTransferManager( blobStoreTransferService, + null, remoteBaseTransferPath, tracker ); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 
bb3b016560fa7..79b3de530276e 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -633,7 +633,12 @@ protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMet ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId); RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + dataDirectory, + metadataDirectory, + null, + null + ); return createStore(shardId, new IndexSettings(metadata, nodeSettings), remoteSegmentStoreDirectory); }