Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(lambda): Support S3 as onFailure destinations on MSK and SelfMan…
…agedKafka events (#28010) ## Summary This PR will include following features - add destination(currently sns/sqs) for Kafka event sources (SelfManagedKafkaEventSource andManagedKafkaEventSource) - add a dedicated S3 destination to the kafka event sources (non-kafka event source doesn't support S3 as destination) ## Backgrounds Lambda Event Source Mapping (ESM) processes events from event sources in a sequence manner. However, a potential issue with this approach is that if a record is deemed a “poison pill”, it will be retried indefinitely until it is processed successfully or until the record expires. This can cause delays in processing others records in the queue. Additionally, for events that exceed the Lambda payload limit of 6 MB, they might be dropped as they cannot be processed. Today, Lambda supports configuring an OnSuccess destination and OnFailure destination for asynchronous invocations. For stream-based event sources, such as Kinesis, and DynamoDB streams, Lambda supports configuring an OnFailure destination. Regarding SQS, SQS poller doesn’t support OnFailure destination, but SQS support Dead Letter Queue (DLQ) natively. For CDK, Some event source mappings (events) can have onFailure destination through a DestinationConfig. Right now that supports only DynamoDB and Kinesis event sources and only SQS and SNS destinations. ## Solution - Add a new `s3onFailureDestination` destination in `s3-ofd.ts` file - Add a new field `onFailure` to `KafkaEventSourceProps` that customer can use `s3onFailureDestination` to pass in. - Add a check in `enrichMappingOptions` for every event type against `EventSourceMappingOptions.supportS3OFD` to check if they support S3 as onFailure. ## User Experience ```diff import { Secret } from 'aws-cdk-lib/aws-secretsmanager'; + import { ManagedKafkaEventSource, S3OnFailureDestination } from 'aws-cdk-lib/aws-lambda-event-sources'; import * as lambda from 'aws-cdk-lib/aws-lambda' import { App, StackProps, Stack } from 'aws-cdk-lib'; + import { Bucket } from 'aws-cdk-lib/aws-s3'; export class CdkTestStack extends Stack { constructor(scope: App, id: string, props?: StackProps) { super(scope, id, props); const myFunction = new lambda.Function(this, 'myFunction', { runtime: lambda.Runtime.NODEJS_16_X, handler: 'index.handler', code: lambda.Code.fromInline('//handler_code_here'), }); // Your MSK cluster arn const clusterArn = 'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4'; // The Kafka topic you want to subscribe to const topic = 'some-cool-topic'; // The secret that allows access to your MSK cluster // You still have to make sure that it is associated with your cluster as described in the documentation const secret = new Secret(this, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); // Your bucket for Kafka's onFailure Destination + const bucket = Bucket.fromBucketName(this, 'BucketByName', 'my-bucket'); + const s3ofd = new S3OnFailureDestination(bucket); myFunction.addEventSource(new ManagedKafkaEventSource({ clusterArn, topic: topic, secret: secret, batchSize: 100, // default startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3ofd, })); } } ``` ## sample synth output ``` myFunctionServiceRoleDefaultPolicyECBA61F7: Type: AWS::IAM::Policy Properties: PolicyDocument: Statement: - Action: - s3:Abort* - s3:DeleteObject* - s3:GetBucket* - s3:GetObject* - s3:List* - s3:PutObject - s3:PutObjectLegalHold - s3:PutObjectRetention - s3:PutObjectTagging - s3:PutObjectVersionTagging Effect: Allow Resource: - Fn::Join: - "" - - "arn:" - Ref: AWS::Partition - :s3:::my-bucket - Fn::Join: - "" - - "arn:" - Ref: AWS::Partition - :s3:::my-bucket/* myFunctionKafkaEventSourceCdkTestStackmyFunction09D44E80somecooltopic1B4580FF: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 100 DestinationConfig: OnFailure: Destination: Fn::Join: - "" - - "arn:" - Ref: AWS::Partition - :s3:::my-bucket ``` ## To discuss ### Change parent class name The parent class of `s3ofd` is called `IEventSourceDlq`. `DLQ` stands for Dead Letter Queue, Which is very specific for SQS, we might want to work out a new naming for S3 - Option 1: Rename the `IEventSourceDlq` to `IEventSourceOfd` (Ofd: OnFailureDestinaion)And create `IEventSourceDlq` again to extends from it for keeping backwards compatibility. For S3, Create a new class `IEventSourceS3ofd`. also extend from the parent class and will be implemented by our new S3 Destination class. - Option 2: Don’t create new parent classes, the new S3 Destination class just extend the original `IEventSourceDlq` Class. Like what is shown in the current code. ### Where to add checking for s3 on failure desintation support Currently in the commit, I was checking for s3ofd support in `enrichMappingOptions` Which will be called by all event source classes. Does this design make sense, or should we create a new dedicated function for chcecking? ## Questions: - Check out the current UX Closes #<issue number here>. ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
- Loading branch information