Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(msk-alpha): MSK Kafka versions 2.8.2.tiered and 3.5.1 and StorageMode property #27560

Merged
merged 36 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
efa6f01
setup the beginnings of tiered storage and storageMode
chrispidcock Oct 13, 2023
5bae714
added tests and kafka version 3.5.1
chrispidcock Oct 15, 2023
c5c0f58
added tests and kafka version 3.5.1
chrispidcock Oct 15, 2023
9525641
updated isTiered logic to use a string or class, to make it more adap…
chrispidcock Oct 15, 2023
6f0fae6
updated isTiered logic to use a string or class, to make it more adap…
chrispidcock Oct 15, 2023
8d9fc1e
remove accident change
chrispidcock Oct 15, 2023
4ecf14d
undefined storagemode by default, only because it reduces the snapsho…
chrispidcock Oct 15, 2023
d7e9bd0
changes to help me understand how to make everything pass
chrispidcock Oct 15, 2023
f6a2d30
cleaning up
chrispidcock Oct 16, 2023
64a971c
update tests, but unable to test cdk.depoy test yet, or update snapshots
chrispidcock Oct 16, 2023
e30c473
add StorageMode as a default setting
chrispidcock Oct 16, 2023
ffbad8c
tests: integration tests for storage mode
chrispidcock Oct 17, 2023
6e7972b
setting default applied value for storagemode to undefined
chrispidcock Oct 17, 2023
b2af07b
tests: updated zookeeper integration test kafka version
chrispidcock Oct 17, 2023
2428789
typo in storagemode integ
chrispidcock Oct 17, 2023
d7b7ea4
update add-cluster-user.js integration test
chrispidcock Oct 17, 2023
4a33b88
docs: add Storage Mode to the msk README
chrispidcock Oct 18, 2023
d984aea
complete integration tests
chrispidcock Oct 20, 2023
a6f2764
fix no-multiple-empty-lines
chrispidcock Oct 20, 2023
2562421
Merge branch 'main' into adding-new-msk-kafka-versions
chrispidcock Oct 20, 2023
7021063
Apply suggestions from code review
chrispidcock Oct 22, 2023
c04e415
updated code based on PR comments
chrispidcock Oct 22, 2023
bb173a0
comment update in prop
chrispidcock Oct 22, 2023
d666eab
Apply suggestions from code review
chrispidcock Oct 22, 2023
f167ba3
update tests to align with suggested changes
chrispidcock Oct 23, 2023
732ac38
fix: Strings must use singlequote quotes
chrispidcock Oct 23, 2023
4d7e28b
undo default instance size change
chrispidcock Oct 23, 2023
78c6b5b
fix: Strings must use singlequote quotes
chrispidcock Oct 23, 2023
2c4a0b5
Merge branch 'main' into adding-new-msk-kafka-versions
chrispidcock Oct 23, 2023
337ebf8
fix instanceType condition and tests
chrispidcock Oct 23, 2023
8aa7ba1
instanceType looks best place for mskInstanceType
chrispidcock Oct 23, 2023
2050f4e
Apply suggestions from code review
kaizencc Nov 17, 2023
e94f72c
Update packages/@aws-cdk/aws-msk-alpha/lib/cluster.ts
chrispidcock Nov 19, 2023
045280b
Merge branch 'main' into adding-new-msk-kafka-versions
chrispidcock Nov 29, 2023
01199ab
new lines to fix styling
kaizencc Dec 1, 2023
df26766
Merge branch 'main' into adding-new-msk-kafka-versions
kaizencc Dec 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions packages/@aws-cdk/aws-msk-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ new CfnOutput(this, 'ZookeeperConnectionTls', { value: cluster.zookeeperConnecti
To import an existing MSK cluster into your CDK app use the `.fromClusterArn()` method.

```ts
const cluster = msk.Cluster.fromClusterArn(this, 'Cluster',
const cluster = msk.Cluster.fromClusterArn(this, 'Cluster',
'arn:aws:kafka:us-west-2:1234567890:cluster/a-cluster/11111111-1111-1111-1111-111111111111-1',
);
```
Expand Down Expand Up @@ -146,7 +146,7 @@ const cluster = new msk.Cluster(this, 'cluster', {

### SASL/IAM + TLS

Enable client authentication with [IAM](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html)
Enable client authentication with [IAM](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html)
as well as enable client authentication with TLS by setting the `certificateAuthorityArns` property to reference your ACM Private CA. [More info on Private CAs.](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html)

```ts
Expand Down Expand Up @@ -210,3 +210,23 @@ in the `cdk.json` file.
}
```

## Storage Mode

You can configure an MSK cluster storage mode using the `storageMode` property.

Tiered storage is a low-cost storage tier for Amazon MSK that scales to virtually unlimited storage,
making it cost-effective to build streaming data applications.

> Visit [Tiered storage](https://docs.aws.amazon.com/msk/latest/developerguide/msk-tiered-storage.html) for more details.

```ts
declare const vpc: ec2.Vpc;
declare const bucket: s3.IBucket;

const cluster = new msk.Cluster(this, 'cluster', {
chrispidcock marked this conversation as resolved.
Show resolved Hide resolved
clusterName: 'myCluster',
kafkaVersion: msk.KafkaVersion.V2_8_2_TIERED,
vpc,
storageMode: msk.StorageMode.TIERED,
});
```
17 changes: 17 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/lib/cluster-version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ export class KafkaVersion {
*/
public static readonly V2_8_1 = KafkaVersion.of('2.8.1');

/**
* AWS MSK Kafka version 2.8.2.tiered
*/
public static readonly V2_8_2_TIERED = KafkaVersion.of('2.8.2.tiered');

/**
* Kafka version 3.1.1
*/
Expand All @@ -101,6 +106,11 @@ export class KafkaVersion {
*/
public static readonly V3_4_0 = KafkaVersion.of('3.4.0');

/**
* Kafka version 3.5.1
*/
public static readonly V3_5_1 = KafkaVersion.of('3.5.1');

/**
* Custom cluster version
* @param version custom version number
Expand All @@ -114,4 +124,11 @@ export class KafkaVersion {
* @param version cluster version number
*/
private constructor(public readonly version: string) {}

/**
* Checks if the cluster version supports tiered storage mode.
*/
public isTieredStorageCompatible() {
return this.version.endsWith('.tiered');
};
}
63 changes: 63 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/lib/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,25 @@ export interface ClusterProps {
* The physical name of the cluster.
*/
readonly clusterName: string;

/**
* The version of Apache Kafka.
*/
readonly kafkaVersion: KafkaVersion;

/**
* Number of Apache Kafka brokers deployed in each Availability Zone.
*
* @default 1
*/
readonly numberOfBrokerNodes?: number;

/**
* Defines the virtual networking environment for this cluster.
* Must have at least 2 subnets in two different AZs.
*/
readonly vpc: ec2.IVpc;

/**
* Where to place the nodes within the VPC.
* Amazon MSK distributes the broker nodes evenly across the subnets that you specify.
Expand All @@ -83,57 +87,74 @@ export interface ClusterProps {
* @default - the Vpc default strategy if not specified.
*/
readonly vpcSubnets?: ec2.SubnetSelection;

/**
* The EC2 instance type that you want Amazon MSK to use when it creates your brokers.
*
* @see https://docs.aws.amazon.com/msk/latest/developerguide/msk-create-cluster.html#broker-instance-types
* @default kafka.m5.large
*/
readonly instanceType?: ec2.InstanceType;

/**
* The AWS security groups to associate with the elastic network interfaces in order to specify who can
* connect to and communicate with the Amazon MSK cluster.
*
* @default - create new security group
*/
readonly securityGroups?: ec2.ISecurityGroup[];

/**
* Information about storage volumes attached to MSK broker nodes.
*
* @default - 1000 GiB EBS volume
*/
readonly ebsStorageInfo?: EbsStorageInfo;

/**
* This controls storage mode for supported storage tiers.
*
* @default - StorageMode.LOCAL
* @see https://docs.aws.amazon.com/msk/latest/developerguide/msk-tiered-storage.html
*/
readonly storageMode?: StorageMode;

/**
* The Amazon MSK configuration to use for the cluster.
*
* @default - none
*/
readonly configurationInfo?: ClusterConfigurationInfo;

/**
* Cluster monitoring configuration.
*
* @default - DEFAULT monitoring level
*/
readonly monitoring?: MonitoringConfiguration;

/**
* Configure your MSK cluster to send broker logs to different destination types.
*
* @default - disabled
*/
readonly logging?: BrokerLogging;

/**
* Config details for encryption in transit.
*
* @default - enabled
*/
readonly encryptionInTransit?: EncryptionInTransitConfig;

/**
* Configuration properties for client authentication.
* MSK supports using private TLS certificates or SASL/SCRAM to authenticate the identity of clients.
*
* @default - disabled
*/
readonly clientAuthentication?: ClientAuthentication;

/**
* What to do when this resource is deleted from a stack.
*
Expand All @@ -152,6 +173,7 @@ export interface EbsStorageInfo {
* @default 1000
*/
readonly volumeSize?: number;

/**
* The AWS KMS key for encrypting data at rest.
*
Expand All @@ -160,6 +182,21 @@ export interface EbsStorageInfo {
readonly encryptionKey?: kms.IKey;
}

/**
* The storage mode for the cluster brokers.
*/
export enum StorageMode {
/**
* Local storage mode utilizes network attached EBS storage.
*/
LOCAL = 'LOCAL',

/**
* Tiered storage mode utilizes EBS storage and Tiered storage.
*/
TIERED = 'TIERED',
}

/**
* The Amazon MSK configuration to use for the cluster.
* Note: There is currently no Cloudformation Resource to create a Configuration
Expand All @@ -170,6 +207,7 @@ export interface ClusterConfigurationInfo {
* For example, arn:aws:kafka:us-east-1:123456789012:configuration/example-configuration-name/abcdabcd-1234-abcd-1234-abcd123e8e8e-1.
*/
readonly arn: string;

/**
* The revision of the Amazon MSK configuration to use.
*/
Expand All @@ -186,14 +224,17 @@ export enum ClusterMonitoringLevel {
* Default metrics are the essential metrics to monitor.
*/
DEFAULT = 'DEFAULT',

/**
* Per Broker metrics give you metrics at the broker level.
*/
PER_BROKER = 'PER_BROKER',

/**
* Per Topic Per Broker metrics help you understand volume at the topic level.
*/
PER_TOPIC_PER_BROKER = 'PER_TOPIC_PER_BROKER',

/**
* Per Topic Per Partition metrics help you understand consumer group lag at the topic partition level.
*/
Expand All @@ -210,12 +251,14 @@ export interface MonitoringConfiguration {
* @default DEFAULT
*/
readonly clusterMonitoringLevel?: ClusterMonitoringLevel;

/**
* Indicates whether you want to enable or disable the JMX Exporter.
*
* @default false
*/
readonly enablePrometheusJmxExporter?: boolean;

/**
* Indicates whether you want to enable or disable the Prometheus Node Exporter.
*
Expand All @@ -236,12 +279,14 @@ export interface BrokerLogging {
* @default - disabled
*/
readonly firehoseDeliveryStreamName?: string;

/**
* The CloudWatch Logs group that is the destination for broker logs.
*
* @default - disabled
*/
readonly cloudwatchLogGroup?: logs.ILogGroup;

/**
* Details of the Amazon S3 destination for broker logs.
*
Expand All @@ -258,6 +303,7 @@ export interface S3LoggingConfiguration {
* The S3 bucket that is the destination for broker logs.
*/
readonly bucket: s3.IBucket;

/**
* The S3 prefix that is the destination for broker logs.
*
Expand All @@ -274,10 +320,12 @@ export enum ClientBrokerEncryption {
* TLS means that client-broker communication is enabled with TLS only.
*/
TLS = 'TLS',

/**
* TLS_PLAINTEXT means that client-broker communication is enabled for both TLS-encrypted, as well as plaintext data.
*/
TLS_PLAINTEXT = 'TLS_PLAINTEXT',

/**
* PLAINTEXT means that client-broker communication is enabled in plaintext only.
*/
Expand All @@ -296,6 +344,7 @@ export interface EncryptionInTransitConfig {
* @default - TLS
*/
readonly clientBroker?: ClientBrokerEncryption;

/**
* Indicates that data communication among the broker nodes of the cluster is encrypted.
*
Expand All @@ -314,12 +363,14 @@ export interface SaslAuthProps {
* @default false
*/
readonly scram?: boolean;

/**
* Enable IAM access control.
*
* @default false
*/
readonly iam?: boolean;

/**
* KMS Key to encrypt SASL/SCRAM secrets.
*
Expand Down Expand Up @@ -486,6 +537,17 @@ export class Cluster extends ClusterBase {
ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.LARGE),
);

if (props.storageMode && props.storageMode === StorageMode.TIERED) {
if (!props.kafkaVersion.isTieredStorageCompatible()) {
throw Error(`To deploy a tiered cluster you must select a compatible Kafka version, got ${props.kafkaVersion.version}`);
}
if (instanceType === this.mskInstanceType(
ec2.InstanceType.of(ec2.InstanceClass.T3, ec2.InstanceSize.SMALL),
)) {
throw Error('Tiered storage doesn\'t support broker type t3.small');
}
}

const encryptionAtRest = props.ebsStorageInfo?.encryptionKey
? {
dataVolumeKmsKeyId:
Expand Down Expand Up @@ -683,6 +745,7 @@ export class Cluster extends ClusterBase {
configurationInfo: props.configurationInfo,
enhancedMonitoring: props.monitoring?.clusterMonitoringLevel,
openMonitoring: openMonitoring,
storageMode: props.storageMode,
loggingInfo: loggingInfo,
clientAuthentication: clientAuthentication,
});
Expand Down
Loading