Skip to content

Commit

Permalink
crypto plugin integration changes
Browse files Browse the repository at this point in the history
Signed-off-by: Vikas Bansal <vikasvb@amazon.com>
  • Loading branch information
vikasvb90 committed May 1, 2023
1 parent 72ed13b commit 6531662
Show file tree
Hide file tree
Showing 23 changed files with 613 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public class RepositoryMetadata implements Writeable {
*/
private final long pendingGeneration;

/**
* Whether repository is encrypted
*/
private final Boolean encrypted;

/**
* Constructs new repository metadata
*
Expand All @@ -69,14 +74,22 @@ public class RepositoryMetadata implements Writeable {
* @param settings repository settings
*/
public RepositoryMetadata(String name, String type, Settings settings) {
this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN);
this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN, null);
}

public RepositoryMetadata(String name, String type, Settings settings, Boolean encrypted) {
this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN, encrypted);
}

public RepositoryMetadata(RepositoryMetadata metadata, long generation, long pendingGeneration) {
this(metadata.name, metadata.type, metadata.settings, generation, pendingGeneration);
this(metadata.name, metadata.type, metadata.settings, generation, pendingGeneration, null);
}

public RepositoryMetadata(String name, String type, Settings settings, long generation, long pendingGeneration) {
this(name, type, settings, generation, pendingGeneration, null);
}

