Skip to content

Commit

Permalink
feat(lambda-event-sources): added filters support to kafka sources (#…
Browse files Browse the repository at this point in the history
…26366)

This change grants the possibility to [specify event filtering](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-msk-smak) on the `ManagedKafkaEventSource` and `SelfManagedKafkaEventSource` constructs via the `filters` property.

Closes #26348.

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
lpizzinidev authored Jul 14, 2023
1 parent 2f8df43 commit c575dde
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@
"FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"FilterCriteria": {
"Filters": [{
"Pattern":"{\"numericEquals\":[{\"numeric\":[\"=\",1]}]}"
}]
},
"FunctionName": {
"Ref": "FC4345940"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
authenticationMethod: AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH,
rootCACertificate: rootCASecret,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
filters: [
lambda.FilterCriteria.filter({
numericEquals: lambda.FilterRule.isEqual(1),
}),
],
}),
);
}
Expand Down
24 changes: 24 additions & 0 deletions packages/aws-cdk-lib/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,30 @@ myFunction.addEventSource(new SelfManagedKafkaEventSource({

If your self managed Kafka cluster is only reachable via VPC also configure `vpc` `vpcSubnets` and `securityGroup`.

You can specify [event filtering](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-msk-smak)
for managed and self managed Kafka clusters using the `filters` property:
```ts
import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

// Your MSK cluster arn
const clusterArn = 'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4';

// The Kafka topic you want to subscribe to
const topic = 'some-cool-topic';

declare const myFunction: lambda.Function;
myFunction.addEventSource(new ManagedKafkaEventSource({
clusterArn,
topic,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
filters: [
lambda.FilterCriteria.filter({
stringEquals: lambda.FilterRule.isEqual('test'),
}),
],
}));
```

## Roadmap

Eventually, this module will support all the event sources described under
Expand Down
10 changes: 10 additions & 0 deletions packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ export interface KafkaEventSourceProps extends BaseStreamEventSourceProps {
* @default - none
*/
readonly consumerGroupId?: string;

/**
* Add filter criteria to Event Source
* @see https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html
*
* @default - none
*/
readonly filters?: Array<{[key: string]: any}>
}

/**
Expand Down Expand Up @@ -130,6 +138,7 @@ export class ManagedKafkaEventSource extends StreamEventSource {
`KafkaEventSource:${Names.nodeUniqueId(target.node)}${this.innerProps.topic}`,
this.enrichMappingOptions({
eventSourceArn: this.innerProps.clusterArn,
filters: this.innerProps.filters,
startingPosition: this.innerProps.startingPosition,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
kafkaTopic: this.innerProps.topic,
Expand Down Expand Up @@ -217,6 +226,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
target.addEventSourceMapping(
this.mappingId(target),
this.enrichMappingOptions({
filters: this.innerProps.filters,
kafkaBootstrapServers: this.innerProps.bootstrapServers,
kafkaTopic: this.innerProps.topic,
kafkaConsumerGroupId: this.innerProps.consumerGroupId,
Expand Down
82 changes: 82 additions & 0 deletions packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,46 @@ describe('KafkaEventSource', () => {
});

});

test('with filters', () => {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const clusterArn = 'some-arn';
const kafkaTopic = 'some-topic';

// WHEN
fn.addEventSource(new sources.ManagedKafkaEventSource(
{
clusterArn,
topic: kafkaTopic,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
filters: [
lambda.FilterCriteria.filter({
orFilter: lambda.FilterRule.or('one', 'two'),
stringEquals: lambda.FilterRule.isEqual('test'),
}),
lambda.FilterCriteria.filter({
numericEquals: lambda.FilterRule.isEqual(1),
}),
],
}));

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
FilterCriteria: {
Filters: [
{
Pattern: '{"orFilter":["one","two"],"stringEquals":["test"]}',
},
{
Pattern: '{"numericEquals":[{"numeric":["=",1]}]}',
},
],
},
});
});

});

describe('self-managed kafka', () => {
Expand Down Expand Up @@ -202,6 +242,48 @@ describe('KafkaEventSource', () => {
});

});

test('with filters', () => {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const kafkaTopic = 'some-topic';
const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
const bootstrapServers = ['kafka-broker:9092'];

// WHEN
fn.addEventSource(new sources.SelfManagedKafkaEventSource(
{
bootstrapServers: bootstrapServers,
topic: kafkaTopic,
secret: secret,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
filters: [
lambda.FilterCriteria.filter({
orFilter: lambda.FilterRule.or('one', 'two'),
stringEquals: lambda.FilterRule.isEqual('test'),
}),
lambda.FilterCriteria.filter({
numericEquals: lambda.FilterRule.isEqual(1),
}),
],
}));

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
FilterCriteria: {
Filters: [
{
Pattern: '{"orFilter":["one","two"],"stringEquals":["test"]}',
},
{
Pattern: '{"numericEquals":[{"numeric":["=",1]}]}',
},
],
},
});
});

test('without vpc, secret must be set', () => {
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
Expand Down

0 comments on commit c575dde

Please sign in to comment.