Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kinesis): support stream consumers #32087

Merged
merged 30 commits into from
Feb 18, 2025
Merged

Conversation

humanzz
Copy link
Contributor

@humanzz humanzz commented Nov 11, 2024

Issue # (if applicable)

Closes #32050

Reason for this change

Support Enhanced fan-out consumers via AWS::Kinesis::StreamConsumer and facilitate cross-account stream consumption via Lambda

Description of changes

  • introduce StreamConsumer construct to model AWS::Kinesis::StreamConsumer
    • introduce addToResourcePolicy to enable creating/configuring a resource policy for the consumer
    • introduce grant and grantRead for granting permissions
    • leverage iam.Grant.addToPrincipalOrResource in grant to be able to use grant methods cross environments to update the grantee's iam policy and the consumer's resource policy as needed
  • update ResourcePolicy to support both Stream and StreamConsumer
  • update Stream's grant to leverage iam.Grant.addToPrincipalOrResource for cross-environment support
  • introduce KinesisConsumerEventSource to lambda-event-sources for use with the newly introduced StreamConsumer

Useful links

Description of how you validated changes

unit and integration tests

Checklist


By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license

- introduce `StreamConsumer` construct to model `AWS::Kinesis::StreamConsumer`
  - introduce `addToResourcePolicy` to enable creating/configuring a resource policy for the consumer
  - introduce `grant` and `grantRead` for granting permissions
  - leverage `iam.Grant.addToPrincipalOrResource` in `grant` to be able to use `grant` methods cross environments to update the grantee's iam policy and the consumer's resource policy as needed
- update `ResourcePolicy` to support both `Stream` and `StreamConsumer`
- update `Stream`'s `grant` to leverage `iam.Grant.addToPrincipalOrResource` for cross-environment support

closes aws#32050
@github-actions github-actions bot added admired-contributor [Pilot] contributed between 13-24 PRs to the CDK effort/small Small work item – less than a day of effort feature-request A feature should be added or improved. p2 labels Nov 11, 2024
@aws-cdk-automation aws-cdk-automation requested a review from a team November 11, 2024 10:43
Copy link

codecov bot commented Nov 11, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 82.16%. Comparing base (8eeb8e4) to head (9f0070b).

Additional details and impacted files
@@           Coverage Diff           @@
##             main   #32087   +/-   ##
=======================================
  Coverage   82.16%   82.16%           
=======================================
  Files         119      119           
  Lines        6857     6857           
  Branches     1157     1157           
=======================================
  Hits         5634     5634           
  Misses       1120     1120           
  Partials      103      103           
Flag Coverage Δ
suite.unit 82.16% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Components Coverage Δ
packages/aws-cdk ∅ <ø> (∅)
packages/aws-cdk-lib/core 82.16% <ø> (ø)

@aws-cdk-automation aws-cdk-automation added the pr/needs-community-review This PR needs a review from a Trusted Community Member or Core Team Member. label Nov 11, 2024
@humanzz
Copy link
Contributor Author

humanzz commented Nov 22, 2024

One other thing I noticed, is that in lambda event sources, the KinesisEventSource assumes an IStream, and I think that will need to be changed to allow for StreamConsumer

