Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aws-stepfunctions-tasks): allow specifying waitForTaskToken suffix in resourceArn #2686

Merged
merged 9 commits into from
Jun 18, 2019
31 changes: 29 additions & 2 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,53 @@ import iam = require('@aws-cdk/aws-iam');
import lambda = require('@aws-cdk/aws-lambda');
import sfn = require('@aws-cdk/aws-stepfunctions');

/**
* Properties for InvokeFunction
*/
export interface InvokeFunctionProps {
/**
* The JSON that you want to provide to your Lambda function as input.
*/
readonly payload?: { [key: string]: string };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this should be "string => any"


/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
* A StepFunctions Task to invoke a Lambda function.
*
* A Function can be used directly as a Resource, but this class mirrors
* integration with other AWS services via a specific class instance.
*/
export class InvokeFunction implements sfn.IStepFunctionsTask {
constructor(private readonly lambdaFunction: lambda.IFunction) {

private readonly waitForTaskToken: boolean;

constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

props should be optional (add = { })

this.waitForTaskToken = props.waitForTaskToken === true;
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskProperties {
return {
resourceArn: this.lambdaFunction.functionArn,
resourceArn: this.waitForTaskToken
albegali marked this conversation as resolved.
Show resolved Hide resolved
? 'arn:aws:states:::lambda:invoke.waitForTaskToken'
: this.lambdaFunction.functionArn,
policyStatements: [new iam.PolicyStatement()
.addResource(this.lambdaFunction.functionArn)
.addActions("lambda:InvokeFunction")
],
metricPrefixSingular: 'LambdaFunction',
metricPrefixPlural: 'LambdaFunctions',
metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn },
parameters: {
FunctionName: this.lambdaFunction.functionName,
...this.props.payload && { Payload: this.props.payload },
eladb marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think Parameters.Payload will work in case of a task invocation. Will it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also like an additional validation that if waitForTaskToken is used, the magic token substitution field is used somewhere in the payload. If not, that should be an error. For all classes that implement this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, if props.waitForTaskToken is true, I have to validate props.payload in order to check if something like "token.$":"$$.Task.Token" is present. Did I get it right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I would like you to make a new function on JsonPath (something like JsonPath.taskToken(), similar to the other functions), and make sure one of the fields in the payload structure contains the value returned. Be aware that the payload could be a deep structure, so you will need to recurse into it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I will do the start of this. There is some work I want done in the SFN module anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you're going to need is in #2706

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That other PR has been merged. You may continue :).

Please also have a look at what @wqzoww has written.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @rix0rrr only one thing: may I ask you how to use FieldUtils.containsTaskToken to validate the payload in order to check if the token is present?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect you to call it on the payload structure in the constructor, and throw an error if the function returns false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, i've done it in the latest commit. But tests (unit and integ) are failing now, could you take a look please?

}
};
}
}
18 changes: 15 additions & 3 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ export interface PublishToTopicProps {
* Message subject
*/
readonly subject?: string;

/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
Expand All @@ -47,24 +54,29 @@ export interface PublishToTopicProps {
* integration with other AWS services via a specific class instance.
*/
export class PublishToTopic implements sfn.IStepFunctionsTask {

private readonly waitForTaskToken: boolean;

constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) {
this.waitForTaskToken = props.waitForTaskToken === true;

if ((props.message === undefined) === (props.messageObject === undefined)) {
throw new Error(`Supply exactly one of 'message' or 'messageObject'`);
}
}

public bind(task: sfn.Task): sfn.StepFunctionsTaskProperties {
return {
resourceArn: 'arn:aws:states:::sns:publish',
resourceArn: 'arn:aws:states:::sns:publish' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
policyStatements: [new iam.PolicyStatement()
.addAction('sns:Publish')
.addResource(this.topic.topicArn)
],
parameters: {
TopicArn: this.topic.topicArn,
...(this.props.messageObject
? { Message: new cdk.Token(() => task.node.stringifyJson(this.props.messageObject)) }
: renderString('Message', this.props.message)),
? { Message: new cdk.Token(() => task.node.stringifyJson(this.props.messageObject)) }
: renderString('Message', this.props.message)),
MessageStructure: this.props.messagePerSubscriptionType ? "json" : undefined,
...renderString('Subject', this.props.subject),
}
Expand Down
13 changes: 12 additions & 1 deletion packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ export interface SendToQueueProps {
* @default No group ID
*/
readonly messageGroupId?: string;

/**
* Whether to pause the workflow until a task token is returned
*
* @default false
*/
readonly waitForTaskToken?: boolean;
}

/**
Expand All @@ -47,12 +54,16 @@ export interface SendToQueueProps {
* integration with other AWS services via a specific class instance.
*/
export class SendToQueue implements sfn.IStepFunctionsTask {

private readonly waitForTaskToken: boolean;

constructor(private readonly queue: sqs.IQueue, private readonly props: SendToQueueProps) {
this.waitForTaskToken = props.waitForTaskToken === true;
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskProperties {
return {
resourceArn: 'arn:aws:states:::sqs:sendMessage',
resourceArn: 'arn:aws:states:::sqs:sendMessage' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
policyStatements: [new iam.PolicyStatement()
.addAction('sqs:SendMessage')
.addResource(this.queue.queueArn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,33 @@ test('Lambda function can be used in a Task', () => {
handler: 'index.hello',
runtime: lambda.Runtime.Python27,
});
const task = new sfn.Task(stack, 'Task', { task: new tasks.InvokeFunction(fn) });
const task = new sfn.Task(stack, 'Task', {
task: new tasks.InvokeFunction(fn, { waitForTaskToken: false })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually try not to change existing tests, but rather add new tests. That's a good way to ensure that we didn't break anything. Since waitForTaskToken is false by default, I'd expect this test to remain untouched throughout this change and if it is changed, we need to understand if the change makes sense.

});
new sfn.StateMachine(stack, 'SM', {
definition: task
});

// THEN
expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"",
{ "Fn::GetAtt": ["Fn9270CBC0", "Arn"] },
"\"}}}"
]]
"Fn::Join": [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert

"",
[
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"",
{
Ref: "Fn9270CBC0"
},
"\"},\"Type\":\"Task\",\"Resource\":\"",
{
"Fn::GetAtt": [
"Fn9270CBC0",
"Arn"
]
},
"\"}}}"
]
]
},
});
});