From e866f043dc1af7121175cabfea69c15ea2efba53 Mon Sep 17 00:00:00 2001 From: Vikas Bansal Date: Mon, 1 May 2023 18:39:04 +0530 Subject: [PATCH] crypto plugin integration changes Signed-off-by: Vikas Bansal --- .../opensearch/index/shard/IndexShardIT.java | 15 ++- .../cluster/metadata/RepositoryMetadata.java | 28 +++++- .../java/org/opensearch/common/Stream.java | 55 +++++++++++ .../org/opensearch/crypto/CryptoClient.java | 92 +++++++++++++++++++ .../org/opensearch/crypto/package-info.java | 12 +++ .../org/opensearch/index/IndexModule.java | 5 +- .../org/opensearch/index/IndexService.java | 7 +- .../opensearch/index/shard/IndexShard.java | 29 +++--- .../shard/RemoteStoreRefreshListener.java | 20 ++-- .../store/RemoteSegmentStoreDirectory.java | 81 +++++++++++++++- .../RemoteSegmentStoreDirectoryFactory.java | 7 +- ...emoteBlobStoreInternalTranslogFactory.java | 10 +- .../index/translog/RemoteFsTranslog.java | 20 +++- .../transfer/BlobStoreTransferService.java | 17 +++- .../index/translog/transfer/FileSnapshot.java | 4 + .../transfer/TransferContentType.java | 19 ++++ .../translog/transfer/TransferService.java | 7 +- .../transfer/TranslogTransferManager.java | 21 ++++- .../opensearch/indices/IndicesService.java | 54 ++++++++--- .../repositories/RepositoriesService.java | 11 +++ .../opensearch/index/IndexModuleTests.java | 32 +++++-- .../RemoteSegmentStoreDirectoryTests.java | 66 ++++++++++++- .../index/translog/RemoteFSTranslogTests.java | 6 +- .../BlobStoreTransferServiceTests.java | 59 +++++++++++- .../TranslogTransferManagerTests.java | 60 +++++++++++- .../index/shard/IndexShardTestCase.java | 41 +++++++-- 26 files changed, 686 insertions(+), 92 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/Stream.java create mode 100644 server/src/main/java/org/opensearch/crypto/CryptoClient.java create mode 100644 server/src/main/java/org/opensearch/crypto/package-info.java create mode 100644 
server/src/main/java/org/opensearch/index/translog/transfer/TransferContentType.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 11f187ac6e619..158f9fb0bae6a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -45,6 +45,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.InternalClusterInfoService; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; @@ -63,6 +64,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.crypto.CryptoClient; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.ShardLock; @@ -81,6 +83,7 @@ import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.TestTranslog; import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.TranslogFactory; import org.opensearch.index.translog.TranslogStats; import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; @@ -699,7 +702,17 @@ public static final IndexShard newIndexShard( () -> {}, RetentionLeaseSyncer.EMPTY, cbs, - (indexSettings, shardRouting) -> new InternalTranslogFactory(), + new IndicesService.TranslogFactorySupplier() { + @Override + public TranslogFactory createTranslogFactory(IndexSettings indexSettings, ShardRouting shardRouting) { + return new InternalTranslogFactory(); + } + + @Override + public 
CryptoClient createCryptoClient(RepositoryMetadata repositoryMetadata) { + return null; + } + }, SegmentReplicationCheckpointPublisher.EMPTY, null ); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/RepositoryMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/RepositoryMetadata.java index db4e5d8137a20..2d22498bfdabf 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/RepositoryMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/RepositoryMetadata.java @@ -61,6 +61,11 @@ public class RepositoryMetadata implements Writeable { */ private final long pendingGeneration; + /** + * Whether repository is encrypted + */ + private final Boolean encrypted; + /** * Constructs new repository metadata * @@ -69,14 +74,22 @@ public class RepositoryMetadata implements Writeable { * @param settings repository settings */ public RepositoryMetadata(String name, String type, Settings settings) { - this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN); + this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN, null); + } + + public RepositoryMetadata(String name, String type, Settings settings, Boolean encrypted) { + this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN, encrypted); } public RepositoryMetadata(RepositoryMetadata metadata, long generation, long pendingGeneration) { - this(metadata.name, metadata.type, metadata.settings, generation, pendingGeneration); + this(metadata.name, metadata.type, metadata.settings, generation, pendingGeneration, null); } public RepositoryMetadata(String name, String type, Settings settings, long generation, long pendingGeneration) { + this(name, type, settings, generation, pendingGeneration, null); + } + + public RepositoryMetadata(String name, String type, Settings settings, long generation, long pendingGeneration, Boolean encrypted) { this.name = name; this.type = type; 
this.settings = settings; @@ -87,6 +100,7 @@ public RepositoryMetadata(String name, String type, Settings settings, long gene + "] must be greater or equal to generation [" + generation + "]"; + this.encrypted = encrypted; } /** @@ -116,6 +130,15 @@ public Settings settings() { return this.settings; } + /** + * Returns whether repository is encrypted + * + * @return whether repository is encrypted + */ + public Boolean encrypted() { + return this.encrypted; + } + /** + * Returns the safe repository generation. {@link RepositoryData} for this generation is assumed to exist in the repository. + * All operations on the repository must be based on the {@link RepositoryData} at this generation. @@ -146,6 +169,7 @@ public RepositoryMetadata(StreamInput in) throws IOException { settings = Settings.readSettingsFromStream(in); generation = in.readLong(); pendingGeneration = in.readLong(); + encrypted = null; } /** diff --git a/server/src/main/java/org/opensearch/common/Stream.java b/server/src/main/java/org/opensearch/common/Stream.java new file mode 100644 index 0000000000000..ab5666f5819fc --- /dev/null +++ b/server/src/main/java/org/opensearch/common/Stream.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.common; + +import java.io.InputStream; + +/** + * Model composed of an input stream, the total content length and offset + */ +public class Stream { + + private final InputStream inputStream; + private final long contentLength; + private final long offset; + + /** + * Construct a new stream object + * + * @param inputStream The input stream that is to be encapsulated + * @param contentLength The total content length that is to be read from the stream + * @param offset The offset pointer that this stream reads from in the file + */ + public Stream(InputStream inputStream, long contentLength, long offset) { + this.inputStream = inputStream; + this.contentLength = contentLength; + this.offset = offset; + } + + /** + * @return The input stream this object is reading from + */ + public InputStream getInputStream() { + return inputStream; + } + + /** + * @return The total length of the content that has to be read from this stream + */ + public long getContentLength() { + return contentLength; + } + + /** + * @return The offset pointer in the file that this stream is reading from + */ + public long getOffset() { + return offset; + } +} diff --git a/server/src/main/java/org/opensearch/crypto/CryptoClient.java b/server/src/main/java/org/opensearch/crypto/CryptoClient.java new file mode 100644 index 0000000000000..9a4b34b64756c --- /dev/null +++ b/server/src/main/java/org/opensearch/crypto/CryptoClient.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.crypto; + +import org.opensearch.common.Stream; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.RefCounted; + +import java.io.InputStream; + +/** + * Crypto plugin interface used for encryption and decryption. 
+ */ +public interface CryptoClient extends RefCounted { + + /** + * A factory interface for constructing crypto client. + * + */ + interface Factory { + + /** + * Constructs a crypto client used for encryption and decryption + * + * @param cryptoSettings Settings needed for creating crypto client. + * @param keyProviderName Name of the key provider. + * @return instance of CryptoClient + */ + CryptoClient create(Settings cryptoSettings, String keyProviderName); + } + + /** + * @return key provider type + */ + String type(); + + /** + * @return key provider name + */ + String name(); + + /** + * To Initialise a crypto context used in encryption. This might be needed to set the context before beginning + * encryption. + * + * @return crypto context instance + */ + Object initCryptoContext(); + + /** + * In scenarios where content is divided into multiple parts and streams are emitted against each part, + * it is sometimes required to adjust the size of a part. + * + * @param cryptoContextObj crypto context instance + * @param streamSize Size of the raw stream + * @return Adjusted size of the stream. + */ + long adjustStreamSize(Object cryptoContextObj, long streamSize); + + /** + * Wraps a raw InputStream with encrypting stream + * + * @param cryptoContext created earlier to set the crypto context. + * @param stream Raw InputStream to encrypt + * @return encrypting stream wrapped around raw InputStream. + */ + Stream createEncryptingStream(Object cryptoContext, Stream stream); + + /** + * Provides encrypted stream for a raw stream emitted for a part of content. + * + * @param cryptoContextObj crypto context instance. + * @param stream raw stream for which encrypted stream has to be created. + * @param totalStreams Number of streams being used for the entire content. + * @param streamIdx Index of the current stream. + * @return Encrypted stream for the provided raw stream. 
+ */ + Stream createEncryptingStreamOfPart(Object cryptoContextObj, Stream stream, int totalStreams, int streamIdx); + + /** + * This method accepts an encrypted stream and provides a decrypting wrapper. + * @param encryptingStream to be decrypted. + * @return Decrypting wrapper stream + */ + InputStream createDecryptingStream(InputStream encryptingStream); +} diff --git a/server/src/main/java/org/opensearch/crypto/package-info.java b/server/src/main/java/org/opensearch/crypto/package-info.java new file mode 100644 index 0000000000000..742960ac1cf97 --- /dev/null +++ b/server/src/main/java/org/opensearch/crypto/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package for crypto client abstractions and exceptions. + */ +package org.opensearch.crypto; diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index bdb043b7b9aa1..0cd9829538591 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -43,7 +43,6 @@ import org.opensearch.Version; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedFunction; import org.opensearch.common.SetOnce; @@ -73,8 +72,8 @@ import org.opensearch.index.store.FsDirectoryFactory; import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory; import org.opensearch.index.store.remote.filecache.FileCache; -import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.IndicesQueryCache; +import org.opensearch.indices.IndicesService; import 
org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; @@ -504,7 +503,7 @@ public IndexService newIndexService( BooleanSupplier idFieldDataEnabled, ValuesSourceRegistry valuesSourceRegistry, IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, - BiFunction translogFactorySupplier + IndicesService.TranslogFactorySupplier translogFactorySupplier ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index a77ea53d7560c..242394dc26127 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -89,7 +89,7 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.TranslogFactory; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -114,7 +114,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; @@ -174,7 +173,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final IndexNameExpressionResolver expressionResolver; private final Supplier indexSortSupplier; private final ValuesSourceRegistry valuesSourceRegistry; - private final BiFunction translogFactorySupplier; + 
private final IndicesService.TranslogFactorySupplier translogFactorySupplier; public IndexService( IndexSettings indexSettings, @@ -207,7 +206,7 @@ public IndexService( IndexNameExpressionResolver expressionResolver, ValuesSourceRegistry valuesSourceRegistry, IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, - BiFunction translogFactorySupplier + IndicesService.TranslogFactorySupplier translogFactorySupplier ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index a6225569d86b4..d78c5fd34cd34 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -101,6 +101,7 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.crypto.CryptoClient; import org.opensearch.gateway.WriteStateException; import org.opensearch.index.Index; import org.opensearch.index.IndexModule; @@ -211,7 +212,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -327,7 +327,7 @@ Runnable getGlobalCheckpointSyncer() { private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; private final Store remoteStore; - private final BiFunction translogFactorySupplier; + private final IndicesService.TranslogFactorySupplier translogFactorySupplier; private final boolean isTimeSeriesIndex; @@ -352,7 +352,7 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, 
final CircuitBreakerService circuitBreakerService, - final BiFunction translogFactorySupplier, + final IndicesService.TranslogFactorySupplier translogFactorySupplier, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, @Nullable final Store remoteStore ) throws IOException { @@ -3591,7 +3591,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro tombstoneDocSupplier(), isReadOnlyReplica, replicationTracker::isPrimaryMode, - translogFactorySupplier.apply(indexSettings, shardRouting), + translogFactorySupplier.createTranslogFactory(indexSettings, shardRouting), isTimeSeriesIndex ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for timeseries ); } @@ -4453,10 +4453,11 @@ private void syncRemoteTranslogAndUpdateGlobalCheckpoint() throws IOException { } public void syncTranslogFilesFromRemoteTranslog() throws IOException { - TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting); + TranslogFactory translogFactory = translogFactorySupplier.createTranslogFactory(indexSettings, shardRouting); assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory; Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository(); - RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog()); + CryptoClient cryptoClient = translogFactorySupplier.createCryptoClient(repository.getMetadata()); + RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog(), cryptoClient); } /** @@ -4483,12 +4484,14 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory : "Store.directory is not enclosing an instance of FilterDirectory"; FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); - final Directory remoteDirectory = 
byteSizeCachingStoreDirectory.getDelegate(); + final Directory remoteStoreDelegate = byteSizeCachingStoreDirectory.getDelegate(); // We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that // are uploaded to the remote segment store. - assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory"; - ((RemoteSegmentStoreDirectory) remoteDirectory).init(); - Map uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory) + assert remoteStoreDelegate instanceof RemoteSegmentStoreDirectory + : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory"; + RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteStoreDelegate; + remoteDirectory.init(); + Map uploadedSegments = remoteDirectory .getSegmentsUploadedToRemoteStore(); store.incRef(); remoteStore.incRef(); @@ -4517,7 +4520,11 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re if (localSegmentFiles.contains(file)) { storeDirectory.deleteFile(file); } - storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); + if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) { + storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); + } else { + remoteDirectory.downloadDataFile(storeDirectory, file, IOContext.DEFAULT); + } downloadedSegments.add(file); if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) { assert segmentInfosSnapshotFilename == null : "There should be only one SegmentInfosSnapshot file"; diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index ac9c35aaee6b5..40618fa135c9d 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ 
-306,7 +306,7 @@ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos s // Visible for testing boolean uploadNewSegments(Collection localFiles) throws IOException { AtomicBoolean uploadSuccess = new AtomicBoolean(true); - localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { + Collection filteredFiles = localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { try { return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file)); } catch (IOException e) { @@ -316,15 +316,15 @@ boolean uploadNewSegments(Collection localFiles) throws IOException { ); return true; } - }).forEach(file -> { - try { - remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); - } catch (IOException e) { - uploadSuccess.set(false); - // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) - logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); - } - }); + }).collect(Collectors.toList()); + + try { + remoteDirectory.copyFilesFrom(storeDirectory, filteredFiles, IOContext.DEFAULT); + } catch (Exception e) { + uploadSuccess.set(false); + // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) + logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", e), e); + } return uploadSuccess.get(); } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 9f41ac6f7fd17..652582fa384cf 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -16,6 +16,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; 
+import org.opensearch.common.Stream; import org.opensearch.common.UUIDs; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager; @@ -24,8 +25,12 @@ import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.common.lucene.store.InputStreamIndexInput; +import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.crypto.CryptoClient; import java.io.IOException; +import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.util.Map; import java.util.HashSet; @@ -77,6 +82,11 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement */ private String commonFilenameSuffix; + /** + * To encrypt or decrypt segment data of an encrypted repo before transfer. + */ + private final CryptoClient cryptoClient; + /** * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time. 
@@ -95,12 +105,14 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement public RemoteSegmentStoreDirectory( RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory, - RemoteStoreLockManager mdLockManager + RemoteStoreLockManager mdLockManager, + CryptoClient cryptoClient ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; this.mdLockManager = mdLockManager; + this.cryptoClient = cryptoClient; init(); } @@ -392,21 +404,80 @@ String getMetadataFileForCommit(long primaryTerm, long generation) throws IOExce return metadataFiles.iterator().next(); } - public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix, String checksum) - throws IOException { + public void copyFilesFrom(Directory from, Collection files, IOContext context) throws Exception { + for (String src : files) { + uploadFile(from, src, src, context); + } + } + + private void uploadFile(Directory from, String src, String dest, IOContext context) throws IOException { + try ( + IndexInput indexInput = from.openInput(src, context); + InputStream inputStream = new InputStreamIndexInput(indexInput, indexInput.length()) + ) { + String remoteFileName = createRemoteFileName(dest, false); + long contentLength = indexInput.length(); + InputStream transferInputStream = inputStream; + if (cryptoClient != null) { + Object cryptoContext = cryptoClient.initCryptoContext(); + Stream stream = cryptoClient.createEncryptingStream(cryptoContext, new Stream(transferInputStream, indexInput.length(), 0)); + transferInputStream = stream.getInputStream(); + contentLength = stream.getContentLength(); + } + remoteDataDirectory.blobContainer.writeBlob(remoteFileName, transferInputStream, contentLength, false); + postUpload(from, src, remoteFileName); + } + } + + private String createRemoteFileName(String dest, boolean useCommonSuffix) { 
String remoteFilename; if (useCommonSuffix) { remoteFilename = dest + SEGMENT_NAME_UUID_SEPARATOR + this.commonFilenameSuffix; } else { remoteFilename = getNewRemoteSegmentFilename(dest); } - remoteDataDirectory.copyFrom(from, src, remoteFilename, context); + + return remoteFilename; + } + + private void postUpload(Directory from, String src, String remoteFilename) throws IOException { + String checksum = getChecksumOfLocalFile(from, src); UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src)); segmentsUploadedToRemoteStore.put(src, segmentMetadata); } public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException { - copyFrom(from, src, dest, context, useCommonSuffix, getChecksumOfLocalFile(from, src)); + String remoteFilename = createRemoteFileName(dest, useCommonSuffix); + remoteDataDirectory.copyFrom(from, src, remoteFilename, context); + postUpload(from, src, remoteFilename); + } + + /** + * This method shouldn't be used to download metadata files because it performs decryption if remote directory + * is an encrypted dir. Metadata files are never encrypted. + * @param to Directory to which file needs to be downloaded + * @param file Name by which new file needs to be downloaded in "to" directory. + * @param context File context + * @throws IOException when there is an issue in download. 
+ */ + public void downloadDataFile(Directory to, String file, IOContext context) throws IOException { + + try (IndexOutput indexOutput = to.createOutput(file, context); InputStream inputStream = createRemoteInputStream(file)) { + byte[] buffer = new byte[(int) ByteSizeUnit.KB.toBytes(10)]; + int len; + while ((len = inputStream.read(buffer, 0, buffer.length)) > 0) { + indexOutput.writeBytes(buffer, 0, len); + } + } + } + + private InputStream createRemoteInputStream(String file) throws IOException { + String remoteFilename = getExistingRemoteFilename(file); + InputStream inputStream = remoteDataDirectory.blobContainer.readBlob(remoteFilename); + if (cryptoClient != null) { + inputStream = cryptoClient.createDecryptingStream(inputStream); + } + return inputStream; } /** diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 388f80ea3e480..090457bb02f6f 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -11,6 +11,7 @@ import org.apache.lucene.store.Directory; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; @@ -56,8 +57,12 @@ public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throw indexUUID, shardId ); + CryptoClient cryptoClient = null; + if (repository.getMetadata() != null && Boolean.TRUE.equals(repository.getMetadata().encrypted())) { + cryptoClient = repositoriesService.get().cryptoClient(repository.getMetadata()); + } - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, 
mdLockManager); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, cryptoClient); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index 339e16db6f360..ec22767cf63d3 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog; +import org.opensearch.crypto.CryptoClient; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryMissingException; @@ -28,6 +29,7 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory { private final Repository repository; + private final CryptoClient cryptoClient; private final ThreadPool threadPool; @@ -42,6 +44,11 @@ public RemoteBlobStoreInternalTranslogFactory( } catch (RepositoryMissingException ex) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", ex); } + if (repository.getMetadata() != null && Boolean.TRUE.equals(repository.getMetadata().encrypted())) { + this.cryptoClient = repositoriesServiceSupplier.get().cryptoClient(repository.getMetadata()); + } else { + this.cryptoClient = null; + } this.repository = repository; this.threadPool = threadPool; } @@ -68,7 +75,8 @@ public Translog newTranslog( persistedSequenceNumberConsumer, blobStoreRepository, threadPool, - primaryModeSupplier + primaryModeSupplier, + cryptoClient ); } diff --git 
a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index e2a770fd8322e..b239164081099 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -16,6 +16,7 @@ import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.index.translog.transfer.FileTransferTracker; @@ -51,6 +52,7 @@ public class RemoteFsTranslog extends Translog { private static final Logger logger = LogManager.getLogger(RemoteFsTranslog.class); private final BlobStoreRepository blobStoreRepository; + private final CryptoClient cryptoClient; private final TranslogTransferManager translogTransferManager; private final FileTransferTracker fileTransferTracker; private final BooleanSupplier primaryModeSupplier; @@ -78,13 +80,21 @@ public RemoteFsTranslog( LongConsumer persistedSequenceNumberConsumer, BlobStoreRepository blobStoreRepository, ThreadPool threadPool, - BooleanSupplier primaryModeSupplier + BooleanSupplier primaryModeSupplier, + CryptoClient cryptoClient ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); this.blobStoreRepository = blobStoreRepository; + this.cryptoClient = cryptoClient; this.primaryModeSupplier = primaryModeSupplier; fileTransferTracker = new FileTransferTracker(shardId); - this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker); + this.translogTransferManager = buildTranslogTransferManager( + blobStoreRepository, + cryptoClient, + 
threadPool, + shardId, + fileTransferTracker + ); try { download(translogTransferManager, location); Checkpoint checkpoint = readCheckpoint(location); @@ -119,12 +129,14 @@ public RemoteFsTranslog( } } - public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location) throws IOException { + public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location, CryptoClient cryptoClient) + throws IOException { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); TranslogTransferManager translogTransferManager = buildTranslogTransferManager( blobStoreRepository, + cryptoClient, threadPool, shardId, fileTransferTracker @@ -160,6 +172,7 @@ public static void download(TranslogTransferManager translogTransferManager, Pat public static TranslogTransferManager buildTranslogTransferManager( BlobStoreRepository blobStoreRepository, + CryptoClient cryptoClient, ThreadPool threadPool, ShardId shardId, FileTransferTracker fileTransferTracker @@ -167,6 +180,7 @@ public static TranslogTransferManager buildTranslogTransferManager( return new TranslogTransferManager( shardId, new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool), + cryptoClient, blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())), fileTransferTracker ); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 88fe816ccb462..3c680bc5a83d6 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ 
b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -13,8 +13,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; +import org.opensearch.common.Stream; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.threadpool.ThreadPool; @@ -44,15 +46,24 @@ public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) { public void uploadBlobAsync( String threadpoolName, final TransferFileSnapshot fileSnapshot, + final CryptoClient cryptoClient, Iterable remoteTransferPath, - ActionListener listener + ActionListener listener, + TransferContentType transferContentType ) { assert remoteTransferPath instanceof BlobPath; BlobPath blobPath = (BlobPath) remoteTransferPath; threadPool.executor(threadpoolName).execute(ActionRunnable.wrap(listener, l -> { try (InputStream inputStream = fileSnapshot.inputStream()) { - blobStore.blobContainer(blobPath) - .writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); + InputStream finalInputStream = inputStream; + long contentLength = fileSnapshot.getContentLength(); + if (cryptoClient != null && transferContentType == TransferContentType.DATA) { + Object cryptoContext = cryptoClient.initCryptoContext(); + Stream stream = cryptoClient.createEncryptingStream(cryptoContext, new Stream(inputStream, contentLength, 0)); + finalInputStream = stream.getInputStream(); + contentLength = stream.getContentLength(); + } + blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), finalInputStream, contentLength, true); l.onResponse(fileSnapshot); } catch (Exception e) { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", 
fileSnapshot.getName()), e); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index 239ef7c3c9300..2930c3201c6e5 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -71,6 +71,10 @@ public InputStream inputStream() throws IOException { : new InputStreamIndexInput(new ByteArrayIndexInput(this.name, content), content.length); } + public void reset() throws IOException { + fileChannel.position(0); + } + @Override public int hashCode() { return Objects.hash(name, content, path); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferContentType.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferContentType.java new file mode 100644 index 0000000000000..a2f9d7d29c7bf --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferContentType.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +/** + * Enum to denote the type of data to be transferred. + * Used in cases where depending on data, transfer differs such as metadata is never encrypted + * whereas data is encrypted and transferred. 
+ */ +public enum TransferContentType { + METADATA, + DATA +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 6aca3055a3f53..6b5c8aab95ec0 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog.transfer; import org.opensearch.action.ActionListener; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import java.io.IOException; @@ -27,14 +28,18 @@ public interface TransferService { * Uploads the {@link TransferFileSnapshot} async, once the upload is complete the callback is invoked * @param threadpoolName threadpool type which will be used to upload blobs asynchronously * @param fileSnapshot the file snapshot to upload + * @param cryptoClient client for encrypting the file content of an encrypted repo before upload. * @param remotePath the remote path where upload should be made * @param listener the callback to be invoked once upload completes successfully/fails + * @param transferContentType type of content to be uploaded. 
*/ void uploadBlobAsync( String threadpoolName, final TransferFileSnapshot fileSnapshot, + final CryptoClient cryptoClient, Iterable remotePath, - ActionListener listener + ActionListener listener, + TransferContentType transferContentType ); /** diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 243fd8801a562..079ad7fff93fb 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -17,6 +17,7 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.shard.ShardId; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import org.opensearch.threadpool.ThreadPool; @@ -53,6 +54,7 @@ public class TranslogTransferManager { private final BlobPath remoteBaseTransferPath; private final BlobPath remoteMetadataTransferPath; private final FileTransferTracker fileTransferTracker; + private final CryptoClient cryptoClient; private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; @@ -63,11 +65,13 @@ public class TranslogTransferManager { public TranslogTransferManager( ShardId shardId, TransferService transferService, + CryptoClient cryptoClient, BlobPath remoteBaseTransferPath, FileTransferTracker fileTransferTracker ) { this.shardId = shardId; this.transferService = transferService; + this.cryptoClient = cryptoClient; this.remoteBaseTransferPath = remoteBaseTransferPath; this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR); this.fileTransferTracker = fileTransferTracker; @@ -110,8 +114,10 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, 
TranslogTrans fileSnapshot -> transferService.uploadBlobAsync( ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, + cryptoClient, remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), - latchedActionListener + latchedActionListener, + TransferContentType.DATA ) ); try { @@ -150,14 +156,15 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca ); // Download Checkpoint file from remote to local FS String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); - downloadToFS(ckpFileName, location, primaryTerm); + downloadToFS(ckpFileName, location, primaryTerm, TransferContentType.DATA); // Download translog file from remote to local FS String translogFilename = Translog.getFilename(Long.parseLong(generation)); - downloadToFS(translogFilename, location, primaryTerm); + downloadToFS(translogFilename, location, primaryTerm, TransferContentType.DATA); return true; } - private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException { + private void downloadToFS(String fileName, Path location, String primaryTerm, TransferContentType transferContentType) + throws IOException { Path filePath = location.resolve(fileName); // Here, we always override the existing file if present. 
// We need to change this logic when we introduce incremental download @@ -165,7 +172,11 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th Files.delete(filePath); } try (InputStream inputStream = transferService.downloadBlob(remoteBaseTransferPath.add(primaryTerm), fileName)) { - Files.copy(inputStream, filePath); + InputStream transferInputStream = inputStream; + if (transferContentType == TransferContentType.DATA && cryptoClient != null) { + transferInputStream = cryptoClient.createDecryptingStream(inputStream); + } + Files.copy(transferInputStream, filePath); } // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync fileTransferTracker.add(fileName, true); diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b3843dfd114a9..27285fee0b972 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -52,6 +52,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; @@ -90,6 +91,7 @@ import org.opensearch.core.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.crypto.CryptoClient; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.ShardLock; import org.opensearch.env.ShardLockObtainFailedException; @@ -181,7 +183,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import 
java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -328,7 +329,7 @@ public class IndicesService extends AbstractLifecycleComponent private final boolean nodeWriteDanglingIndicesInfo; private final ValuesSourceRegistry valuesSourceRegistry; private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory; - private final BiFunction translogFactorySupplier; + private final TranslogFactorySupplier translogFactorySupplier; private final FileCacheCleaner fileCacheCleaner; @@ -455,22 +456,53 @@ protected void closeInternal() { this.translogFactorySupplier = getTranslogFactorySupplier(repositoriesServiceSupplier, threadPool); } - private static BiFunction getTranslogFactorySupplier( + private static TranslogFactorySupplier getTranslogFactorySupplier( Supplier repositoriesServiceSupplier, ThreadPool threadPool ) { - return (indexSettings, shardRouting) -> { - if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { - return new RemoteBlobStoreInternalTranslogFactory( - repositoriesServiceSupplier, - threadPool, - indexSettings.getRemoteStoreTranslogRepository() - ); + return new TranslogFactorySupplier() { + @Override + public TranslogFactory createTranslogFactory(IndexSettings indexSettings, ShardRouting shardRouting) { + if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { + return new RemoteBlobStoreInternalTranslogFactory( + repositoriesServiceSupplier, + threadPool, + indexSettings.getRemoteStoreTranslogRepository() + ); + } + return new InternalTranslogFactory(); + } + + @Override + public CryptoClient createCryptoClient(RepositoryMetadata repositoryMetadata) { + if (Boolean.TRUE.equals(repositoryMetadata.encrypted())) { + return repositoriesServiceSupplier.get().cryptoClient(repositoryMetadata); + } + return null; } - return new InternalTranslogFactory(); }; } + /** + * Factory supplier to provide translog factory and 
other clients used during translog creation + */ + public interface TranslogFactorySupplier { + /** + * Translog factory supplier for translog creation + * @param indexSettings Required to determine type of repository. + * @param shardRouting To determine shard type + * @return Translog Factory instance + */ + TranslogFactory createTranslogFactory(IndexSettings indexSettings, ShardRouting shardRouting); + + /** + * Create crypto client for a repository + * @param repositoryMetadata Metadata of repository for which crypto client needs to be created. + * @return Crypto client instance + */ + CryptoClient createCryptoClient(RepositoryMetadata repositoryMetadata); + } + private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask"; public ClusterService clusterService() { diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index da696b41328df..4b5d4401ecbc5 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -64,6 +64,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.crypto.CryptoClient; import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -494,6 +495,16 @@ public Repository repository(String repositoryName) { throw new RepositoryMissingException(repositoryName); } + /** + * Returns registered crypto client + * @param repositoryMetadata repository metadata for which crypto client needs to be returned. 
+ * @return crypto client + */ + public CryptoClient cryptoClient(RepositoryMetadata repositoryMetadata) { + // TODO: Return registered crypto client + return null; + } + public List repositoriesStats() { List archivedRepoStats = repositoriesStatsArchive.getArchivedStats(); List activeRepoStats = getRepositoryStatsForActiveRepositories(); diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index d9d87196ca289..0c225d4fa7fd0 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -47,6 +47,7 @@ import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; @@ -66,6 +67,7 @@ import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.crypto.CryptoClient; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.ShardLock; @@ -97,6 +99,7 @@ import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.IndicesQueryCache; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.analysis.AnalysisModule; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.NoneCircuitBreakerService; @@ -122,10 +125,10 @@ import java.util.Collections; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiFunction; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; @@ -225,15 +228,26 @@ public void tearDown() throws Exception { private IndexService newIndexService(IndexModule module) throws IOException { final SetOnce repositoriesServiceReference = new SetOnce<>(); repositoriesServiceReference.set(repositoriesService); - BiFunction translogFactorySupplier = (indexSettings, shardRouting) -> { - if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { - return new RemoteBlobStoreInternalTranslogFactory( - repositoriesServiceReference::get, - threadPool, - indexSettings.getRemoteStoreTranslogRepository() - ); + IndicesService.TranslogFactorySupplier translogFactorySupplier = new IndicesService.TranslogFactorySupplier() { + @Override + public TranslogFactory createTranslogFactory(IndexSettings indexSettings, ShardRouting shardRouting) { + if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { + return new RemoteBlobStoreInternalTranslogFactory( + repositoriesServiceReference::get, + threadPool, + indexSettings.getRemoteStoreTranslogRepository() + ); + } + return new InternalTranslogFactory(); + } + + @Override + public CryptoClient createCryptoClient(RepositoryMetadata repositoryMetadata) { + if (Boolean.TRUE.equals(repositoryMetadata.encrypted())) { + return Objects.requireNonNull(repositoriesServiceReference.get()).cryptoClient(repositoryMetadata); + } + return null; } - return new InternalTranslogFactory(); }; return module.newIndexService( CREATE_INDEX, diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 10295ffc56812..cdca236c8ca93 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ 
b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -19,11 +19,16 @@ import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.tests.util.LuceneTestCase; import org.junit.Before; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import org.opensearch.common.Stream; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.test.OpenSearchTestCase; @@ -36,6 +41,11 @@ import java.util.HashMap; import java.util.Collection; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; @@ -56,11 +66,13 @@ public class RemoteSegmentStoreDirectoryTests extends OpenSearchTestCase { @Before public void setup() throws IOException { - remoteDataDirectory = mock(RemoteDirectory.class); + BlobContainer blobContainer = mock(BlobContainer.class); + doNothing().when(blobContainer).writeBlob(anyString(), any(), anyLong(), anyBoolean()); + remoteDataDirectory = Mockito.spy(new RemoteDirectory(blobContainer)); remoteMetadataDirectory = mock(RemoteDirectory.class); mdLockManager = mock(RemoteStoreMetadataLockManager.class); - remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory, mdLockManager); + remoteSegmentStoreDirectory = new 
RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory, mdLockManager, null); } public void testUploadedSegmentMetadataToString() { @@ -330,7 +342,7 @@ public void testOpenInput() throws IOException { remoteSegmentStoreDirectory.init(); IndexInput indexInput = mock(IndexInput.class); - when(remoteDataDirectory.openInput(startsWith("_0.si"), eq(IOContext.DEFAULT))).thenReturn(indexInput); + Mockito.doReturn(indexInput).when(remoteDataDirectory).openInput(startsWith("_0.si"), eq(IOContext.DEFAULT)); assertEquals(indexInput, remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT)); } @@ -343,7 +355,7 @@ public void testOpenInputException() throws IOException { populateMetadata(); remoteSegmentStoreDirectory.init(); - when(remoteDataDirectory.openInput(startsWith("_0.si"), eq(IOContext.DEFAULT))).thenThrow(new IOException("Error")); + doThrow(new IOException("Error")).when(remoteDataDirectory).openInput(startsWith("_0.si"), eq(IOContext.DEFAULT)); assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT)); } @@ -496,6 +508,52 @@ public void testCopyFrom() throws IOException { storeDirectory.close(); } + public void testCopyFilesFrom() throws Exception { + BlobContainer dataContainer = mock(BlobContainer.class); + doNothing().when(dataContainer).writeBlob(anyString(), any(), anyLong(), anyBoolean()); + CryptoClient cryptoClient = mock(CryptoClient.class); + when(cryptoClient.initCryptoContext()).thenReturn(mock(Object.class)); + doAnswer((Answer) invocation -> (Stream) invocation.getArgument(1)).when(cryptoClient) + .createEncryptingStream(any(), any(Stream.class)); + + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + remoteDataDirectory, + remoteMetadataDirectory, + mdLockManager, + cryptoClient + ); + + List files = new ArrayList<>(); + remoteSegmentStoreDirectory.init(); + populateMetadata(); + Directory storeDirectory = LuceneTestCase.newDirectory(); 
+ + String filename = "_100.si"; + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + files.add(filename); + + filename = "_200.si"; + indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World again!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + files.add(filename); + + storeDirectory.sync(files); + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, files, IOContext.DEFAULT); + + for (String fileName : files) { + assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(fileName)); + } + + verify(cryptoClient, times(files.size())).createEncryptingStream(any(), any(Stream.class)); + storeDirectory.close(); + } + public void testCopyFromException() throws IOException { String filename = "_100.si"; Directory storeDirectory = LuceneTestCase.newDirectory(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 24dae6f5be9ab..dad85da868252 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -169,7 +169,8 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin getPersistedSeqNoConsumer(), repository, threadPool, - primaryMode::get + primaryMode::get, + null ); } @@ -1263,7 +1264,8 @@ public int write(ByteBuffer src) throws IOException { persistedSeqNos::add, repository, threadPool, - () -> Boolean.TRUE + () -> Boolean.TRUE, + null ) { @Override ChannelFactory getChannelFactory() { diff --git 
a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index 196fbd58c2c20..ef5154bcd0755 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -8,12 +8,15 @@ package org.opensearch.index.translog.transfer; +import org.mockito.stubbing.Answer; import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.Stream; +import org.opensearch.crypto.CryptoClient; import org.opensearch.env.Environment; import org.opensearch.env.TestEnvironment; import org.opensearch.indices.recovery.RecoverySettings; @@ -33,6 +36,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + public class BlobStoreTransferServiceTests extends OpenSearchTestCase { private ThreadPool threadPool; @@ -74,6 +84,43 @@ public void testUploadBlobAsync() throws IOException, InterruptedException { transferService.uploadBlobAsync( ThreadPool.Names.TRANSLOG_TRANSFER, transferFileSnapshot, + null, + repository.basePath(), + new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + assert succeeded.compareAndSet(false, true); + 
assertEquals(transferFileSnapshot.getPrimaryTerm(), fileSnapshot.getPrimaryTerm()); + assertEquals(transferFileSnapshot.getName(), fileSnapshot.getName()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Failed to perform uploadBlobAsync", e); + } + }, latch), + TransferContentType.DATA + ); + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + assertTrue(succeeded.get()); + } + + public void testUploadBlobAsyncWithEncryptionEnabled() throws IOException, InterruptedException { + BlobStoreRepository repository = createRepository(true); + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + AtomicBoolean succeeded = new AtomicBoolean(false); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + CountDownLatch latch = new CountDownLatch(1); + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); + CryptoClient cryptoClient = mock(CryptoClient.class); + when(cryptoClient.initCryptoContext()).thenReturn(mock(Object.class)); + doAnswer((Answer) invocation -> (Stream) invocation.getArgument(1)).when(cryptoClient) + .createEncryptingStream(any(), any(Stream.class)); + transferService.uploadBlobAsync( + ThreadPool.Names.TRANSLOG_TRANSFER, + transferFileSnapshot, + cryptoClient, repository.basePath(), new LatchedActionListener<>(new ActionListener<>() { @Override @@ -87,10 +134,12 @@ public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { public void onFailure(Exception e) { throw new AssertionError("Failed to perform uploadBlobAsync", e); } - }, latch) + }, latch), + TransferContentType.DATA ); assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); assertTrue(succeeded.get()); + verify(cryptoClient, times(1)).createEncryptingStream(any(), any(Stream.class)); } @Override @@ -100,10 +149,14 @@ public void tearDown() throws 
Exception { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } - /** Create a {@link Repository} with a random name **/ private BlobStoreRepository createRepository() { + return createRepository(null); + } + + /** Create a {@link Repository} with a random name **/ + private BlobStoreRepository createRepository(Boolean encrypted) { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); - RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); + RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings, encrypted); final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); final FsRepository repository = new FsRepository( repositoryMetadata, diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 1c485dbc35c63..65432027bd63a 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -10,11 +10,13 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import org.opensearch.action.ActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.util.set.Sets; +import org.opensearch.crypto.CryptoClient; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; @@ -27,6 +29,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; 
import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -35,11 +38,12 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import static org.mockito.Mockito.any; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -48,6 +52,7 @@ public class TranslogTransferManagerTests extends OpenSearchTestCase { private TransferService transferService; private ShardId shardId; + private CryptoClient cryptoClient; private BlobPath remoteBaseTransferPath; private ThreadPool threadPool; private long primaryTerm; @@ -63,6 +68,7 @@ public void setUp() throws Exception { minTranslogGeneration = randomLongBetween(0, generation); remoteBaseTransferPath = new BlobPath().add("base_path"); transferService = mock(TransferService.class); + cryptoClient = mock(CryptoClient.class); threadPool = new TestThreadPool(getClass().getName()); } @@ -82,11 +88,18 @@ public void testTransferSnapshot() throws IOException { doNothing().when(transferService) .uploadBlob(any(TransferFileSnapshot.class), Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm)))); doAnswer(invocationOnMock -> { - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[3]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[4]; listener.onResponse((TransferFileSnapshot) invocationOnMock.getArguments()[1]); return null; }).when(transferService) - .uploadBlobAsync(any(String.class), any(TransferFileSnapshot.class), any(BlobPath.class), any(ActionListener.class)); + .uploadBlobAsync( + any(String.class), + any(TransferFileSnapshot.class), + any(CryptoClient.class), + 
any(BlobPath.class), + any(ActionListener.class), + any(TransferContentType.class) + ); FileTransferTracker fileTransferTracker = new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { @Override @@ -106,6 +119,7 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + cryptoClient, remoteBaseTransferPath, fileTransferTracker ); @@ -183,6 +197,7 @@ public void testReadMetadataNoFile() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + null, remoteBaseTransferPath, null ); @@ -195,6 +210,7 @@ public void testReadMetadataSingleFile() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + null, remoteBaseTransferPath, null ); @@ -214,6 +230,7 @@ public void testReadMetadataMultipleFiles() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + null, remoteBaseTransferPath, null ); @@ -232,6 +249,7 @@ public void testReadMetadataException() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + null, remoteBaseTransferPath, null ); @@ -253,6 +271,7 @@ public void testDownloadTranslog() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + null, remoteBaseTransferPath, new FileTransferTracker(new ShardId("index", "indexUuid", 0)) ); @@ -272,6 +291,35 @@ public void testDownloadTranslog() throws IOException { assertTrue(Files.exists(location.resolve("translog-23.ckp"))); } + public void testDownloadAndDecryptTranslog() throws IOException { + Path location = createTempDir(); + CryptoClient cryptoClient = mock(CryptoClient.class); + doAnswer((Answer) invocation -> (InputStream) 
invocation.getArgument(0)).when(cryptoClient) + .createDecryptingStream(any(InputStream.class)); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + shardId, + transferService, + cryptoClient, + remoteBaseTransferPath, + new FileTransferTracker(new ShardId("index", "indexUuid", 0)) + ); + + when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenReturn( + new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) + ); + + when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.ckp"))).thenReturn( + new ByteArrayInputStream("Hello Checkpoint".getBytes(StandardCharsets.UTF_8)) + ); + + assertFalse(Files.exists(location.resolve("translog-23.tlog"))); + assertFalse(Files.exists(location.resolve("translog-23.ckp"))); + translogTransferManager.downloadTranslog("12", "23", location); + assertTrue(Files.exists(location.resolve("translog-23.tlog"))); + assertTrue(Files.exists(location.resolve("translog-23.ckp"))); + verify(cryptoClient, times(2)).createDecryptingStream(any()); + } + public void testDownloadTranslogAlreadyExists() throws IOException { FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); Path location = createTempDir(); @@ -281,6 +329,7 @@ public void testDownloadTranslogAlreadyExists() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + null, remoteBaseTransferPath, tracker ); @@ -310,6 +359,7 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, + null, remoteBaseTransferPath, tracker ); @@ -346,6 +396,7 @@ public void testDeleteTranslogSuccess() throws Exception { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, blobStoreTransferService, + null, remoteBaseTransferPath, tracker ); @@ -370,6 
+421,7 @@ public void testDeleteTranslogFailure() throws Exception { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, blobStoreTransferService, + null, remoteBaseTransferPath, tracker ); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index b3833655ab1ea..093bd8db87890 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -48,6 +48,7 @@ import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.routing.IndexShardRoutingTable; @@ -74,6 +75,7 @@ import org.opensearch.common.util.BigArrays; import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.crypto.CryptoClient; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.Index; import org.opensearch.index.IndexSettings; @@ -150,6 +152,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -160,6 +163,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.hamcrest.Matchers.contains; @@ -568,16 +572,30 @@ protected IndexShard newShard( remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata); } - final BiFunction translogFactorySupplier 
= (settings, shardRouting) -> { - if (settings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { - return new RemoteBlobStoreInternalTranslogFactory( - this::createRepositoriesService, - threadPool, - settings.getRemoteStoreTranslogRepository() - ); + final Supplier repositoryServiceRef = this::createRepositoriesService; + + IndicesService.TranslogFactorySupplier translogFactorySupplier = new IndicesService.TranslogFactorySupplier() { + @Override + public TranslogFactory createTranslogFactory(IndexSettings indexSettings, ShardRouting shardRouting) { + if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { + return new RemoteBlobStoreInternalTranslogFactory( + repositoryServiceRef, + threadPool, + indexSettings.getRemoteStoreTranslogRepository() + ); + } + return new InternalTranslogFactory(); + } + + @Override + public CryptoClient createCryptoClient(RepositoryMetadata repositoryMetadata) { + if (repositoryMetadata != null && Boolean.TRUE.equals(repositoryMetadata.encrypted())) { + return Objects.requireNonNull(repositoryServiceRef.get()).cryptoClient(repositoryMetadata); + } + return null; } - return new InternalTranslogFactory(); }; + indexShard = new IndexShard( routing, indexSettings, @@ -633,7 +651,12 @@ protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMet ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId); RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, null); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + dataDirectory, + metadataDirectory, + null, + null + ); return createStore(shardId, new IndexSettings(metadata, nodeSettings), 
remoteSegmentStoreDirectory); }