diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json index 0c9dd29d0fe28..58212c05ee626 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json @@ -85,6 +85,11 @@ "FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798": { "Type": "AWS::Lambda::EventSourceMapping", "Properties": { + "FilterCriteria": { + "Filters": [{ + "Pattern":"{\"numericEquals\":[{\"numeric\":[\"=\",1]}]}" + }] + }, "FunctionName": { "Ref": "FC4345940" }, diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts index 3503f56ad26b3..2ddbece1eee21 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts @@ -53,6 +53,11 @@ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== authenticationMethod: AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH, rootCACertificate: rootCASecret, startingPosition: lambda.StartingPosition.TRIM_HORIZON, + filters: [ + lambda.FilterCriteria.filter({ + numericEquals: lambda.FilterRule.isEqual(1), + }), + ], }), ); } diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/README.md b/packages/aws-cdk-lib/aws-lambda-event-sources/README.md index 6cf23bbb519e5..9113192a33667 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/README.md +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/README.md @@ -275,6 +275,30 @@ myFunction.addEventSource(new SelfManagedKafkaEventSource({ If your self managed Kafka cluster is only reachable via VPC also configure `vpc` `vpcSubnets` and `securityGroup`. +You can specify [event filtering](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-msk-smak) +for managed and self managed Kafka clusters using the `filters` property: +```ts +import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources'; + +// Your MSK cluster arn +const clusterArn = 'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4'; + +// The Kafka topic you want to subscribe to +const topic = 'some-cool-topic'; + +declare const myFunction: lambda.Function; +myFunction.addEventSource(new ManagedKafkaEventSource({ + clusterArn, + topic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + filters: [ + lambda.FilterCriteria.filter({ + stringEquals: lambda.FilterRule.isEqual('test'), + }), + ], +})); +``` + ## Roadmap Eventually, this module will support all the event sources described under diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts index 3fea49565b186..1faa75bbf4102 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts @@ -29,6 +29,14 @@ export interface KafkaEventSourceProps extends BaseStreamEventSourceProps { * @default - none */ readonly consumerGroupId?: string; + + /** + * Add filter criteria to Event Source + * @see https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html + * + * @default - none + */ + readonly filters?: Array<{[key: string]: any}> } /** @@ -130,6 +138,7 @@ export class ManagedKafkaEventSource extends StreamEventSource { `KafkaEventSource:${Names.nodeUniqueId(target.node)}${this.innerProps.topic}`, this.enrichMappingOptions({ eventSourceArn: this.innerProps.clusterArn, + filters: this.innerProps.filters, startingPosition: this.innerProps.startingPosition, sourceAccessConfigurations: this.sourceAccessConfigurations(), kafkaTopic: this.innerProps.topic, @@ -217,6 +226,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { target.addEventSourceMapping( this.mappingId(target), this.enrichMappingOptions({ + filters: this.innerProps.filters, kafkaBootstrapServers: this.innerProps.bootstrapServers, kafkaTopic: this.innerProps.topic, kafkaConsumerGroupId: this.innerProps.consumerGroupId, diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts index 1e3f8db271b02..1d0a962bae271 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts @@ -132,6 +132,46 @@ describe('KafkaEventSource', () => { }); }); + + test('with filters', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + // WHEN + fn.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + filters: [ + lambda.FilterCriteria.filter({ + orFilter: lambda.FilterRule.or('one', 'two'), + stringEquals: lambda.FilterRule.isEqual('test'), + }), + lambda.FilterCriteria.filter({ + numericEquals: lambda.FilterRule.isEqual(1), + }), + ], + })); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', { + FilterCriteria: { + Filters: [ + { + Pattern: '{"orFilter":["one","two"],"stringEquals":["test"]}', + }, + { + Pattern: '{"numericEquals":[{"numeric":["=",1]}]}', + }, + ], + }, + }); + }); + }); describe('self-managed kafka', () => { @@ -202,6 +242,48 @@ describe('KafkaEventSource', () => { }); }); + + test('with filters', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const kafkaTopic = 'some-topic'; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const bootstrapServers = ['kafka-broker:9092']; + + // WHEN + fn.addEventSource(new sources.SelfManagedKafkaEventSource( + { + bootstrapServers: bootstrapServers, + topic: kafkaTopic, + secret: secret, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + filters: [ + lambda.FilterCriteria.filter({ + orFilter: lambda.FilterRule.or('one', 'two'), + stringEquals: lambda.FilterRule.isEqual('test'), + }), + lambda.FilterCriteria.filter({ + numericEquals: lambda.FilterRule.isEqual(1), + }), + ], + })); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', { + FilterCriteria: { + Filters: [ + { + Pattern: '{"orFilter":["one","two"],"stringEquals":["test"]}', + }, + { + Pattern: '{"numericEquals":[{"numeric":["=",1]}]}', + }, + ], + }, + }); + }); + test('without vpc, secret must be set', () => { const stack = new cdk.Stack(); const fn = new TestFunction(stack, 'Fn');