Skip to content

Commit

Permalink
[awslabs#367] Enhanced multi-lang AWSCredentialsProvider=... decode…
Browse files Browse the repository at this point in the history
…r and construction.

+ added support for external ids (issue awslabs#367)
+ added support for endpoint+region (e.g., STS via VPC)
  • Loading branch information
stair-aws committed Aug 3, 2023
1 parent eccd6cf commit 0bea768
Show file tree
Hide file tree
Showing 11 changed files with 495 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public class MultiLangDaemonConfig {
private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage";
private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads";

private MultiLangDaemonConfiguration multiLangDaemonConfiguration;
private final MultiLangDaemonConfiguration multiLangDaemonConfiguration;

private ExecutorService executorService;
private final ExecutorService executorService;

private MultiLangRecordProcessorFactory recordProcessorFactory;
private final MultiLangRecordProcessorFactory recordProcessorFactory;

/**
* Constructor.
Expand Down Expand Up @@ -165,7 +165,6 @@ private static Properties loadProperties(ClassLoader classLoader, String propert
propertyStream.close();
}
}

}

private static boolean validateProperties(Properties properties) {
Expand All @@ -182,12 +181,12 @@ private static ExecutorService buildExecutorService(Properties properties) {
log.debug("Value for {} property is {}", PROP_MAX_ACTIVE_THREADS, maxActiveThreads);
if (maxActiveThreads <= 0) {
log.info("Using a cached thread pool.");
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
builder.build());
} else {
log.info("Using a fixed thread pool with {} max active threads.", maxActiveThreads);
return new ThreadPoolExecutor(maxActiveThreads, maxActiveThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), builder.build());
new LinkedBlockingQueue<>(), builder.build());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
import software.amazon.kinesis.processor.ShardRecordProcessor;


/**
* A record processor that manages creating a child process that implements the multi language protocol and connecting
* that child process's input and outputs to a {@link MultiLangProtocol} object and calling the appropriate methods on
Expand All @@ -50,20 +49,20 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor {

private Future<?> stderrReadTask;

private MessageWriter messageWriter;
private MessageReader messageReader;
private DrainChildSTDERRTask readSTDERRTask;
private final MessageWriter messageWriter;
private final MessageReader messageReader;
private final DrainChildSTDERRTask readSTDERRTask;

private ProcessBuilder processBuilder;
private final ProcessBuilder processBuilder;
private Process process;
private ExecutorService executorService;
private final ExecutorService executorService;
private ProcessState state;

private ObjectMapper objectMapper;
private final ObjectMapper objectMapper;

private MultiLangProtocol protocol;

private MultiLangDaemonConfiguration configuration;
private final MultiLangDaemonConfiguration configuration;

@Override
public void initialize(InitializationInput initializationInput) {
Expand Down Expand Up @@ -213,7 +212,6 @@ private enum ProcessState {
this.readSTDERRTask = readSTDERRTask;
this.configuration = configuration;


this.state = ProcessState.ACTIVE;
}

Expand Down Expand Up @@ -303,8 +301,6 @@ private void stopProcessing(String message, Throwable reason) {

/**
* We provide a package level method for unit testing this call to exit.
*
* @param val exit value
*/
void exit() {
System.exit(EXIT_VALUE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.multilang;

import java.util.HashMap;
import java.util.Map;

import com.google.common.base.CaseFormat;

import lombok.AccessLevel;
import lombok.Getter;

/**
* Key-Value pairs which may be nested in, and extracted from, a property value
* in a Java properties file. For example, given the line in a property file of
* {@code my_key = my_value|foo=bar} and a delimiter split on {@code |} (pipe),
* the value {@code my_value|foo=bar} would have a nested key of {@code foo}
* and its corresponding value is {@code bar}.
* <br/><br/>
* The order of nested properties does not matter, and these properties are optional.
* Customers may choose to provide, in any order, zero-or-more nested properties.
* <br/><br/>
* Duplicate keys are not supported, and may result in a last-write-wins outcome.
*/
public enum NestedPropertyKeys {

/**
* Specify the service endpoint where requests will be submitted.
* This property's value must be in the following format:
* <pre>
* ENDPOINT ::= SERVICE_ENDPOINT "^" SIGNING_REGION
* SERVICE_ENDPOINT ::= URL
* SIGNING_REGION ::= AWS_REGION
* </pre>
*
* @see <a href="https://docs.aws.amazon.com/general/latest/gr/rande.html">AWS Service endpoints</a>
* @see <a href="https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions">Available Regions</a>
*/
ENDPOINT {
void visit(final NestedPropertyProcessor processor, final String endpoint) {
final String[] tokens = endpoint.split("\\^");
if (tokens.length != 2) {
throw new IllegalArgumentException("Invalid " + name() + ": " + endpoint);
}
processor.acceptEndpoint(tokens[0], tokens[1]);
}
},

/**
* External ids may be used when delegating access in a multi-tenant
* environment, or to third parties.
*
* @see <a href="https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html">
* How to use an external ID when granting access to your AWS resources to a third party</a>
*/
EXTERNAL_ID {
void visit(final NestedPropertyProcessor processor, final String externalId) {
processor.acceptExternalId(externalId);
}
},

;

/**
* Nested key within the property value. For example, a nested key-value
* of {@code foo=bar} has a nested key of {@code foo}.
*/
@Getter(AccessLevel.PACKAGE)
private final String nestedKey;

NestedPropertyKeys() {
// convert the enum from UPPER_SNAKE_CASE to lowerCamelCase
nestedKey = CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name());
}

abstract void visit(NestedPropertyProcessor processor, String value);

/**
* Parses any number of parameters. Each nested property will prompt a
* visit to the {@code processor}.
*
* @param processor processor to be invoked for every nested property
* @param params parameters to check for a nested property key
*/
public static void parse(final NestedPropertyProcessor processor, final String... params) {
// Construct a disposable cache to keep this O(n). Since parsing is
// usually one-and-done, it's wasteful to maintain this cache in perpetuity.
final Map<String, NestedPropertyKeys> cachedKeys = new HashMap<>();
for (final NestedPropertyKeys npk : values()) {
cachedKeys.put(npk.getNestedKey(), npk);
}

for (final String param : params) {
if (param != null) {
final String[] tokens = param.split("=");
if (tokens.length == 2) {
final NestedPropertyKeys npk = cachedKeys.get(tokens[0]);
if (npk != null) {
npk.visit(processor, tokens[1]);
}
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.multilang;

/**
* Defines methods to process {@link NestedPropertyKeys}.
*/
public interface NestedPropertyProcessor {

/**
* Set the external id, an optional field to designate who can assume an IAM role.
*
* @param externalId external id used in the service call used to retrieve session credentials
*/
void acceptExternalId(String externalId);

/**
* Set the service endpoint where requests are sent.
*
* @param serviceEndpoint the service endpoint either with or without the protocol
* (e.g. https://sns.us-west-1.amazonaws.com or sns.us-west-1.amazonaws.com)
* @param signingRegion the region to use for SigV4 signing of requests (e.g. us-west-1)
*
* @see <a href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/client/builder/AwsClientBuilder.EndpointConfiguration.html">
* AwsClientBuilder.EndpointConfiguration</a>
*/
void acceptEndpoint(String serviceEndpoint, String signingRegion);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.multilang.auth;

import java.util.Arrays;

import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.auth.AWSSessionCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.Builder;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;

import software.amazon.kinesis.multilang.NestedPropertyKeys;
import software.amazon.kinesis.multilang.NestedPropertyProcessor;

/**
* An {@link AWSSessionCredentialsProvider} that is backed by STSAssumeRole.
*/
public class KclSTSAssumeRoleSessionCredentialsProvider
implements AWSSessionCredentialsProvider, NestedPropertyProcessor {

private final Builder builder;

private final STSAssumeRoleSessionCredentialsProvider provider;

/**
*
* @param params vararg parameters which must include roleArn at index=0,
* and roleSessionName at index=1
*/
public KclSTSAssumeRoleSessionCredentialsProvider(final String[] params) {
this(params[0], params[1], Arrays.copyOfRange(params, 2, params.length));
}

public KclSTSAssumeRoleSessionCredentialsProvider(final String roleArn, final String roleSessionName,
final String... params) {
builder = new Builder(roleArn, roleSessionName);
NestedPropertyKeys.parse(this, params);
provider = builder.build();
}

@Override
public AWSSessionCredentials getCredentials() {
return provider.getCredentials();
}

@Override
public void refresh() {
// do nothing
}

@Override
public void acceptExternalId(final String externalId) {
builder.withExternalId(externalId);
}

@Override
public void acceptEndpoint(final String serviceEndpoint, final String signingRegion) {
final AwsClientBuilder.EndpointConfiguration
endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, signingRegion);
final AWSSecurityTokenService stsClient =
AWSSecurityTokenServiceClient.builder()
.withEndpointConfiguration(endpointConfiguration)
.withRegion(Regions.fromName(signingRegion))
.build();
builder.withStsClient(stsClient);
}

}
Loading

0 comments on commit 0bea768

Please sign in to comment.