Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aws-stepfunctions-tasks): allow specifying waitForTaskToken suffix in resourceArn #2686

Merged
merged 9 commits into from
Jun 18, 2019
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './invoke-function';
export * from './run-lambda-task';
export * from './invoke-activity';
export * from './run-ecs-task-base'; // Remove this once we can
export * from './run-ecs-task-base-types';
Expand Down
20 changes: 16 additions & 4 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@ import iam = require('@aws-cdk/aws-iam');
import lambda = require('@aws-cdk/aws-lambda');
import sfn = require('@aws-cdk/aws-stepfunctions');

/**
* Properties for InvokeFunction
*/
export interface InvokeFunctionProps {
/**
* The JSON that you want to provide to your Lambda function as input.
*
* @default - The JSON data indicated by the task's InputPath is used as payload
*/
readonly payload?: { [key: string]: any };
}

/**
* A StepFunctions Task to invoke a Lambda function.
*
* A Function can be used directly as a Resource, but this class mirrors
* integration with other AWS services via a specific class instance.
* OUTPUT: the output of this task is the return value of the Lambda Function.
*/
export class InvokeFunction implements sfn.IStepFunctionsTask {
constructor(private readonly lambdaFunction: lambda.IFunction) {
constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps = {}) {
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
Expand All @@ -22,6 +33,7 @@ export class InvokeFunction implements sfn.IStepFunctionsTask {
metricPrefixSingular: 'LambdaFunction',
metricPrefixPlural: 'LambdaFunctions',
metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn },
parameters: this.props.payload
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ export interface PublishToTopicProps {
* Message subject
*/
readonly subject?: string;

/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
Expand All @@ -36,12 +43,20 @@ export interface PublishToTopicProps {
* integration with other AWS services via a specific class instance.
*/
export class PublishToTopic implements sfn.IStepFunctionsTask {

private readonly waitForTaskToken: boolean;

constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) {
this.waitForTaskToken = props.waitForTaskToken === true;

if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.message.value)) {
throw new Error('Task Token is missing in message (pass Context.taskToken somewhere in message)');
}
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
return {
resourceArn: 'arn:aws:states:::sns:publish',
resourceArn: 'arn:aws:states:::sns:publish' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
policyStatements: [new iam.PolicyStatement({
actions: ['sns:Publish'],
resources: [this.topic.topicArn]
Expand Down
101 changes: 101 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import iam = require('@aws-cdk/aws-iam');
import lambda = require('@aws-cdk/aws-lambda');
import sfn = require('@aws-cdk/aws-stepfunctions');
import { FieldUtils } from '../../aws-stepfunctions/lib/fields';

/**
* Properties for RunLambdaTask
*/
export interface RunLambdaTaskProps {
/**
* The JSON that you want to provide to your Lambda function as input.
*/
readonly payload?: { [key: string]: any };

/**
* Whether to pause the workflow until a task token is returned
*
* If this is set to true, the Context.taskToken value must be included
* somewhere in the payload and the Lambda must call
* `SendTaskSuccess/SendTaskFailure` using that token.
*
* @default false
*/
readonly waitForTaskToken?: boolean;

/**
* Invocation type of the Lambda function
*
* @default RequestResponse
*/
readonly invocationType?: InvocationType;

/**
* Client context to pass to the function
*
* @default - No context
*/
readonly clientContext?: string;
}

/**
* Invoke a Lambda function as a Task
*
* OUTPUT: the output of this task is either the return value of Lambda's
* Invoke call, or whatever the Lambda Function posted back using
* `SendTaskSuccess/SendTaskFailure` in `waitForTaskToken` mode.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html
*/
export class RunLambdaTask implements sfn.IStepFunctionsTask {
private readonly waitForTaskToken: boolean;

constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: RunLambdaTaskProps = {}) {
this.waitForTaskToken = !!props.waitForTaskToken;

if (this.waitForTaskToken && !FieldUtils.containsTaskToken(props.payload)) {
throw new Error('Task Token is missing in payload (pass Context.taskToken somewhere in payload)');
}
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
const resourceArn = 'arn:aws:states:::lambda:invoke' + (this.waitForTaskToken ? '.waitForTaskToken' : '');

return {
resourceArn,
policyStatements: [new iam.PolicyStatement({
resources: [this.lambdaFunction.functionArn],
actions: ["lambda:InvokeFunction"],
})],
metricPrefixSingular: 'LambdaFunction',
metricPrefixPlural: 'LambdaFunctions',
metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn },
parameters: {
FunctionName: this.lambdaFunction.functionName,
Payload: this.props.payload,
InvocationType: this.props.invocationType,
ClientContext: this.props.clientContext,
}
};
}
}

/**
* Invocation type of a Lambda
*/
export enum InvocationType {
/**
* Invoke synchronously
*
* The API response includes the function response and additional data.
*/
RequestResponse = 'RequestResponse',

/**
* Invoke asynchronously
*
* Send events that fail multiple times to the function's dead-letter queue (if it's configured).
* The API response only includes a status code.
*/
Event = 'Event',
}
17 changes: 16 additions & 1 deletion packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ export interface SendToQueueProps {
* @default No group ID
*/
readonly messageGroupId?: string;

/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
Expand All @@ -45,12 +52,20 @@ export interface SendToQueueProps {
* integration with other AWS services via a specific class instance.
*/
export class SendToQueue implements sfn.IStepFunctionsTask {

private readonly waitForTaskToken: boolean;

constructor(private readonly queue: sqs.IQueue, private readonly props: SendToQueueProps) {
this.waitForTaskToken = props.waitForTaskToken === true;

if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.messageBody.value)) {
throw new Error('Task Token is missing in messageBody (pass Context.taskToken somewhere in messageBody)');
}
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
return {
resourceArn: 'arn:aws:states:::sqs:sendMessage',
resourceArn: 'arn:aws:states:::sqs:sendMessage' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
policyStatements: [new iam.PolicyStatement({
actions: ['sqs:SendMessage'],
resources: [this.queue.queueArn]
Expand Down
Loading