constructor(readonly stream: kinesis.IStream, props: KinesisEventSourceProps) {

A potential implementation

  • Extract the logic into a private base class e.g. KinesisEventSourceBase which takes as input a private KinesisSource
  • Update existing KinesisEventSource to extend KinesisEventSourceBase
  • Introduce a new KinesisConsumerEventSource also extending KinesisEventSourceBase
import * as constructs from 'constructs';
import { StreamEventSource, StreamEventSourceProps } from './stream';
import * as iam from '../../aws-iam';
import * as kinesis from '../../aws-kinesis';
import * as lambda from '../../aws-lambda';
import * as cdk from '../../core';

export interface KinesisEventSourceProps extends StreamEventSourceProps {
  /**
   * The time from which to start reading, in Unix time seconds.
   *
   * @default - no timestamp
   */
  readonly startingPositionTimestamp?: number;
}

interface KinesisSource {
  readonly node: constructs.Node;
  readonly sourceArn: string;
  readonly eventSourceName: string;
  grantRead(grantee: iam.IGrantable): iam.Grant;
}

abstract class KinesisEventSourceBase extends StreamEventSource {
  private _eventSourceMappingId?: string = undefined;
  private _eventSourceMappingArn?: string = undefined;
  private startingPositionTimestamp?: number;

  constructor(readonly source: KinesisSource, props: KinesisEventSourceProps) {
    super(props);
    this.startingPositionTimestamp = props.startingPositionTimestamp;

    this.props.batchSize !== undefined && cdk.withResolved(this.props.batchSize, batchSize => {
      if (batchSize < 1 || batchSize > 10000) {
        throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize})`);
      }
    });
  }

  public bind(target: lambda.IFunction) {
    const eventSourceMapping = target.addEventSourceMapping(`${this.source.eventSourceName}:${cdk.Names.nodeUniqueId(this.source.node)}`,
      this.enrichMappingOptions({
        eventSourceArn: this.source.sourceArn,
        startingPositionTimestamp: this.startingPositionTimestamp,
        metricsConfig: this.props.metricsConfig,
      }),
    );
    this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;
    this._eventSourceMappingArn = eventSourceMapping.eventSourceMappingArn;

    this.source.grantRead(target);
  }

  /**
   * The identifier for this EventSourceMapping
   */
  public get eventSourceMappingId(): string {
    if (!this._eventSourceMappingId) {
      throw new Error(`${this.source.eventSourceName} is not yet bound to an event source mapping`);
    }
    return this._eventSourceMappingId;
  }

  /**
   * The ARN for this EventSourceMapping
   */
  public get eventSourceMappingArn(): string {
    if (!this._eventSourceMappingArn) {
      throw new Error(`${this.source.eventSourceName} is not yet bound to an event source mapping`);
    }
    return this._eventSourceMappingArn;
  }
}

/**
 * Use an Amazon Kinesis stream as an event source for AWS Lambda.
 */
export class KinesisEventSource extends KinesisEventSourceBase {
  constructor(stream: kinesis.IStream, props: KinesisEventSourceProps) {
    super({ ...stream, eventSourceName: 'KinesisEventSource', sourceArn: stream.streamArn }, props);
  }
}

/**
 * Use an Amazon Kinesis stream consumer as an event source for AWS Lambda.
 */
export class KinesisConsumerEventSource extends KinesisEventSourceBase {
  constructor(streamConsumer: kinesis.IStreamConsumer, props: KinesisEventSourceProps) {
    super({ ...streamConsumer, eventSourceName: 'KinesisConsumerEventSource', sourceArn: streamConsumer.streamConsumerArn }, props);
  }
}

// and `SubscribeToShard` APIs.
// The Lambda::EventSourceMapping resource validates against the `DescribeStream` permission. So we add it explicitly.
// FIXME This permission can be removed when the event source mapping resource drops it from validation.
this.stream.grant(target, 'kinesis:DescribeStream');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This permission is now already part of grantRead

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

@aws-cdk-automation aws-cdk-automation removed the pr/needs-community-review This PR needs a review from a Trusted Community Member or Core Team Member. label Feb 14, 2025
@mergify mergify bot dismissed GavinZZ’s stale review February 17, 2025 09:53

Pull request has been modified.

@aws-cdk-automation aws-cdk-automation added the pr/needs-community-review This PR needs a review from a Trusted Community Member or Core Team Member. label Feb 17, 2025
@GavinZZ GavinZZ self-assigned this Feb 18, 2025
GavinZZ
GavinZZ previously approved these changes Feb 18, 2025
Copy link
Contributor

@GavinZZ GavinZZ left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thank you!

Copy link
Contributor

mergify bot commented Feb 18, 2025

Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork).

Copy link
Contributor

mergify bot commented Feb 18, 2025

This pull request has been removed from the queue for the following reason: pull request branch update failed.

The pull request can't be updated

You should look at the reason for the failure and decide if the pull request needs to be fixed or if you want to requeue it.

If you want to requeue this pull request, you need to post a comment with the text: @mergifyio requeue

@aws-cdk-automation aws-cdk-automation removed the pr/needs-community-review This PR needs a review from a Trusted Community Member or Core Team Member. label Feb 18, 2025
Copy link
Contributor

mergify bot commented Feb 18, 2025

Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork).

@GavinZZ
Copy link
Contributor

GavinZZ commented Feb 18, 2025

@Mergifyio requeue

Copy link
Contributor

mergify bot commented Feb 18, 2025

requeue

✅ The queue state of this pull request has been cleaned. It can be re-embarked automatically

Copy link
Contributor

mergify bot commented Feb 18, 2025

This pull request has been removed from the queue for the following reason: pull request branch update failed.

The pull request can't be updated

You should look at the reason for the failure and decide if the pull request needs to be fixed or if you want to requeue it.

If you want to requeue this pull request, you need to post a comment with the text: @mergifyio requeue

@GavinZZ
Copy link
Contributor

GavinZZ commented Feb 18, 2025

@Mergifyio requeue

@GavinZZ
Copy link
Contributor

GavinZZ commented Feb 18, 2025

@mergify update

Copy link
Contributor

mergify bot commented Feb 18, 2025

requeue

✅ The queue state of this pull request has been cleaned. It can be re-embarked automatically

Copy link
Contributor

mergify bot commented Feb 18, 2025

update

❌ Mergify doesn't have permission to update

For security reasons, Mergify can't update this pull request. Try updating locally.
GitHub response: refusing to allow a GitHub App to create or update workflow .github/workflows/codecov.yml without workflows permission

@mergify mergify bot dismissed GavinZZ’s stale review February 18, 2025 22:19

Pull request has been modified.

Copy link
Contributor

mergify bot commented Feb 18, 2025

Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork).

@mergify mergify bot merged commit f4453c7 into aws:main Feb 18, 2025
20 checks passed
Copy link
Contributor

mergify bot commented Feb 18, 2025

Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork).

@aws-cdk-automation
Copy link
Collaborator

AWS CodeBuild CI Report

  • CodeBuild project: AutoBuildv2Project1C6BFA3F-wQm2hXv2jqQv
  • Commit ID: 9f0070b
  • Result: SUCCEEDED
  • Build Logs (available for 30 days)

Powered by github-codebuild-logs, available on the AWS Serverless Application Repository

Copy link

Comments on closed issues and PRs are hard for our team to see.
If you need help, please open a new issue that references this one.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Feb 18, 2025
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
admired-contributor [Pilot] contributed between 13-24 PRs to the CDK effort/small Small work item – less than a day of effort feature-request A feature should be added or improved. p2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

kinesis: model AWS::Kinesis::StreamConsumer (L2 construct)
5 participants