diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/call-dynamodb.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/call-dynamodb.ts new file mode 100644 index 0000000000000..9d6cb6e1f0f78 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/call-dynamodb.ts @@ -0,0 +1,789 @@ +import * as iam from '@aws-cdk/aws-iam'; +import * as sfn from '@aws-cdk/aws-stepfunctions'; +import { Stack } from '@aws-cdk/core'; +import { getResourceArn } from './resource-arn-suffix'; + +/** + * Determines the level of detail about provisioned throughput consumption that is returned. + */ +export enum ReturnConsumedCapacity { + /** + * The response includes the aggregate ConsumedCapacity for the operation, + * together with ConsumedCapacity for each table and secondary index that was accessed + */ + INDEXES = 'INDEXES', + + /** + * The response includes only the aggregate ConsumedCapacity for the operation. + */ + TOTAL = 'TOTAL', + + /** + * No ConsumedCapacity details are included in the response. + */ + NONE = 'NONE' +} + +/** + * Determines whether item collection metrics are returned. + */ +export enum ReturnItemCollectionMetrics { + /** + * If set to SIZE, the response includes statistics about item collections, + * if any, that were modified during the operation are returned in the response + */ + SIZE = 'SIZE', + + /** + * If set to NONE, no statistics are returned. + */ + NONE = 'NONE' +} + +/** + * Use ReturnValues if you want to get the item attributes as they appear before or after they are changed + */ +export enum ReturnValues { + /** + * Nothing is returned + */ + NONE = 'NONE', + + /** + * Returns all of the attributes of the item + */ + ALL_OLD = 'ALL_OLD', + + /** + * Returns only the updated attributes + */ + UPDATED_OLD = 'UPDATED_OLD', + + /** + * Returns all of the attributes of the item + */ + ALL_NEW = 'ALL_NEW', + + /** + * Returns only the updated attributes + */ + UPDATED_NEW = 'UPDATED_NEW' +} + +/** + * Map of string to AttributeValue + */ +export interface AttributeValueMap { + [key: string]: AttributeValue; +} + +/** + * Class to generate AttributeValue + */ +export class AttributeValue { + private attributeValue: any = {}; + + /** + * Sets an attribute of type String. For example: "S": "Hello" + */ + public addS(value: string) { + this.attributeValue.S = value; + return this; + } + + /** + * Sets an attribute of type Number. For example: "N": "123.45" + * Numbers are sent across the network to DynamoDB as strings, + * to maximize compatibility across languages and libraries. + * However, DynamoDB treats them as number type attributes for mathematical operations. + */ + public addN(value: string) { + this.attributeValue.N = value; + return this; + } + + /** + * Sets an attribute of type Binary. For example: "B": "dGhpcyB0ZXh0IGlzIGJhc2U2NC1lbmNvZGVk" + */ + public addB(value: string) { + this.attributeValue.B = value; + return this; + } + + /** + * Sets an attribute of type String Set. For example: "SS": ["Giraffe", "Hippo" ,"Zebra"] + */ + public addSS(value: string[]) { + this.attributeValue.SS = value; + return this; + } + + /** + * Sets an attribute of type Number Set. For example: "NS": ["42.2", "-19", "7.5", "3.14"] + * Numbers are sent across the network to DynamoDB as strings, + * to maximize compatibility across languages and libraries. + * However, DynamoDB treats them as number type attributes for mathematical operations. + */ + public addNS(value: string[]) { + this.attributeValue.NS = value; + return this; + } + + /** + * Sets an attribute of type Binary Set. For example: "BS": ["U3Vubnk=", "UmFpbnk=", "U25vd3k="] + */ + public addBS(value: string[]) { + this.attributeValue.BS = value; + return this; + } + + /** + * Sets an attribute of type Map. For example: "M": {"Name": {"S": "Joe"}, "Age": {"N": "35"}} + */ + public addM(value: AttributeValueMap) { + this.attributeValue.M = transformAttributeValueMap(value); + return this; + } + + /** + * Sets an attribute of type List. For example: "L": [ {"S": "Cookies"} , {"S": "Coffee"}, {"N", "3.14159"}] + */ + public addL(value: AttributeValue[]) { + this.attributeValue.L = value.map(val => val.toObject()); + return this; + } + + /** + * Sets an attribute of type Null. For example: "NULL": true + */ + public addNULL(value: boolean) { + this.attributeValue.NULL = value; + return this; + } + + /** + * Sets an attribute of type Boolean. For example: "BOOL": true + */ + public addBOOL(value: boolean) { + this.attributeValue.BOOL = value; + return this; + } + + /** + * Return the attributeValue object + */ + public toObject() { + return this.attributeValue; + } +} + +/** + * Property for any key + */ +export interface Attribute { + /** + * The name of the attribute + */ + readonly name: string; + + /** + * The value of the attribute + */ + readonly value: AttributeValue; +} + +/** + * Class to generate projection expression + */ +export class Expression { + private expression: string[] = []; + + /** + * Adds the passed attribute to the chain + * + * @param attr Attribute name + */ + public withAttribute(attr: string): Expression { + if (this.expression.length) { + this.expression.push(`.${attr}`); + } else { + this.expression.push(attr); + } + return this; + } + + /** + * Adds the array literal access for passed index + * + * @param index array index + */ + public atIndex(index: number): Expression { + if (!this.expression.length) { + throw new Error('Expression must start with an attribute'); + } + + this.expression.push(`[${index}]`); + return this; + } + + /** + * converts and return the string expression + */ + public toString(): string { + return this.expression.join(''); + } +} + +/** + * Properties for DynamoGetItem Task + */ +export interface DynamoGetItemProps { + /** + * A attribute representing the partition key of the item to retrieve. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html#DDB-GetItem-request-Key + */ + readonly partitionKey: Attribute; + + /** + * The name of the table containing the requested item. + */ + readonly tableName: string; + + /** + * A attribute representing the sort key of the item to retrieve. + * + * @default - No sort key + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html#DDB-GetItem-request-Key + */ + readonly sortKey?: Attribute; + + /** + * Determines the read consistency model: + * If set to true, then the operation uses strongly consistent reads; + * otherwise, the operation uses eventually consistent reads. + * + * @default false + */ + readonly consistentRead?: boolean; + + /** + * One or more substitution tokens for attribute names in an expression + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html#DDB-GetItem-request-ExpressionAttributeNames + * + * @default - No expression attributes + */ + readonly expressionAttributeNames?: { [key: string]: string }; + + /** + * An array of Expression that identifies one or more attributes to retrieve from the table. + * These attributes can include scalars, sets, or elements of a JSON document. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html#DDB-GetItem-request-ProjectionExpression + * + * @default - No projection expression + */ + readonly projectionExpression?: Expression[]; + + /** + * Determines the level of detail about provisioned throughput consumption that is returned in the response + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html#DDB-GetItem-request-ReturnConsumedCapacity + * + * @default - No returnConsumedCapacity + */ + readonly returnConsumedCapacity?: ReturnConsumedCapacity; +} + +/** + * Properties for DynamoPutItem Task + */ +export interface DynamoPutItemProps { + /** + * A map of attribute name/value pairs, one for each attribute. + * Only the primary key attributes are required; + * you can optionally provide other attribute name-value pairs for the item. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html#DDB-PutItem-request-Item + */ + readonly item: AttributeValueMap; + + /** + * The name of the table containing the requested item. + */ + readonly tableName: string; + + /** + * A condition that must be satisfied in order for a conditional PutItem operation to succeed. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html#DDB-PutItem-request-ConditionExpression + * + * @default - No condition expression + */ + readonly conditionExpression?: string; + + /** + * One or more substitution tokens for attribute names in an expression + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html#DDB-PutItem-request-ExpressionAttributeNames + * + * @default - No expression attribute names + */ + readonly expressionAttributeNames?: { [key: string]: string }; + + /** + * One or more values that can be substituted in an expression. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html#DDB-PutItem-request-ExpressionAttributeValues + * + * @default - No expression attribute values + */ + readonly expressionAttributeValues?: AttributeValueMap; + + /** + * Determines the level of detail about provisioned throughput consumption that is returned in the response + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html#DDB-PutItem-request-ReturnConsumedCapacity + * + * @default - No returnConsumedCapacity + */ + readonly returnConsumedCapacity?: ReturnConsumedCapacity; + + /** + * Determines whether item collection metrics are returned. + * If set to SIZE, the response includes statistics about item collections, if any, + * that were modified during the operation are returned in the response. + * If set to NONE (the default), no statistics are returned. + * + * @default - No returnItemCollectionMetrics + */ + readonly returnItemCollectionMetrics?: ReturnItemCollectionMetrics; + + /** + * Use ReturnValues if you want to get the item attributes as they appeared + * before they were updated with the PutItem request. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html#DDB-PutItem-request-ReturnValues + * + * @default - No returnValues + */ + readonly returnValues?: ReturnValues; +} + +/** + * Properties for DynamoDeleteItem Task + */ +export interface DynamoDeleteItemProps { + /** + * An attribute representing the partition key of the item to delete. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html#DDB-DeleteItem-request-Key + */ + readonly partitionKey: Attribute; + + /** + * The name of the table containing the requested item. + */ + readonly tableName: string; + + /** + * An attribute representing the sort key of the item to delete. + * + * @default - No sort key + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html#DDB-DeleteItem-request-Key + */ + readonly sortKey?: Attribute; + + /** + * A condition that must be satisfied in order for a conditional DeleteItem to succeed. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html#DDB-DeleteItem-request-ConditionExpression + * + * @default - No condition expression + */ + readonly conditionExpression?: string; + + /** + * One or more substitution tokens for attribute names in an expression + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html#DDB-DeleteItem-request-ExpressionAttributeNames + * + * @default - No expression attribute names + */ + readonly expressionAttributeNames?: { [key: string]: string }; + + /** + * One or more values that can be substituted in an expression. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html#DDB-DeleteItem-request-ExpressionAttributeValues + * + * @default - No expression attribute values + */ + readonly expressionAttributeValues?: AttributeValueMap; + + /** + * Determines the level of detail about provisioned throughput consumption that is returned in the response + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html#DDB-DeleteItem-request-ReturnConsumedCapacity + * + * @default - No returnConsumedCapacity + */ + readonly returnConsumedCapacity?: ReturnConsumedCapacity; + + /** + * Determines whether item collection metrics are returned. + * If set to SIZE, the response includes statistics about item collections, if any, + * that were modified during the operation are returned in the response. + * If set to NONE (the default), no statistics are returned. + * + * @default - No returnItemCollectionMetrics + */ + readonly returnItemCollectionMetrics?: ReturnItemCollectionMetrics; + + /** + * Use ReturnValues if you want to get the item attributes as they appeared before they were deleted. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html#DDB-DeleteItem-request-ReturnValues + * + * @default - No returnValues + */ + readonly returnValues?: ReturnValues; +} + +/** + * Properties for DynamoUpdateItem Task + */ +export interface DynamoUpdateItemProps { + /** + * The partition key of the item to be updated. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html#DDB-UpdateItem-request-Key + */ + readonly partitionKey: Attribute; + + /** + * The name of the table containing the requested item. + */ + readonly tableName: string; + + /** + * The sort key of the item to be updated. + * + * @default - No sort key + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html#DDB-UpdateItem-request-Key + */ + readonly sortKey?: Attribute; + + /** + * A condition that must be satisfied in order for a conditional DeleteItem to succeed. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html#DDB-UpdateItem-request-ConditionExpression + * + * @default - No condition expression + */ + readonly conditionExpression?: string; + + /** + * One or more substitution tokens for attribute names in an expression + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html#DDB-UpdateItem-request-ExpressionAttributeNames + * + * @default - No expression attribute names + */ + readonly expressionAttributeNames?: { [key: string]: string }; + + /** + * One or more values that can be substituted in an expression. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html#DDB-UpdateItem-request-ExpressionAttributeValues + * + * @default - No expression attribute values + */ + readonly expressionAttributeValues?: AttributeValueMap; + + /** + * Determines the level of detail about provisioned throughput consumption that is returned in the response + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html#DDB-UpdateItem-request-ReturnConsumedCapacity + * + * @default - No returnConsumedCapacity + */ + readonly returnConsumedCapacity?: ReturnConsumedCapacity; + + /** + * Determines whether item collection metrics are returned. + * If set to SIZE, the response includes statistics about item collections, if any, + * that were modified during the operation are returned in the response. + * If set to NONE (the default), no statistics are returned. + * + * @default - No returnItemCollectionMetrics + */ + readonly returnItemCollectionMetrics?: ReturnItemCollectionMetrics; + + /** + * Use ReturnValues if you want to get the item attributes as they appeared before they were deleted. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html#DDB-UpdateItem-request-ReturnValues + * + * @default - No returnValues + */ + readonly returnValues?: ReturnValues; + + /** + * An expression that defines one or more attributes to be updated, + * the action to be performed on them, and new values for them. + * + * @see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html#DDB-UpdateItem-request-UpdateExpression + * + * @default - No update expression + */ + readonly updateExpression?: string; +} + +/** + * A StepFunctions task to call DynamoGetItem + */ +export class DynamoGetItem implements sfn.IStepFunctionsTask { + constructor(private readonly props: DynamoGetItemProps) { + validateTableName(props.tableName); + } + + public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { + return { + resourceArn: getDynamoResourceArn(DynamoMethod.GET), + policyStatements: getDynamoPolicyStatements( + _task, + this.props.tableName, + DynamoMethod.GET + ), + parameters: { + Key: configurePrimaryKey(this.props.partitionKey, this.props.sortKey), + TableName: this.props.tableName, + ConsistentRead: this.props.consistentRead ?? false, + ExpressionAttributeNames: this.props.expressionAttributeNames, + ProjectionExpression: this.configureProjectionExpression( + this.props.projectionExpression + ), + ReturnConsumedCapacity: this.props.returnConsumedCapacity + } + }; + } + + private configureProjectionExpression( + expressions?: Expression[] + ): string | undefined { + return expressions + ? expressions.map(expression => expression.toString()).join(',') + : undefined; + } +} + +/** + * A StepFunctions task to call DynamoPutItem + */ +export class DynamoPutItem implements sfn.IStepFunctionsTask { + constructor(private readonly props: DynamoPutItemProps) { + validateTableName(props.tableName); + } + + public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { + return { + resourceArn: getDynamoResourceArn(DynamoMethod.PUT), + policyStatements: getDynamoPolicyStatements( + _task, + this.props.tableName, + DynamoMethod.PUT + ), + parameters: { + Item: transformAttributeValueMap(this.props.item), + TableName: this.props.tableName, + ConditionExpression: this.props.conditionExpression, + ExpressionAttributeNames: this.props.expressionAttributeNames, + ExpressionAttributeValues: transformAttributeValueMap( + this.props.expressionAttributeValues + ), + ReturnConsumedCapacity: this.props.returnConsumedCapacity, + ReturnItemCollectionMetrics: this.props.returnItemCollectionMetrics, + ReturnValues: this.props.returnValues + } + }; + } +} + +/** + * A StepFunctions task to call DynamoDeleteItem + */ +export class DynamoDeleteItem implements sfn.IStepFunctionsTask { + constructor(private readonly props: DynamoDeleteItemProps) { + validateTableName(props.tableName); + } + + public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { + return { + resourceArn: getDynamoResourceArn(DynamoMethod.DELETE), + policyStatements: getDynamoPolicyStatements( + _task, + this.props.tableName, + DynamoMethod.DELETE + ), + parameters: { + Key: configurePrimaryKey(this.props.partitionKey, this.props.sortKey), + TableName: this.props.tableName, + ConditionExpression: this.props.conditionExpression, + ExpressionAttributeNames: this.props.expressionAttributeNames, + ExpressionAttributeValues: transformAttributeValueMap( + this.props.expressionAttributeValues + ), + ReturnConsumedCapacity: this.props.returnConsumedCapacity, + ReturnItemCollectionMetrics: this.props.returnItemCollectionMetrics, + ReturnValues: this.props.returnValues + } + }; + } +} + +/** + * A StepFunctions task to call DynamoUpdateItem + */ +export class DynamoUpdateItem implements sfn.IStepFunctionsTask { + constructor(private readonly props: DynamoUpdateItemProps) { + validateTableName(props.tableName); + } + + public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { + return { + resourceArn: getDynamoResourceArn(DynamoMethod.UPDATE), + policyStatements: getDynamoPolicyStatements( + _task, + this.props.tableName, + DynamoMethod.UPDATE + ), + parameters: { + Key: configurePrimaryKey(this.props.partitionKey, this.props.sortKey), + TableName: this.props.tableName, + ConditionExpression: this.props.conditionExpression, + ExpressionAttributeNames: this.props.expressionAttributeNames, + ExpressionAttributeValues: transformAttributeValueMap( + this.props.expressionAttributeValues + ), + ReturnConsumedCapacity: this.props.returnConsumedCapacity, + ReturnItemCollectionMetrics: this.props.returnItemCollectionMetrics, + ReturnValues: this.props.returnValues, + UpdateExpression: this.props.updateExpression + } + }; + } +} + +/** + * A helper wrapper class to call all DynamoDB APIs + */ +export class CallDynamoDB { + /** + * Method to get DynamoGetItem task + * + * @param props DynamoGetItemProps + */ + public static getItem(props: DynamoGetItemProps) { + return new DynamoGetItem(props); + } + + /** + * Method to get DynamoPutItem task + * + * @param props DynamoPutItemProps + */ + public static putItem(props: DynamoPutItemProps) { + return new DynamoPutItem(props); + } + + /** + * Method to get DynamoDeleteItem task + * + * @param props DynamoDeleteItemProps + */ + public static deleteItem(props: DynamoDeleteItemProps) { + return new DynamoDeleteItem(props); + } + + /** + * Method to get DynamoUpdateItem task + * + * @param props DynamoUpdateItemProps + */ + public static updateItem(props: DynamoUpdateItemProps) { + return new DynamoUpdateItem(props); + } +} + +enum DynamoMethod { + GET = 'Get', + PUT = 'Put', + DELETE = 'Delete', + UPDATE = 'Update' +} + +function validateTableName(tableName: string) { + if ( + tableName.length < 3 || + tableName.length > 255 || + !new RegExp(/[a-zA-Z0-9_.-]+$/).test(tableName) + ) { + throw new Error( + 'TableName should not contain alphanumeric characters and should be between 3-255 characters long.' + ); + } +} + +function getDynamoResourceArn(method: DynamoMethod) { + return getResourceArn( + 'dynamodb', + `${method.toLowerCase()}Item`, + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET + ); +} + +function getDynamoPolicyStatements( + task: sfn.Task, + tableName: string, + method: DynamoMethod +) { + return [ + new iam.PolicyStatement({ + resources: [ + Stack.of(task).formatArn({ + service: 'dynamodb', + resource: 'table', + resourceName: tableName + }) + ], + actions: [`dynamodb:${method}Item`] + }) + ]; +} + +function configurePrimaryKey(partitionKey: Attribute, sortKey?: Attribute) { + const key = { + [partitionKey.name]: partitionKey.value.toObject() + }; + + if (sortKey) { + key[sortKey.name] = sortKey.value.toObject(); + } + + return key; +} + +function transformAttributeValueMap(attrMap?: AttributeValueMap) { + const transformedValue: any = {}; + for (const key in attrMap) { + if (key) { + transformedValue[key] = attrMap[key].toObject(); + } + } + return attrMap ? transformedValue : undefined; +} diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts index 2057acaf632e8..7ee9c418a1b89 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts @@ -21,3 +21,4 @@ export * from './emr-modify-instance-fleet-by-name'; export * from './emr-modify-instance-group-by-name'; export * from './run-glue-job-task'; export * from './run-batch-job'; +export * from './call-dynamodb'; diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/call-dynamodb.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/call-dynamodb.test.ts new file mode 100644 index 0000000000000..6076b6a50ffba --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/call-dynamodb.test.ts @@ -0,0 +1,261 @@ +import * as sfn from '@aws-cdk/aws-stepfunctions'; +import * as cdk from '@aws-cdk/core'; +import * as tasks from '../lib'; + +let stack: cdk.Stack; +const TABLE_NAME = 'SOME_TABLE'; + +beforeEach(() => { + // GIVEN + stack = new cdk.Stack(); +}); + +test('GetItem task', () => { + // WHEN + const task = new sfn.Task(stack, 'GetItem', { + task: tasks.CallDynamoDB.getItem({ + partitionKey: { + name: 'SOME_KEY', + value: new tasks.AttributeValue().addS('1234') + }, + sortKey: { + name: 'OTHER_KEY', + value: new tasks.AttributeValue().addN('4321') + }, + tableName: TABLE_NAME, + consistentRead: true, + expressionAttributeNames: { OTHER_KEY: '#OK' }, + projectionExpression: [ + new tasks.Expression() + .withAttribute('Messages') + .atIndex(1) + .withAttribute('Tags'), + new tasks.Expression().withAttribute('ID') + ], + returnConsumedCapacity: tasks.ReturnConsumedCapacity.TOTAL + }) + }); + + // THEN + expect(stack.resolve(task.toStateJson())).toEqual({ + Type: 'Task', + Resource: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition' + }, + ':states:::dynamodb:getItem' + ] + ] + }, + End: true, + Parameters: { + Key: { SOME_KEY: { S: '1234' }, OTHER_KEY: { N: '4321' } }, + TableName: TABLE_NAME, + ConsistentRead: true, + ExpressionAttributeNames: { OTHER_KEY: '#OK' }, + ProjectionExpression: 'Messages[1].Tags,ID', + ReturnConsumedCapacity: 'TOTAL' + } + }); +}); + +test('PutItem task', () => { + // WHEN + const task = new sfn.Task(stack, 'PutItem', { + task: tasks.CallDynamoDB.putItem({ + item: { SOME_KEY: new tasks.AttributeValue().addS('1234') }, + tableName: TABLE_NAME, + conditionExpression: 'ForumName <> :f and Subject <> :s', + expressionAttributeNames: { OTHER_KEY: '#OK' }, + expressionAttributeValues: { + ':val': new tasks.AttributeValue().addN( + sfn.Data.stringAt('$.Item.TotalCount.N') + ) + }, + returnConsumedCapacity: tasks.ReturnConsumedCapacity.TOTAL, + returnItemCollectionMetrics: tasks.ReturnItemCollectionMetrics.SIZE, + returnValues: tasks.ReturnValues.ALL_NEW + }) + }); + + // THEN + expect(stack.resolve(task.toStateJson())).toEqual({ + Type: 'Task', + Resource: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition' + }, + ':states:::dynamodb:putItem' + ] + ] + }, + End: true, + Parameters: { + Item: { SOME_KEY: { S: '1234' } }, + TableName: TABLE_NAME, + ConditionExpression: 'ForumName <> :f and Subject <> :s', + ExpressionAttributeNames: { OTHER_KEY: '#OK' }, + ExpressionAttributeValues: { ':val': { 'N.$': '$.Item.TotalCount.N' } }, + ReturnConsumedCapacity: 'TOTAL', + ReturnItemCollectionMetrics: 'SIZE', + ReturnValues: 'ALL_NEW' + } + }); +}); + +test('DeleteItem task', () => { + // WHEN + const task = new sfn.Task(stack, 'DeleteItem', { + task: tasks.CallDynamoDB.deleteItem({ + partitionKey: { + name: 'SOME_KEY', + value: new tasks.AttributeValue().addS('1234') + }, + tableName: TABLE_NAME, + conditionExpression: 'ForumName <> :f and Subject <> :s', + expressionAttributeNames: { OTHER_KEY: '#OK' }, + expressionAttributeValues: { + ':val': new tasks.AttributeValue().addN( + sfn.Data.stringAt('$.Item.TotalCount.N') + ) + }, + returnConsumedCapacity: tasks.ReturnConsumedCapacity.TOTAL, + returnItemCollectionMetrics: tasks.ReturnItemCollectionMetrics.SIZE, + returnValues: tasks.ReturnValues.ALL_NEW + }) + }); + + // THEN + expect(stack.resolve(task.toStateJson())).toEqual({ + Type: 'Task', + Resource: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition' + }, + ':states:::dynamodb:deleteItem' + ] + ] + }, + End: true, + Parameters: { + Key: { SOME_KEY: { S: '1234' } }, + TableName: TABLE_NAME, + ConditionExpression: 'ForumName <> :f and Subject <> :s', + ExpressionAttributeNames: { OTHER_KEY: '#OK' }, + ExpressionAttributeValues: { ':val': { 'N.$': '$.Item.TotalCount.N' } }, + ReturnConsumedCapacity: 'TOTAL', + ReturnItemCollectionMetrics: 'SIZE', + ReturnValues: 'ALL_NEW' + } + }); +}); + +test('UpdateItem task', () => { + // WHEN + const task = new sfn.Task(stack, 'UpdateItem', { + task: tasks.CallDynamoDB.updateItem({ + partitionKey: { + name: 'SOME_KEY', + value: new tasks.AttributeValue().addS('1234') + }, + tableName: TABLE_NAME, + conditionExpression: 'ForumName <> :f and Subject <> :s', + expressionAttributeNames: { OTHER_KEY: '#OK' }, + expressionAttributeValues: { + ':val': new tasks.AttributeValue().addN( + sfn.Data.stringAt('$.Item.TotalCount.N') + ) + }, + returnConsumedCapacity: tasks.ReturnConsumedCapacity.TOTAL, + returnItemCollectionMetrics: tasks.ReturnItemCollectionMetrics.SIZE, + returnValues: tasks.ReturnValues.ALL_NEW, + updateExpression: 'SET TotalCount = TotalCount + :val' + }) + }); + + // THEN + expect(stack.resolve(task.toStateJson())).toEqual({ + Type: 'Task', + Resource: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition' + }, + ':states:::dynamodb:updateItem' + ] + ] + }, + End: true, + Parameters: { + Key: { SOME_KEY: { S: '1234' } }, + TableName: TABLE_NAME, + ConditionExpression: 'ForumName <> :f and Subject <> :s', + ExpressionAttributeNames: { OTHER_KEY: '#OK' }, + ExpressionAttributeValues: { ':val': { 'N.$': '$.Item.TotalCount.N' } }, + ReturnConsumedCapacity: 'TOTAL', + ReturnItemCollectionMetrics: 'SIZE', + ReturnValues: 'ALL_NEW', + UpdateExpression: 'SET TotalCount = TotalCount + :val' + } + }); +}); + +test('Invalid value of TableName should throw', () => { + expect(() => { + new sfn.Task(stack, 'GetItem', { + task: tasks.CallDynamoDB.getItem({ + partitionKey: { + name: 'SOME_KEY', + value: new tasks.AttributeValue().addS('1234') + }, + tableName: 'ab' + }) + }); + }).toThrow( + /TableName should not contain alphanumeric characters and should be between 3-255 characters long./ + ); + + expect(() => { + new sfn.Task(stack, 'GetItem', { + task: tasks.CallDynamoDB.getItem({ + partitionKey: { + name: 'SOME_KEY', + value: new tasks.AttributeValue().addS('1234') + }, + tableName: + 'abU93s5MTZDv6TYLk3Q3BE3Hj3AMca3NOb5ypSNZv1JZIONg7p8L8LNxuAStavPxYZKcoG36KwXktkuFHf0jJvt7SKofEqwYHmmK0tNJSkGoPe3MofnB7IWu3V48HbrqNGZqW005CMmDHESQWf40JK8qK0CSQtM8Z64zqysB7SZZazDRm7kKr062RXQKL82nvTxnKxTPfCHiG2YJEhuFdUywHCTN2Rjinl3P7TpwyIuPWyYHm6nZodRKLMmWpgUftZ' + }) + }); + }).toThrow( + /TableName should not contain alphanumeric characters and should be between 3-255 characters long./ + ); + + expect(() => { + new sfn.Task(stack, 'GetItem', { + task: tasks.CallDynamoDB.getItem({ + partitionKey: { + name: 'SOME_KEY', + value: new tasks.AttributeValue().addS('1234') + }, + tableName: 'abcd@' + }) + }); + }).toThrow( + /TableName should not contain alphanumeric characters and should be between 3-255 characters long./ + ); +}); diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.call-dynamodb.expected.json b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.call-dynamodb.expected.json new file mode 100644 index 0000000000000..5bc3b566a2d42 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.call-dynamodb.expected.json @@ -0,0 +1,194 @@ +{ + "Resources": { + "StateMachineRoleB840431D": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": { + "Fn::Join": [ + "", + [ + "states.", + { + "Ref": "AWS::Region" + }, + ".amazonaws.com" + ] + ] + } + } + } + ], + "Version": "2012-10-17" + } + } + }, + "StateMachineRoleDefaultPolicyDF1E6607": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": "dynamodb:PutItem", + "Effect": "Allow", + "Resource": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":dynamodb:", + { + "Ref": "AWS::Region" + }, + ":", + { + "Ref": "AWS::AccountId" + }, + ":table/Messages" + ] + ] + } + }, + { + "Action": "dynamodb:GetItem", + "Effect": "Allow", + "Resource": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":dynamodb:", + { + "Ref": "AWS::Region" + }, + ":", + { + "Ref": "AWS::AccountId" + }, + ":table/Messages" + ] + ] + } + }, + { + "Action": "dynamodb:UpdateItem", + "Effect": "Allow", + "Resource": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":dynamodb:", + { + "Ref": "AWS::Region" + }, + ":", + { + "Ref": "AWS::AccountId" + }, + ":table/Messages" + ] + ] + } + }, + { + "Action": "dynamodb:DeleteItem", + "Effect": "Allow", + "Resource": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":dynamodb:", + { + "Ref": "AWS::Region" + }, + ":", + { + "Ref": "AWS::AccountId" + }, + ":table/Messages" + ] + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "StateMachineRoleDefaultPolicyDF1E6607", + "Roles": [ + { + "Ref": "StateMachineRoleB840431D" + } + ] + } + }, + "StateMachine2E01A3A5": { + "Type": "AWS::StepFunctions::StateMachine", + "Properties": { + "DefinitionString": { + "Fn::Join": [ + "", + [ + "{\"StartAt\":\"Start\",\"States\":{\"Start\":{\"Type\":\"Pass\",\"Result\":{\"bar\":\"SomeValue\"},\"Next\":\"PutItem\"},\"PutItem\":{\"Next\":\"GetItemAfterPut\",\"Parameters\":{\"Item\":{\"MessageId\":{\"S\":\"1234\"},\"Text\":{\"S.$\":\"$.bar\"},\"TotalCount\":{\"N\":\"18\"}},\"TableName\":\"Messages\"},\"Type\":\"Task\",\"Resource\":\"arn:", + { + "Ref": "AWS::Partition" + }, + ":states:::dynamodb:putItem\"},\"GetItemAfterPut\":{\"Next\":\"UpdateItem\",\"Parameters\":{\"Key\":{\"MessageId\":{\"S\":\"1234\"}},\"TableName\":\"Messages\",\"ConsistentRead\":false},\"Type\":\"Task\",\"Resource\":\"arn:", + { + "Ref": "AWS::Partition" + }, + ":states:::dynamodb:getItem\"},\"UpdateItem\":{\"Next\":\"GetItemAfterUpdate\",\"Parameters\":{\"Key\":{\"MessageId\":{\"S\":\"1234\"}},\"TableName\":\"Messages\",\"ExpressionAttributeValues\":{\":val\":{\"N.$\":\"$.Item.TotalCount.N\"},\":rand\":{\"N\":\"24\"}},\"UpdateExpression\":\"SET TotalCount = :val + :rand\"},\"Type\":\"Task\",\"Resource\":\"arn:", + { + "Ref": "AWS::Partition" + }, + ":states:::dynamodb:updateItem\"},\"GetItemAfterUpdate\":{\"Next\":\"DeleteItem\",\"Parameters\":{\"Key\":{\"MessageId\":{\"S\":\"1234\"}},\"TableName\":\"Messages\",\"ConsistentRead\":false},\"OutputPath\":\"$.Item.TotalCount.N\",\"Type\":\"Task\",\"Resource\":\"arn:", + { + "Ref": "AWS::Partition" + }, + ":states:::dynamodb:getItem\"},\"DeleteItem\":{\"End\":true,\"Parameters\":{\"Key\":{\"MessageId\":{\"S\":\"1234\"}},\"TableName\":\"Messages\"},\"Type\":\"Task\",\"Resource\":\"arn:", + { + "Ref": "AWS::Partition" + }, + ":states:::dynamodb:deleteItem\",\"ResultPath\":null}}}" + ] + ] + }, + "RoleArn": { + "Fn::GetAtt": [ + "StateMachineRoleB840431D", + "Arn" + ] + } + }, + "DependsOn": [ + "StateMachineRoleDefaultPolicyDF1E6607", + "StateMachineRoleB840431D" + ] + } + }, + "Outputs": { + "StateMachineArn": { + "Value": { + "Ref": "StateMachine2E01A3A5" + } + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.call-dynamodb.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.call-dynamodb.ts new file mode 100644 index 0000000000000..3ec36008f2fe3 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.call-dynamodb.ts @@ -0,0 +1,115 @@ +import * as sfn from '@aws-cdk/aws-stepfunctions'; +import * as cdk from '@aws-cdk/core'; +import * as tasks from '../lib'; + +/** + * Pre verification steps: + * * aws dynamodb create-table --table-name Messages --key-schema AttributeName=MessageId,KeyType=HASH \ + * * --attribute-definitions AttributeName=MessageId,AttributeType=S \ + * * --provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=5 + */ + +/* + * Stack verification steps: + * * aws stepfunctions start-execution --state-machine-arn : should return execution arn + * * + * * aws stepfunctions describe-execution --execution-arn --query 'status': should return status as SUCCEEDED + * * aws stepfunctions describe-execution --execution-arn --query 'output': should return the number 42 + */ + +/** + * Post verification steps: + * * aws dynamodb delete-table --table-name Messages + */ + +class CallDynamoDBStack extends cdk.Stack { + constructor(scope: cdk.App, id: string, props: cdk.StackProps = {}) { + super(scope, id, props); + + const TABLE_NAME = 'Messages'; + const MESSAGE_ID = `1234`; + const firstNumber = 18; + const secondNumber = 24; + + const putItemTask = new sfn.Task(this, 'PutItem', { + task: tasks.CallDynamoDB.putItem({ + item: { + MessageId: new tasks.AttributeValue().addS(MESSAGE_ID), + Text: new tasks.AttributeValue().addS(sfn.Data.stringAt('$.bar')), + TotalCount: new tasks.AttributeValue().addN(`${firstNumber}`) + }, + tableName: TABLE_NAME + }) + }); + + const getItemTaskAfterPut = new sfn.Task(this, 'GetItemAfterPut', { + task: tasks.CallDynamoDB.getItem({ + partitionKey: { + name: 'MessageId', + value: new tasks.AttributeValue().addS(MESSAGE_ID) + }, + tableName: TABLE_NAME + }) + }); + + const updateItemTask = new sfn.Task(this, 'UpdateItem', { + task: tasks.CallDynamoDB.updateItem({ + partitionKey: { + name: 'MessageId', + value: new tasks.AttributeValue().addS(MESSAGE_ID) + }, + tableName: TABLE_NAME, + expressionAttributeValues: { + ':val': new tasks.AttributeValue().addN( + sfn.Data.stringAt('$.Item.TotalCount.N') + ), + ':rand': new tasks.AttributeValue().addN(`${secondNumber}`) + }, + updateExpression: 'SET TotalCount = :val + :rand' + }) + }); + + const getItemTaskAfterUpdate = new sfn.Task(this, 'GetItemAfterUpdate', { + task: tasks.CallDynamoDB.getItem({ + partitionKey: { + name: 'MessageId', + value: new tasks.AttributeValue().addS(MESSAGE_ID) + }, + tableName: TABLE_NAME + }), + outputPath: sfn.Data.stringAt('$.Item.TotalCount.N') + }); + + const deleteItemTask = new sfn.Task(this, 'DeleteItem', { + task: tasks.CallDynamoDB.deleteItem({ + partitionKey: { + name: 'MessageId', + value: new tasks.AttributeValue().addS(MESSAGE_ID) + }, + tableName: TABLE_NAME + }), + resultPath: 'DISCARD' + }); + + const definition = new sfn.Pass(this, 'Start', { + result: sfn.Result.fromObject({ bar: 'SomeValue' }) + }) + .next(putItemTask) + .next(getItemTaskAfterPut) + .next(updateItemTask) + .next(getItemTaskAfterUpdate) + .next(deleteItemTask); + + const stateMachine = new sfn.StateMachine(this, 'StateMachine', { + definition + }); + + new cdk.CfnOutput(this, 'StateMachineArn', { + value: stateMachine.stateMachineArn + }); + } +} + +const app = new cdk.App(); +new CallDynamoDBStack(app, 'aws-stepfunctions-integ'); +app.synth(); diff --git a/packages/@aws-cdk/aws-stepfunctions/README.md b/packages/@aws-cdk/aws-stepfunctions/README.md index 70defad102f9b..dad899f2f904a 100644 --- a/packages/@aws-cdk/aws-stepfunctions/README.md +++ b/packages/@aws-cdk/aws-stepfunctions/README.md @@ -139,6 +139,7 @@ couple of the tasks available are: * `tasks.SagemakerTransformTask` -- run a SageMaker transform job * `tasks.StartExecution` -- call StartExecution to a state machine of Step Functions * `tasks.EvaluateExpression` -- evaluate an expression referencing state paths +* `tasks.CallDynamoDB` -- call GetItem, PutItem, DeleteItem and UpdateItem APIs of DynamoDB Except `tasks.InvokeActivity` and `tasks.InvokeFunction`, the [service integration pattern](https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html) @@ -426,6 +427,87 @@ The `EvaluateExpression` supports a `runtime` prop to specify the Lambda runtime to use to evaluate the expression. Currently, the only runtime supported is `lambda.Runtime.NODEJS_10_X`. +#### DynamoDB example + +```ts +const TABLE_NAME = 'Messages'; +const MESSAGE_ID = `1234`; +const firstNumber = 18; +const secondNumber = 24; + +const putItemTask = new sfn.Task(this, 'PutItem', { + task: tasks.CallDynamoDB.putItem({ + item: { + MessageId: new tasks.AttributeValue().addS(MESSAGE_ID), + Text: new tasks.AttributeValue().addS(sfn.Data.stringAt('$.bar')), + TotalCount: new tasks.AttributeValue().addN(`${firstNumber}`) + }, + tableName: TABLE_NAME + }) +}); + +const getItemTaskAfterPut = new sfn.Task(this, 'GetItemAfterPut', { + task: tasks.CallDynamoDB.getItem({ + partitionKey: { + name: 'MessageId', + value: new tasks.AttributeValue().addS(MESSAGE_ID) + }, + tableName: TABLE_NAME + }) +}); + +const updateItemTask = new sfn.Task(this, 'UpdateItem', { + task: tasks.CallDynamoDB.updateItem({ + partitionKey: { + name: 'MessageId', + value: new tasks.AttributeValue().addS(MESSAGE_ID) + }, + tableName: TABLE_NAME, + expressionAttributeValues: { + ':val': new tasks.AttributeValue().addN( + sfn.Data.stringAt('$.Item.TotalCount.N') + ), + ':rand': new tasks.AttributeValue().addN(`${secondNumber}`) + }, + updateExpression: 'SET TotalCount = :val + :rand' + }) +}); + +const getItemTaskAfterUpdate = new sfn.Task(this, 'GetItemAfterUpdate', { + task: tasks.CallDynamoDB.getItem({ + partitionKey: { + name: 'MessageId', + value: new tasks.AttributeValue().addS(MESSAGE_ID) + }, + tableName: TABLE_NAME + }), + outputPath: sfn.Data.stringAt('$.Item.TotalCount.N') +}); + +const deleteItemTask = new sfn.Task(this, 'DeleteItem', { + task: tasks.CallDynamoDB.deleteItem({ + partitionKey: { + name: 'MessageId', + value: new tasks.AttributeValue().addS(MESSAGE_ID) + }, + tableName: TABLE_NAME + }), + resultPath: 'DISCARD' +}); + +const definition = new sfn.Pass(this, 'Start', { + result: sfn.Result.fromObject({ bar: 'SomeValue' }) +}) + .next(putItemTask) + .next(getItemTaskAfterPut) + .next(updateItemTask) + .next(getItemTaskAfterUpdate) + .next(deleteItemTask); + +const stateMachine = new sfn.StateMachine(this, 'StateMachine', { + definition +}); +``` ### Pass