public RepositoryMetadata(String name, String type, Settings settings, long generation, long pendingGeneration, Boolean encrypted) {
this.name = name;
this.type = type;
this.settings = settings;
Expand All @@ -87,6 +100,7 @@ public RepositoryMetadata(String name, String type, Settings settings, long gene
+ "] must be greater or equal to generation ["
+ generation
+ "]";
this.encrypted = encrypted;
}

/**
Expand Down Expand Up @@ -116,6 +130,15 @@ public Settings settings() {
return this.settings;
}

/**
* Returns whether repository is encrypted
*
* @return whether repository is encrypted
*/
public Boolean encrypted() {
return null;
}

/**
* Returns the safe repository generation. {@link RepositoryData} for this generation is assumed to exist in the repository.
* All operations on the repository must be based on the {@link RepositoryData} at this generation.
Expand Down Expand Up @@ -146,6 +169,7 @@ public RepositoryMetadata(StreamInput in) throws IOException {
settings = Settings.readSettingsFromStream(in);
generation = in.readLong();
pendingGeneration = in.readLong();
encrypted = null;
}

/**
Expand Down
55 changes: 55 additions & 0 deletions server/src/main/java/org/opensearch/common/Stream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common;

import java.io.InputStream;

/**
* Model composed of an input stream, the total content length and offset
*/
public class Stream {

private final InputStream inputStream;
private final long contentLength;
private final long offset;

/**
* Construct a new stream object
*
* @param inputStream The input stream that is to be encapsulated
* @param contentLength The total content length that is to be read from the stream
* @param offset The offset pointer that this stream reads from in the file
*/
public Stream(InputStream inputStream, long contentLength, long offset) {
this.inputStream = inputStream;
this.contentLength = contentLength;
this.offset = offset;
}

/**
* @return The input stream this object is reading from
*/
public InputStream getInputStream() {
return inputStream;
}

/**
* @return The total length of the content that has to be read from this stream
*/
public long getContentLength() {
return contentLength;
}

/**
* @return The offset pointer in the file that this stream is reading from
*/
public long getOffset() {
return offset;
}
}
92 changes: 92 additions & 0 deletions server/src/main/java/org/opensearch/crypto/CryptoClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.crypto;

import org.opensearch.common.Stream;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.RefCounted;

import java.io.InputStream;

/**
* Crypto plugin interface used for encryption and decryption.
*/
public interface CryptoClient extends RefCounted {

/**
* A factory interface for constructing crypto client.
*
*/
interface Factory {

/**
* Constructs a crypto client used for encryption and decryption
*
* @param cryptoSettings Settings needed for creating crypto client.
* @param keyProviderName Name of the key provider.
* @return instance of CryptoClient
*/
CryptoClient create(Settings cryptoSettings, String keyProviderName);
}

/**
* @return key provider type
*/
String type();

/**
* @return key provider name
*/
String name();

/**
* To Initialise a crypto context used in encryption. This might be needed to set the context before beginning
* encryption.
*
* @return crypto context instance
*/
Object initCryptoContext();

/**
* In scenarios where content is divided into multiple parts and streams are emitted against each part,
* it is sometimes required to adjust the size of a part.
*
* @param cryptoContextObj crypto context instance
* @param streamSize Size of the raw stream
* @return Adjusted size of the stream.
*/
long adjustStreamSize(Object cryptoContextObj, long streamSize);

/**
* Wraps a raw InputStream with encrypting stream
*
* @param cryptoContext created earlier to set the crypto context.
* @param stream Raw InputStream to encrypt
* @return encrypting stream wrapped around raw InputStream.
*/
Stream createEncryptingStream(Object cryptoContext, Stream stream);

/**
* Provides encrypted stream for a raw stream emitted for a part of content.
*
* @param cryptoContextObj crypto context instance.
* @param stream raw stream for which encrypted stream has to be created.
* @param totalStreams Number of streams being used for the entire content.
* @param streamIdx Index of the current stream.
* @return Encrypted stream for the provided raw stream.
*/
Stream createEncryptingStreamOfPart(Object cryptoContextObj, Stream stream, int totalStreams, int streamIdx);

/**
* This method accepts an encrypted stream and provides a decrypting wrapper.
* @param encryptingStream to be decrypted.
* @return Decrypting wrapper stream
*/
InputStream createDecryptingStream(InputStream encryptingStream);
}
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/crypto/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Package for crypto client abstractions and exceptions.
*/
package org.opensearch.crypto;
35 changes: 27 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.crypto.CryptoClient;
import org.opensearch.gateway.WriteStateException;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
Expand Down Expand Up @@ -2514,10 +2515,10 @@ public void recoverFromStore(ActionListener<Boolean> listener) {
storeRecovery.recoverFromStore(this, listener);
}

public void restoreFromRemoteStore(Repository repository, ActionListener<Boolean> listener) {
public void restoreFromRemoteStore(Repository repository, CryptoClient cryptoClient, ActionListener<Boolean> listener) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromRemoteStore(this, repository, listener);
storeRecovery.recoverFromRemoteStore(this, repository, cryptoClient, listener);
}

public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
Expand Down Expand Up @@ -3319,13 +3320,25 @@ public void startRecovery(
break;
case REMOTE_STORE:
final Repository remoteTranslogRepo;
final CryptoClient cryptoClient;
final String remoteTranslogRepoName = indexSettings.getRemoteStoreTranslogRepository();
if (remoteTranslogRepoName != null) {
remoteTranslogRepo = repositoriesService.repository(remoteTranslogRepoName);
if (Boolean.TRUE.equals(remoteTranslogRepo.getMetadata().encrypted())) {
cryptoClient = repositoriesService.cryptoClient(remoteTranslogRepo.getMetadata());
} else {
cryptoClient = null;
}
} else {
remoteTranslogRepo = null;
cryptoClient = null;
}
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(remoteTranslogRepo, l));
executeRecovery(
"from remote store",
recoveryState,
recoveryListener,
l -> restoreFromRemoteStore(remoteTranslogRepo, cryptoClient, l)
);
break;
case PEER:
try {
Expand Down Expand Up @@ -4456,12 +4469,14 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
: "Store.directory is not enclosing an instance of FilterDirectory";
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
final Directory remoteStoreDelegate = byteSizeCachingStoreDirectory.getDelegate();
// We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that
// are uploaded to the remote segment store.
assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory";
((RemoteSegmentStoreDirectory) remoteDirectory).init();
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory)
assert remoteStoreDelegate instanceof RemoteSegmentStoreDirectory
: "remoteDirectory is not an instance of RemoteSegmentStoreDirectory";
RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteStoreDelegate;
remoteDirectory.init();
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteDirectory
.getSegmentsUploadedToRemoteStore();
store.incRef();
remoteStore.incRef();
Expand Down Expand Up @@ -4490,7 +4505,11 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
if (localSegmentFiles.contains(file)) {
storeDirectory.deleteFile(file);
}
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) {
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
} else {
remoteDirectory.downloadDataFile(storeDirectory, file, IOContext.DEFAULT);
}
downloadedSegments.add(file);
if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) {
assert segmentInfosSnapshotFilename == null : "There should be only one SegmentInfosSnapshot file";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos s
// Visible for testing
boolean uploadNewSegments(Collection<String> localFiles) throws IOException {
AtomicBoolean uploadSuccess = new AtomicBoolean(true);
localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> {
Collection<String> filteredFiles = localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> {
try {
return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file));
} catch (IOException e) {
Expand All @@ -209,15 +209,15 @@ boolean uploadNewSegments(Collection<String> localFiles) throws IOException {
);
return true;
}
}).forEach(file -> {
try {
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
} catch (IOException e) {
uploadSuccess.set(false);
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
}
});
}).collect(Collectors.toList());

try {
remoteDirectory.copyFilesFrom(storeDirectory, filteredFiles, IOContext.DEFAULT);
} catch (Exception e) {
uploadSuccess.set(false);
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", e), e);
}
return uploadSuccess.get();
}

Expand Down
Loading

0 comments on commit 6531662

Please sign in to comment.