Skip to content

Commit

Permalink
feat(stepfunctions-tasks): Support for Athena APIs: StartQueryExecuti…
Browse files Browse the repository at this point in the history
…on, StopQueryExeuction, GetQueryResults and GetQueryExecution (#11045)

feat(stepfunctions-tasks): support for Athena APIs: StartQueryExecution, StopQueryExeuction, GetQueryResults and GetQueryExecution 

**Implementation**

Update package `@aws-cdk/aws-stepfunctions-tasks` to include support for Athena **StartQueryExecution**, **StopQueryExeuction**, **GetQueryResults**, **GetQueryExecution**    API as per documentation here: 
https://docs.aws.amazon.com/step-functions/latest/dg/connect-athena.html

Includes support for the following Amazon Athena API calls:
* `StartQueryExecution`
* `StopQueryExeuction`
* `GetQueryResults`
* `GetQueryExecution`

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
Sumeet-Badyal authored Oct 26, 2020
1 parent 75875cc commit 19180cc
Show file tree
Hide file tree
Showing 18 changed files with 1,977 additions and 0 deletions.
71 changes: 71 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ This module is part of the [AWS Cloud Development Kit](https://github.com/aws/aw
- [ResultPath](#resultpath)
- [Parameters](#task-parameters-from-the-state-json)
- [Evaluate Expression](#evaluate-expression)
- [Athena](#athena)
- [StartQueryExecution](#startQueryExecution)
- [GetQueryExecution](#getQueryExecution)
- [GetQueryResults](#getQueryResults)
- [StopQueryExecution](#stopQueryExecution)
- [Batch](#batch)
- [SubmitJob](#submitjob)
- [CodeBuild](#codebuild)
Expand Down Expand Up @@ -205,6 +210,72 @@ The `EvaluateExpression` supports a `runtime` prop to specify the Lambda
runtime to use to evaluate the expression. Currently, the only runtime
supported is `lambda.Runtime.NODEJS_10_X`.


## Athena

Step Functions supports [Athena](https://docs.aws.amazon.com/step-functions/latest/dg/connect-athena.html) through the service integration pattern.

### StartQueryExecution

The [StartQueryExecution](https://docs.aws.amazon.com/athena/latest/APIReference/API_StartQueryExecution.html) API runs the SQL query statement.

```ts
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from `@aws-cdk/aws-stepfunctions-tasks`;

const startQueryExecutionJob = new tasks.AthenaStartQueryExecution(stack, 'Start Athena Query', {
queryString: sfn.JsonPath.stringAt('$.queryString'),
queryExecutionContext: {
database: 'mydatabase',
},
resultConfiguration: {
encryptionConfiguration: {
encryptionOption: tasks.EncryptionOption.S3_MANAGED,
},
outputLocation: sfn.JsonPath.stringAt('$.outputLocation'),
},
});
```

### GetQueryExecution

The [GetQueryExecution](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryExecution.html) API gets information about a single execution of a query.

```ts
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from `@aws-cdk/aws-stepfunctions-tasks`;

const getQueryExecutionJob = new tasks.AthenaGetQueryExecution(stack, 'Get Query Execution', {
queryExecutionId: sfn.JsonPath.stringAt('$.QueryExecutionId'),
});
```

### GetQueryResults

The [GetQueryResults](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) API that streams the results of a single query execution specified by QueryExecutionId from S3.

```ts
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from `@aws-cdk/aws-stepfunctions-tasks`;

const getQueryResultsJob = new tasks.AthenaGetQueryResults(stack, 'Get Query Results', {
queryExecutionId: sfn.JsonPath.stringAt('$.QueryExecutionId'),
});
```

### StopQueryExecution

The [StopQueryExecution](https://docs.aws.amazon.com/athena/latest/APIReference/API_StopQueryExecution.html) API that stops a query execution.

```ts
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from `@aws-cdk/aws-stepfunctions-tasks`;

const stopQueryExecutionJob = new tasks.AthenaStopQueryExecution(stack, 'Stop Query Execution', {
queryExecutionId: sfn.JsonPath.stringAt('$.QueryExecutionId'),
});
```

## Batch

Step Functions supports [Batch](https://docs.aws.amazon.com/step-functions/latest/dg/connect-batch.html) through the service integration pattern.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Construct } from 'constructs';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Properties for getting a Query Execution
* @experimental
*/
export interface AthenaGetQueryExecutionProps extends sfn.TaskStateBaseProps {
/**
* Query that will be retrieved
*
* @example 'adfsaf-23trf23-f23rt23'
*/
readonly queryExecutionId: string;
}

/**
* Get an Athena Query Execution as a Task
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-athena.html
* @experimental
*/
export class AthenaGetQueryExecution extends sfn.TaskStateBase {

private static readonly SUPPORTED_INTEGRATION_PATTERNS: sfn.IntegrationPattern[] = [
sfn.IntegrationPattern.REQUEST_RESPONSE,
sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
];

protected readonly taskMetrics?: sfn.TaskMetricsConfig;
protected readonly taskPolicies?: iam.PolicyStatement[];

private readonly integrationPattern: sfn.IntegrationPattern;

constructor(scope: Construct, id: string, private readonly props: AthenaGetQueryExecutionProps) {
super(scope, id, props);
this.integrationPattern = props.integrationPattern ?? sfn.IntegrationPattern.REQUEST_RESPONSE;

validatePatternSupported(this.integrationPattern, AthenaGetQueryExecution.SUPPORTED_INTEGRATION_PATTERNS);

this.taskPolicies = [
new iam.PolicyStatement({
resources: ['*'], // Grant access to all workgroups as it can not be specified in the request https://docs.aws.amazon.com/athena/latest/ug/workgroups-iam-policy.html
actions: ['athena:getQueryExecution'],
}),
];
}

/**
* Provides the Athena get query execution service integration task configuration
* @internal
*/
protected _renderTask(): any {
return {
Resource: integrationResourceArn('athena', 'getQueryExecution', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
QueryExecutionId: this.props.queryExecutionId,
}),
};
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Construct } from 'constructs';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Properties for getting a Query Results
* @experimental
*/
export interface AthenaGetQueryResultsProps extends sfn.TaskStateBaseProps {
/**
* Query that will be retrieved
*
* @example 'adfsaf-23trf23-f23rt23'
*/
readonly queryExecutionId: string;

/**
* Pagination token
*
* @default - No next token
*/
readonly nextToken?: string;

/**
* Max number of results
*
* @default 1000
*/
readonly maxResults?: number;
}

/**
* Get an Athena Query Results as a Task
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-athena.html
* @experimental
*/
export class AthenaGetQueryResults extends sfn.TaskStateBase {

private static readonly SUPPORTED_INTEGRATION_PATTERNS: sfn.IntegrationPattern[] = [
sfn.IntegrationPattern.REQUEST_RESPONSE,
sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
];

protected readonly taskMetrics?: sfn.TaskMetricsConfig;
protected readonly taskPolicies?: iam.PolicyStatement[];

private readonly integrationPattern: sfn.IntegrationPattern;

constructor(scope: Construct, id: string, private readonly props: AthenaGetQueryResultsProps) {
super(scope, id, props);
this.integrationPattern = props.integrationPattern ?? sfn.IntegrationPattern.REQUEST_RESPONSE;

validatePatternSupported(this.integrationPattern, AthenaGetQueryResults.SUPPORTED_INTEGRATION_PATTERNS);

const policyStatements = [
new iam.PolicyStatement({
resources: ['*'], // Workgroup can not be specified in the request https://docs.aws.amazon.com/athena/latest/ug/workgroups-iam-policy.html
actions: ['athena:getQueryResults'],
}),
];

policyStatements.push(
new iam.PolicyStatement({
actions: ['s3:GetObject'],
resources: ['*'], // To stream query results successfully the IAM principal must have permissions to the Amazon S3 GetObject action for the Athena query results location https://docs.amazonaws.cn/en_us/athena/latest/APIReference/API_GetQueryResults.html
}),
);

this.taskPolicies = policyStatements;
}

/**
* Provides the Athena get query results service integration task configuration
* @internal
*/
protected _renderTask(): any {
return {
Resource: integrationResourceArn('athena', 'getQueryResults', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
QueryExecutionId: this.props.queryExecutionId,
NextToken: this.props.nextToken,
MaxResults: this.props.maxResults,
}),
};
}
}

Loading

0 comments on commit 19180cc

Please sign in to comment.