Skip to content

Commit

Permalink
Multiple multi-lang edits to introduce logging and additional tests.
Browse files Browse the repository at this point in the history
+ added `ENDPOINT_REGION` nested key for a simpler Cx experience
+ deduplicated, and improved, logic w.r.t. CredentialsProvider
construction to NOT swallow Exceptions
  • Loading branch information
stair-aws committed Aug 4, 2023
1 parent 1295088 commit 6809f29
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.Map;

import com.amazonaws.regions.Regions;
import com.google.common.base.CaseFormat;

import lombok.AccessLevel;
Expand Down Expand Up @@ -47,6 +48,9 @@ public enum NestedPropertyKey {
* SIGNING_REGION ::= AWS_REGION
* </pre>
*
* It would be redundant to provide both this and {@link #ENDPOINT_REGION}.
*
* @see #ENDPOINT_REGION
* @see <a href="https://docs.aws.amazon.com/general/latest/gr/rande.html">AWS Service endpoints</a>
* @see <a href="https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions">Available Regions</a>
*/
Expand All @@ -60,6 +64,21 @@ void visit(final NestedPropertyProcessor processor, final String endpoint) {
}
},

/**
* Specify the region where service requests will be submitted. This
* region will determine both the service endpoint and signing region.
* <br/><br/>
* It would be redundant to provide both this and {@link #ENDPOINT}.
*
* @see #ENDPOINT
* @see <a href="https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions">Available Regions</a>
*/
ENDPOINT_REGION {
void visit(final NestedPropertyProcessor processor, final String region) {
processor.acceptEndpointRegion(Regions.fromName(region));
}
},

/**
* External ids may be used when delegating access in a multi-tenant
* environment, or to third parties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,41 @@
*/
package software.amazon.kinesis.multilang;

import com.amazonaws.regions.Regions;

/**
* Defines methods to process {@link NestedPropertyKey}s.
*/
public interface NestedPropertyProcessor {

/**
* Set the external id, an optional field to designate who can assume an IAM role.
*
* @param externalId external id used in the service call used to retrieve session credentials
*/
void acceptExternalId(String externalId);

/**
* Set the service endpoint where requests are sent.
*
* @param serviceEndpoint the service endpoint either with or without the protocol
* (e.g. https://sns.us-west-1.amazonaws.com or sns.us-west-1.amazonaws.com)
* (e.g., https://sns.us-west-1.amazonaws.com, sns.us-west-1.amazonaws.com)
* @param signingRegion the region to use for SigV4 signing of requests (e.g. us-west-1)
*
* @see #acceptEndpointRegion(Regions)
* @see <a href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/client/builder/AwsClientBuilder.EndpointConfiguration.html">
* AwsClientBuilder.EndpointConfiguration</a>
*/
void acceptEndpoint(String serviceEndpoint, String signingRegion);

/**
* Set the service endpoint where requests are sent.
*
* @param region Region to be used by the client. This will be used to determine both the service endpoint
* (e.g., https://sns.us-west-1.amazonaws.com) and signing region (e.g., us-west-1) for requests.
*
* @see #acceptEndpoint(String, String)
*/
void acceptEndpointRegion(Regions region);

/**
* Set the external id, an optional field to designate who can assume an IAM role.
*
* @param externalId external id used in the service call used to retrieve session credentials
*/
void acceptExternalId(String externalId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,25 @@ public void refresh() {
}

@Override
public void acceptExternalId(final String externalId) {
builder.withExternalId(externalId);
public void acceptEndpoint(final String serviceEndpoint, final String signingRegion) {
final EndpointConfiguration endpoint = new EndpointConfiguration(serviceEndpoint, signingRegion);
final AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClient.builder()
.withEndpointConfiguration(endpoint)
.build();
builder.withStsClient(stsClient);
}

@Override
public void acceptEndpoint(final String serviceEndpoint, final String signingRegion) {
final EndpointConfiguration endpoint = new EndpointConfiguration(serviceEndpoint, signingRegion);
final AWSSecurityTokenService stsClient =
AWSSecurityTokenServiceClient.builder()
.withEndpointConfiguration(endpoint)
.withRegion(Regions.fromName(signingRegion))
.build();
public void acceptEndpointRegion(final Regions region) {
final AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClient.builder()
.withRegion(region)
.build();
builder.withStsClient(stsClient);
}

@Override
public void acceptExternalId(final String externalId) {
builder.withExternalId(externalId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package software.amazon.kinesis.multilang.config;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -84,41 +84,35 @@ private static List<AWSCredentialsProvider> getValidCredentialsProviders(List<St
}
clazz = (Class<? extends AWSCredentialsProvider>) c;
} catch (ClassNotFoundException cnfe) {
// Providers are a product of prefixed Strings to cover multiple
// namespaces (e.g., "Foo" -> { "some.auth.Foo", "kcl.auth.Foo" }).
// It's expected that many class names will not resolve.
continue;
}
log.info("Attempting to construct {}", clazz);

AWSCredentialsProvider provider = null;
if (nameAndArgs.length > 1) {
final String[] varargs = Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length);

// attempt to invoke an explicit N-arg constructor of FooClass(String, String, ...)
try {
provider = constructProvider(providerName, () -> {
Class<?>[] argTypes = new Class<?>[nameAndArgs.length - 1];
Arrays.fill(argTypes, String.class);
Constructor<? extends AWSCredentialsProvider> c = clazz.getConstructor(argTypes);
provider = c.newInstance(varargs);
} catch (Exception e) {
log.debug("Can't find any credentials provider matching {}.", providerName);
}
return clazz.getConstructor(argTypes).newInstance(varargs);
});

if (provider == null) {
// attempt to invoke a public varargs/array constructor of FooClass(String[])
try {
Constructor<? extends AWSCredentialsProvider> c = clazz.getConstructor(String[].class);
provider = c.newInstance((Object) varargs);
} catch (Exception e) {
log.debug("Can't find any credentials provider matching {}.", providerName);
}
provider = constructProvider(providerName, () ->
clazz.getConstructor(String[].class).newInstance((Object) varargs)
);
}
}

if (provider == null) {
// regardless of parameters, fallback to invoke a public no-arg constructor
try {
provider = clazz.newInstance();
} catch (Exception e) {
log.debug("Can't find any credentials provider matching {}.", providerName);
}
provider = constructProvider(providerName, clazz::newInstance);
}

if (provider != null) {
Expand Down Expand Up @@ -158,4 +152,32 @@ private static List<String> getPossibleFullClassNames(final String provider) {
).map(prefix -> prefix + provider).collect(Collectors.toList());
}

@FunctionalInterface
private interface CredentialsProviderConstructor<T extends AWSCredentialsProvider> {
T construct() throws IllegalAccessException, InstantiationException,
InvocationTargetException, NoSuchMethodException;
}

/**
* Attempts to construct an {@link AWSCredentialsProvider}.
*
* @param providerName Raw, unmodified provider name. Should there be an
* Exeception during construction, this parameter will be logged.
* @param constructor supplier-like function that will perform the construction
* @return the constructed provider, if successful; otherwise, null
*
* @param <T> type of the CredentialsProvider to construct
*/
private static <T extends AWSCredentialsProvider> T constructProvider(
final String providerName, final CredentialsProviderConstructor<T> constructor) {
try {
return constructor.construct();
} catch (NoSuchMethodException ignored) {
// ignore
} catch (IllegalAccessException | InstantiationException | InvocationTargetException | RuntimeException e) {
log.warn("Failed to construct {}", providerName, e);
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public KinesisClientLibConfigurator() {
public MultiLangDaemonConfiguration getConfiguration(Properties properties) {
properties.entrySet().forEach(e -> {
try {
log.info("Processing (key={}, value={})", e.getKey(), e.getValue());
utilsBean.setProperty(configuration, (String) e.getKey(), e.getValue());
} catch (IllegalAccessException | InvocationTargetException ex) {
throw new RuntimeException(ex);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The Stream arn: arn:aws:kinesis:<region>:<account id>:stream/<stream name>
# Important: streamArn takes precedence over streamName if both are set
streamArn = arn:aws:kinesis:us-east-5:000000000000:stream/kclpysample

# The name of an Amazon Kinesis stream to process.
# Important: streamArn takes precedence over streamName if both are set
streamName = kclpysample

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = MultiLangTest

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/3.8

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# To specify an initial timestamp from which to start processing records, please specify timestamp value for 'initiatPositionInStreamExtended',
# and uncomment below line with right timestamp value.
# See more from 'Timestamp' under http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
#initialPositionInStreamExtended = 1636609142

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId =

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

@RunWith(MockitoJUnitRunner.class)
public class MultiLangDaemonConfigTest {
private static final String FILENAME = "some.properties";
private static final String FILENAME = "multilang.properties";
private static final String EXE = "TestExe.exe";
private static final String APPLICATION_NAME = MultiLangDaemonConfigTest.class.getSimpleName();
private static final String STREAM_NAME = "fakeStream";
Expand All @@ -52,7 +52,7 @@ public class MultiLangDaemonConfigTest {
@Mock
private AwsCredentials creds;

private KinesisClientLibConfigurator configurator;
private final KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator();
private MultiLangDaemonConfig deamonConfig;

/**
Expand Down Expand Up @@ -84,8 +84,6 @@ public void setup(String streamName, String streamArn) throws IOException {

when(credentialsProvider.resolveCredentials()).thenReturn(creds);
when(creds.accessKeyId()).thenReturn("cool-user");
configurator = new KinesisClientLibConfigurator();

deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
}

Expand Down Expand Up @@ -200,4 +198,9 @@ public void testPropertyValidation() {
}
}

@Test
public void testActualPropertiesFile() throws Exception {
new MultiLangDaemonConfig(FILENAME);
}

}
Loading

0 comments on commit 6809f29

Please sign in to comment.