Skip to content

Commit

Permalink
Add feedback from Jimmy
Browse files Browse the repository at this point in the history
  • Loading branch information
msambol committed Mar 26, 2024
1 parent 36da1ec commit f9484f8
Show file tree
Hide file tree
Showing 20 changed files with 272 additions and 34,320 deletions.
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-pipes-alpha/lib/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ export class Pipe extends PipeBase {
*/
const source = props.source.bind(this);
props.source.grantRead(this.pipeRole);
props.source.grantDlqPush(this.pipeRole);

// Add the filter criteria to the source parameters
const sourceParameters : CfnPipe.PipeSourceParametersProperty= {
...source.sourceParameters,
Expand Down
5 changes: 5 additions & 0 deletions packages/@aws-cdk/aws-pipes-alpha/lib/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,9 @@ export interface ISource {
* Grant the pipe role read access to the source.
*/
grantRead(grantee: IRole): void;

/**
* Grant the pipe role write access to the dead-letter targer.
*/
grantDlqPush(grantee: IRole): void;
}
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-alpha/test/integ.pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class TestSource implements ISource {
grantRead(pipeRole: cdk.aws_iam.IRole): void {
this.queue.grantConsumeMessages(pipeRole);
}
grantDlqPush(): void {}
}

class TestTarget implements ITarget {
Expand Down
14 changes: 0 additions & 14 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/deadLetterConfig.ts

This file was deleted.

58 changes: 44 additions & 14 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/dynamodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { IPipe, ISource, SourceConfig } from '@aws-cdk/aws-pipes-alpha';
import { Duration } from 'aws-cdk-lib';
import { ITableV2 } from 'aws-cdk-lib/aws-dynamodb';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { DeadLetterConfigParameters } from './deadLetterConfig';
import { ITopic, Topic } from 'aws-cdk-lib/aws-sns';
import { IQueue, Queue } from 'aws-cdk-lib/aws-sqs';
import { DynamoDBStartingPosition, OnPartialBatchItemFailure } from './enums';

/**
Expand All @@ -12,55 +13,68 @@ export interface DynamoDBSourceParameters {
/**
* The maximum number of records to include in each batch.
*
* Minumum = 1
* Maxiumum = 10000
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-batchsize
* @default 1
*/
readonly batchSize?: number;

/**
* Define the target queue to send dead-letter queue events to.
* Define the target SQS queue or SNS topic to send dead-letter queue events to.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-deadletterconfig
* @default no dead letter queue
* @default - no dead letter queue
*/
readonly deadLetterConfig?: DeadLetterConfigParameters;
readonly deadLetterTarget?: IQueue | ITopic;

/**
* The maximum length of a time to wait for events.
*
* Minumum = Duration.seconds(0)
* Maxiumum = Duration.seconds(300)
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-maximumbatchingwindowinseconds
* @default no batching window
* @default - no batching window
*/
readonly maximumBatchingWindow?: Duration;

/**
* (Streams only) Discard records older than the specified age. The default value is -1, which sets the maximum age to infinite. When the value is set to infinite, EventBridge never discards old records.
*
* Leave undefined to set the maximum record age to infinite.
* Minumum = 60 (leave undefined to set the maximum age to -1)
* Maxiumum = 604800
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-maximumrecordageinseconds
* @default -1 (infinite)
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-maximumrecordageinseconds
* @default -1 - maximum age is infinite
*/
readonly maximumRecordAge?: Duration;

/**
* (Streams only) Discard records after the specified number of retries. The default value is -1, which sets the maximum number of retries to infinite. When MaximumRetryAttempts is infinite, EventBridge retries failed records until the record expires in the event source.
*
* Minumum = -1
* Maxiumum = 10000
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-maximumretryattempts
* @default -1 (infinite)
*/
readonly maximumRetryAttempts?: number;

/**
* (Streams only) Define how to handle item process failures. AUTOMATIC_BISECT halves each batch and retry each half until all the records are processed or there is one failed message left in the batch.
* (Streams only) Define how to handle item process failures. {@link OnPartialBatchItemFailure.AUTOMATIC_BISECT} halves each batch and retry each half until all the records are processed or there is one failed message left in the batch.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-onpartialbatchitemfailure
* @default off
*/
readonly onPartialBatchItemFailure?: OnPartialBatchItemFailure;

/**
* (Streams only) The number of batches to process concurrently from each shard. The default value is 1.
* (Streams only) The number of batches to process concurrently from each shard.
*
* Minumum = 1
* Maxiumum = 10
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-parallelizationfactor
* @default 1
Expand Down Expand Up @@ -88,6 +102,8 @@ export class DynamoDBSource implements ISource {
private maximumRecordAgeInSeconds;
private maximumRetryAttempts;
private parallelizationFactor;
private deadLetterTarget;
private deadLetterTargetArn;

constructor(table: ITableV2, parameters: DynamoDBSourceParameters) {
this.table = table;
Expand All @@ -104,6 +120,7 @@ export class DynamoDBSource implements ISource {
this.maximumRecordAgeInSeconds = this.sourceParameters.maximumRecordAge?.toSeconds();
this.maximumRetryAttempts = this.sourceParameters.maximumRetryAttempts;
this.parallelizationFactor = this.sourceParameters.parallelizationFactor;
this.deadLetterTarget = this.sourceParameters.deadLetterTarget;

if (this.batchSize !== undefined) {
if (this.batchSize < 1 || this.batchSize > 10000) {
Expand All @@ -118,8 +135,8 @@ export class DynamoDBSource implements ISource {
}
if (this.maximumRecordAgeInSeconds !== undefined) {
// only need to check upper bound since Duration amounts cannot be negative
if (this.maximumRecordAgeInSeconds > 604800) {
throw new Error(`Maximum record age in seconds must be between -1 and 604800, received ${this.maximumRecordAgeInSeconds}`);
if (this.maximumRecordAgeInSeconds < 60 || this.maximumRecordAgeInSeconds > 604800) {
throw new Error(`Maximum record age in seconds must be between 60 and 604800 (leave undefined for infinite), received ${this.maximumRecordAgeInSeconds}`);
}
}
if (this.maximumRetryAttempts !== undefined) {
Expand All @@ -132,14 +149,20 @@ export class DynamoDBSource implements ISource {
throw new Error(`Parallelization factor must be between 1 and 10, received ${this.parallelizationFactor}`);
}
}

if (this.deadLetterTarget instanceof Queue) {
this.deadLetterTargetArn = this.deadLetterTarget.queueArn;
} else if (this.deadLetterTarget instanceof Topic) {
this.deadLetterTargetArn = this.deadLetterTarget.topicArn;
}
}

bind(_pipe: IPipe): SourceConfig {
return {
sourceParameters: {
dynamoDbStreamParameters: {
batchSize: this.batchSize,
deadLetterConfig: this.sourceParameters.deadLetterConfig,
deadLetterConfig: this.deadLetterTargetArn ? { arn: this.deadLetterTargetArn } : undefined,
maximumBatchingWindowInSeconds: this.maximumBatchingWindowInSeconds,
maximumRecordAgeInSeconds: this.maximumRecordAgeInSeconds,
maximumRetryAttempts: this.maximumRetryAttempts,
Expand All @@ -154,5 +177,12 @@ export class DynamoDBSource implements ISource {
grantRead(grantee: IRole): void {
this.table.grantStreamRead(grantee);
}
}

grantDlqPush(grantee: IRole): void {
if (this.deadLetterTarget instanceof Queue) {
this.deadLetterTarget.grantSendMessages(grantee);
} else if (this.deadLetterTarget instanceof Topic) {
this.deadLetterTarget.grantPublish(grantee);
}
}
}
1 change: 0 additions & 1 deletion packages/@aws-cdk/aws-pipes-sources-alpha/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@ export * from './sqs';
export * from './kinesis';
export * from './dynamodb';
export * from './enums';
export * from './deadLetterConfig';
52 changes: 40 additions & 12 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { IPipe, ISource, SourceConfig } from '@aws-cdk/aws-pipes-alpha';
import { Duration } from 'aws-cdk-lib';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { IStream } from 'aws-cdk-lib/aws-kinesis';
import { DeadLetterConfigParameters } from './deadLetterConfig';
import { ITopic, Topic } from 'aws-cdk-lib/aws-sns';
import { IQueue, Queue } from 'aws-cdk-lib/aws-sqs';
import { KinesisStartingPosition, OnPartialBatchItemFailure } from './enums';

/**
Expand All @@ -12,6 +13,9 @@ export interface KinesisSourceParameters {
/**
* The maximum number of records to include in each batch.
*
* Minumum = 1
* Maxiumum = 10000
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-batchsize
* @default 1
*/
Expand All @@ -21,46 +25,53 @@ export interface KinesisSourceParameters {
* Define the target queue to send dead-letter queue events to.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-deadletterconfig
* @default no dead letter queue
* @default - no dead letter queue
*/
readonly deadLetterConfig?: DeadLetterConfigParameters;
readonly deadLetterTarget?: IQueue | ITopic;

/**
* The maximum length of a time to wait for events.
*
* Minumum = Duration.seconds(0)
* Maxiumum = Duration.seconds(300)
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-maximumbatchingwindowinseconds
* @default no batching window
* @default - no batching window
*/
readonly maximumBatchingWindow?: Duration;

/**
* (Streams only) Discard records older than the specified age. The default value is -1, which sets the maximum age to infinite. When the value is set to infinite, EventBridge never discards old records.
*
* Leave undefined to set the maximum record age to infinite.
* Minumum = 60 (leave undefined to set the maximum age to -1)
* Maxiumum = 604800
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-maximumrecordageinseconds
* @default -1 (infinite)
* @default -1 - maximum age is infinite
*/
readonly maximumRecordAge?: Duration;

/**
* (Streams only) Discard records after the specified number of retries. The default value is -1, which sets the maximum number of retries to infinite. When MaximumRetryAttempts is infinite, EventBridge retries failed records until the record expires in the event source.
*
* Minumum = -1
* Maxiumum = 10000
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-maximumretryattempts
* @default -1 (infinite)
*/
readonly maximumRetryAttempts?: number;

/**
* (Streams only) Define how to handle item process failures. AUTOMATIC_BISECT halves each batch and retry each half until all the records are processed or there is one failed message left in the batch.
* (Streams only) Define how to handle item process failures. {@link OnPartialBatchItemFailure.AUTOMATIC_BISECT} halves each batch and retry each half until all the records are processed or there is one failed message left in the batch.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-onpartialbatchitemfailure
* @default off
*/
readonly onPartialBatchItemFailure?: OnPartialBatchItemFailure;

/**
* (Streams only) The number of batches to process concurrently from each shard. The default value is 1.
* (Streams only) The number of batches to process concurrently from each shard.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-parallelizationfactor
* @default 1
Expand All @@ -78,7 +89,7 @@ export interface KinesisSourceParameters {
* With StartingPosition set to AT_TIMESTAMP, the time from which to start reading, in Unix time seconds.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-startingpositiontimestamp
* @default no starting position timestamp
* @default - no starting position timestamp
*/
readonly startingPositionTimestamp?: string;
}
Expand All @@ -96,6 +107,8 @@ export class KinesisSource implements ISource {
private maximumRecordAgeInSeconds;
private maximumRetryAttempts;
private parallelizationFactor;
private deadLetterTarget;
private deadLetterTargetArn;

constructor(stream: IStream, parameters: KinesisSourceParameters) {
this.stream = stream;
Expand All @@ -107,6 +120,7 @@ export class KinesisSource implements ISource {
this.maximumRecordAgeInSeconds = this.sourceParameters.maximumRecordAge?.toSeconds();
this.maximumRetryAttempts = this.sourceParameters.maximumRetryAttempts;
this.parallelizationFactor = this.sourceParameters.parallelizationFactor;
this.deadLetterTarget = this.sourceParameters.deadLetterTarget;

if (this.batchSize !== undefined) {
if (this.batchSize < 1 || this.batchSize > 10000) {
Expand All @@ -121,8 +135,8 @@ export class KinesisSource implements ISource {
}
if (this.maximumRecordAgeInSeconds !== undefined) {
// only need to check upper bound since Duration amounts cannot be negative
if (this.maximumRecordAgeInSeconds > 604800) {
throw new Error(`Maximum record age in seconds must be between -1 and 604800, received ${this.maximumRecordAgeInSeconds}`);
if (this.maximumRecordAgeInSeconds < 60 || this.maximumRecordAgeInSeconds > 604800) {
throw new Error(`Maximum record age in seconds must be between 60 and 604800 (leave undefined for infinite), received ${this.maximumRecordAgeInSeconds}`);
}
}
if (this.maximumRetryAttempts !== undefined) {
Expand All @@ -135,14 +149,20 @@ export class KinesisSource implements ISource {
throw new Error(`Parallelization factor must be between 1 and 10, received ${this.parallelizationFactor}`);
}
}

if (this.deadLetterTarget instanceof Queue) {
this.deadLetterTargetArn = this.deadLetterTarget.queueArn;
} else if (this.deadLetterTarget instanceof Topic) {
this.deadLetterTargetArn = this.deadLetterTarget.topicArn;
}
}

bind(_pipe: IPipe): SourceConfig {
return {
sourceParameters: {
kinesisStreamParameters: {
batchSize: this.batchSize,
deadLetterConfig: this.sourceParameters.deadLetterConfig,
deadLetterConfig: this.deadLetterTargetArn ? { arn: this.deadLetterTargetArn } : undefined,
maximumBatchingWindowInSeconds: this.maximumBatchingWindowInSeconds,
maximumRecordAgeInSeconds: this.maximumRecordAgeInSeconds,
maximumRetryAttempts: this.maximumRetryAttempts,
Expand All @@ -158,4 +178,12 @@ export class KinesisSource implements ISource {
grantRead(grantee: IRole): void {
this.stream.grantRead(grantee);
}

grantDlqPush(grantee: IRole): void {
if (this.deadLetterTarget instanceof Queue) {
this.deadLetterTarget.grantSendMessages(grantee);
} else if (this.deadLetterTarget instanceof Topic) {
this.deadLetterTarget.grantPublish(grantee);
}
}
}
Loading

0 comments on commit f9484f8

Please sign in to comment.