-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(kinesis): support stream consumers #32087
Conversation
- introduce `StreamConsumer` construct to model `AWS::Kinesis::StreamConsumer` - introduce `addToResourcePolicy` to enable creating/configuring a resource policy for the consumer - introduce `grant` and `grantRead` for granting permissions - leverage `iam.Grant.addToPrincipalOrResource` in `grant` to be able to use `grant` methods cross environments to update the grantee's iam policy and the consumer's resource policy as needed - update `ResourcePolicy` to support both `Stream` and `StreamConsumer` - update `Stream`'s `grant` to leverage `iam.Grant.addToPrincipalOrResource` for cross-environment support closes aws#32050
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #32087 +/- ##
=======================================
Coverage 82.16% 82.16%
=======================================
Files 119 119
Lines 6857 6857
Branches 1157 1157
=======================================
Hits 5634 5634
Misses 1120 1120
Partials 103 103
Flags with carried forward coverage won't be shown. Click here to find out more.
|
…aws-cdk into kinesis-stream-consumer
One other thing I noticed, is that in lambda event sources, the KinesisEventSource assumes an
A potential implementation
import * as constructs from 'constructs';
import { StreamEventSource, StreamEventSourceProps } from './stream';
import * as iam from '../../aws-iam';
import * as kinesis from '../../aws-kinesis';
import * as lambda from '../../aws-lambda';
import * as cdk from '../../core';
export interface KinesisEventSourceProps extends StreamEventSourceProps {
/**
* The time from which to start reading, in Unix time seconds.
*
* @default - no timestamp
*/
readonly startingPositionTimestamp?: number;
}
interface KinesisSource {
readonly node: constructs.Node;
readonly sourceArn: string;
readonly eventSourceName: string;
grantRead(grantee: iam.IGrantable): iam.Grant;
}
abstract class KinesisEventSourceBase extends StreamEventSource {
private _eventSourceMappingId?: string = undefined;
private _eventSourceMappingArn?: string = undefined;
private startingPositionTimestamp?: number;
constructor(readonly source: KinesisSource, props: KinesisEventSourceProps) {
super(props);
this.startingPositionTimestamp = props.startingPositionTimestamp;
this.props.batchSize !== undefined && cdk.withResolved(this.props.batchSize, batchSize => {
if (batchSize < 1 || batchSize > 10000) {
throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize})`);
}
});
}
public bind(target: lambda.IFunction) {
const eventSourceMapping = target.addEventSourceMapping(`${this.source.eventSourceName}:${cdk.Names.nodeUniqueId(this.source.node)}`,
this.enrichMappingOptions({
eventSourceArn: this.source.sourceArn,
startingPositionTimestamp: this.startingPositionTimestamp,
metricsConfig: this.props.metricsConfig,
}),
);
this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;
this._eventSourceMappingArn = eventSourceMapping.eventSourceMappingArn;
this.source.grantRead(target);
}
/**
* The identifier for this EventSourceMapping
*/
public get eventSourceMappingId(): string {
if (!this._eventSourceMappingId) {
throw new Error(`${this.source.eventSourceName} is not yet bound to an event source mapping`);
}
return this._eventSourceMappingId;
}
/**
* The ARN for this EventSourceMapping
*/
public get eventSourceMappingArn(): string {
if (!this._eventSourceMappingArn) {
throw new Error(`${this.source.eventSourceName} is not yet bound to an event source mapping`);
}
return this._eventSourceMappingArn;
}
}
/**
* Use an Amazon Kinesis stream as an event source for AWS Lambda.
*/
export class KinesisEventSource extends KinesisEventSourceBase {
constructor(stream: kinesis.IStream, props: KinesisEventSourceProps) {
super({ ...stream, eventSourceName: 'KinesisEventSource', sourceArn: stream.streamArn }, props);
}
}
/**
* Use an Amazon Kinesis stream consumer as an event source for AWS Lambda.
*/
export class KinesisConsumerEventSource extends KinesisEventSourceBase {
constructor(streamConsumer: kinesis.IStreamConsumer, props: KinesisEventSourceProps) {
super({ ...streamConsumer, eventSourceName: 'KinesisConsumerEventSource', sourceArn: streamConsumer.streamConsumerArn }, props);
}
} |
…aws-cdk into kinesis-stream-consumer
// and `SubscribeToShard` APIs. | ||
// The Lambda::EventSourceMapping resource validates against the `DescribeStream` permission. So we add it explicitly. | ||
// FIXME This permission can be removed when the event source mapping resource drops it from validation. | ||
this.stream.grant(target, 'kinesis:DescribeStream'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This permission is now already part of grantRead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch!
Pull request has been modified.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thank you!
Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork). |
This pull request has been removed from the queue for the following reason: The pull request can't be updated You should look at the reason for the failure and decide if the pull request needs to be fixed or if you want to requeue it. If you want to requeue this pull request, you need to post a comment with the text: |
Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork). |
@Mergifyio requeue |
✅ The queue state of this pull request has been cleaned. It can be re-embarked automatically |
This pull request has been removed from the queue for the following reason: The pull request can't be updated You should look at the reason for the failure and decide if the pull request needs to be fixed or if you want to requeue it. If you want to requeue this pull request, you need to post a comment with the text: |
@Mergifyio requeue |
@mergify update |
✅ The queue state of this pull request has been cleaned. It can be re-embarked automatically |
❌ Mergify doesn't have permission to updateFor security reasons, Mergify can't update this pull request. Try updating locally. |
Pull request has been modified.
Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork). |
Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork). |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
Comments on closed issues and PRs are hard for our team to see. |
Issue # (if applicable)
Closes #32050
Reason for this change
Support Enhanced fan-out consumers via
AWS::Kinesis::StreamConsumer
and facilitate cross-account stream consumption via LambdaDescription of changes
StreamConsumer
construct to modelAWS::Kinesis::StreamConsumer
addToResourcePolicy
to enable creating/configuring a resource policy for the consumergrant
andgrantRead
for granting permissionsiam.Grant.addToPrincipalOrResource
ingrant
to be able to usegrant
methods cross environments to update the grantee's iam policy and the consumer's resource policy as neededResourcePolicy
to support bothStream
andStreamConsumer
Stream
'sgrant
to leverageiam.Grant.addToPrincipalOrResource
for cross-environment supportKinesisConsumerEventSource
tolambda-event-sources
for use with the newly introducedStreamConsumer
Useful links
Description of how you validated changes
unit and integration tests
Checklist
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license