Skip to content

Commit

Permalink
incorporate review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
knihit committed Feb 5, 2021
1 parent 75a869f commit cfd9836
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,41 @@ This AWS Solutions Construct deploys a Kinesis Stream and configures a AWS Glue
Here is a minimal deployable pattern definition in Typescript:

```javascript
const fieldSchema: CfnTable.ColumnProperty [] = [{
name: 'id',
type: 'int',
comment: 'Identifier for the record',
},
{
name: 'name',
type: 'string',
comment: 'Name for the record',
},
{
name: 'address',
type: 'string',
comment: 'Address for the record',
},
{
name: 'value',
type: 'int',
comment: 'Value for the record',
},
]
);

const _customEtlJob = new KinesisStreamGlueJob(this, 'CustomETL', {
const fieldSchema: glue.CfnTable.ColumnProperty[] = [
{
name: 'id',
type: 'int',
comment: 'Identifier for the record',
},
{
name: 'name',
type: 'string',
comment: 'Name for the record',
},
{
name: 'address',
type: 'string',
comment: 'Address for the record',
},
{
name: 'value',
type: 'int',
comment: 'Value for the record',
},
];

const customEtlJob = new KinesisstreamsToGluejob(this, 'CustomETL', {
glueJobProps: {
command: {
name: 'gluestreaming',
pythonVersion: '3',
scriptLocation: new Asset(this, 'ScriptLocation', {
path: `${__dirname}/../etl/transform.py`
}).s3ObjectUrl
}
},
fieldSchema: fieldSchema
}
name: 'gluestreaming',
pythonVersion: '3',
scriptLocation: new s3assets.Asset(this, 'ScriptLocation', {
path: `${__dirname}/../etl/transform.py`,
}).s3ObjectUrl,
}
},
fieldSchema: fieldSchema,
});

```

## Initializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
* and limitations under the License.
*/

import { CfnDatabase, CfnDatabaseProps, CfnJob, CfnJobProps, CfnTable, CfnTableProps } from '@aws-cdk/aws-glue';
import * as glue from '@aws-cdk/aws-glue';
import { CfnPolicy, Effect, IRole, Policy, PolicyStatement } from '@aws-cdk/aws-iam';
import { Stream, StreamProps } from '@aws-cdk/aws-kinesis';
import { Bucket } from '@aws-cdk/aws-s3';
import { Aws, Construct } from '@aws-cdk/core';
import * as defaults from '@aws-solutions-constructs/core';

export interface KinesisStreamGlueJobProps {
export interface KinesisstreamsToGluejobProps {
/**
* Existing instance of Kineses Data Stream. If not set, it will create an instance
*/
Expand All @@ -37,13 +37,13 @@ export interface KinesisStreamGlueJobProps {
* for CfnJobProps. If a role is not passed, the construct creates one for you and attaches the appropriate
* role policies
*
* @default
* @default - None
*/
readonly glueJobProps?: CfnJobProps | any;
readonly glueJobProps?: glue.CfnJobProps | any;
/**
* Existing GlueJob configuration. If this property is provided, any propertiers provided through @glueJobProps is ignored
*/
readonly existingGlueJob?: CfnJob;
readonly existingGlueJob?: glue.CfnJob;
/**
* Structure of the records in the Amazon Kinesis Data Streams. An example of such a definition is as below.
* Either @table or @fieldSchema is mandatory. If @table is provided then @fieldSchema is ignored
Expand All @@ -65,56 +65,67 @@ export interface KinesisStreamGlueJobProps {
* "comment": "Some value associated with the record"
* },
*
* @default
* @default - None
*/
readonly fieldSchema?: CfnTable.ColumnProperty [];
readonly fieldSchema?: glue.CfnTable.ColumnProperty [];
/**
* Glue Table for this construct, If not provided the construct will create a new Table in the
* database. This table should define the schema for the records in the Kinesis Data Streams.
* One of @tableprops or @table or @fieldSchema is mandatory. If @tableprops is provided then
* @table and @fieldSchema are ignored. If @table is provided, @fieldSchema is ignored
*/
readonly existingTable?: CfnTable;
readonly existingTable?: glue.CfnTable;
/**
* The table properties for the construct to create the table. One of @tableprops or @table
* or @fieldSchema is mandatory. If @tableprops is provided then @table and @fieldSchema
* are ignored. If @table is provided, @fieldSchema is ignored
*/
readonly tableProps?: CfnTableProps;
readonly tableProps?: glue.CfnTableProps;
/**
* Glue Database for this construct. If not provided the construct will create a new Glue Database.
* The database is where the schema for the data in Kinesis Data Streams is stored
*/
readonly existingDatabase?: CfnDatabase;
readonly existingDatabase?: glue.CfnDatabase;
/**
* The props for the Glue database that the construct should use to create. If @database is set
* then this property is ignored. If none of @database and @databaseprops is provided, the
* construct will define a GlueDatabase resoruce.
*/
readonly databaseProps?: CfnDatabaseProps;
readonly databaseProps?: glue.CfnDatabaseProps;
/**
* The output data stores where the transformed data should be written. Current supported data stores
* include only S3, other potential stores may be added in the future.
*/
readonly outputDataStore?: defaults.SinkDataStoreProps;
}

export class KinesisStreamGlueJob extends Construct {
/**
* @summary = This construct either creates or uses the existing construct provided that can be deployed
* to perform streaming ETL operations using:
* - AWS Glue Database
* - AWS Glue Table
* - AWS Glue Job
* - Amazon Kinesis Data Streams
* - Amazon S3 Bucket (output datastore).
* The construct also configures the required role policies so that AWS Glue Job can read data from
* the streams, process it, and write to an output store.
*/
export class KinesisstreamsToGluejob extends Construct {
public readonly kinesisStream: Stream;
public readonly glueJob: CfnJob;
public readonly glueJob: glue.CfnJob;
public readonly glueJobRole: IRole;
public readonly database: CfnDatabase;
public readonly table: CfnTable;
public readonly database: glue.CfnDatabase;
public readonly table: glue.CfnTable;
public readonly outputBucket?: [Bucket, (Bucket | undefined)?];

/**
* Constructs a new instalce of KinesisStreamGlueJob
* Constructs a new instance of KinesisstreamsToGluejob.Based on the values set in the @props
*
* @param scope
* @param id
* @param props
*/
constructor(scope: Construct, id: string, props: KinesisStreamGlueJobProps) {
constructor(scope: Construct, id: string, props: KinesisstreamsToGluejobProps) {
super(scope, id);

this.kinesisStream = defaults.buildKinesisStream(this, {
Expand Down Expand Up @@ -156,7 +167,7 @@ export class KinesisStreamGlueJob extends Construct {
* @param glueJob
* @param role
*/
private buildRolePolicy(scope: Construct, glueDatabase: CfnDatabase, glueTable: CfnTable, glueJob: CfnJob, role: IRole): IRole {
private buildRolePolicy(scope: Construct, glueDatabase: glue.CfnDatabase, glueTable: glue.CfnTable, glueJob: glue.CfnJob, role: IRole): IRole {
const _glueJobPolicy = new Policy(scope, 'GlueJobPolicy', {
statements: [ new PolicyStatement({
effect: Effect.ALLOW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { Role, ServicePrincipal } from '@aws-cdk/aws-iam';
import { Bucket, CfnBucket } from '@aws-cdk/aws-s3';
import { App, Duration, Stack } from '@aws-cdk/core';
import { SinkStoreType } from '@aws-solutions-constructs/core';
import { KinesisStreamGlueJob } from '../lib';
import { KinesisstreamsToGluejob } from '../lib';

// Setup
const app = new App();
Expand Down Expand Up @@ -61,7 +61,7 @@ const job = new CfnJob(stack, 'ExistingJob', {
});

// Definitions
new KinesisStreamGlueJob(stack, 'test-kinesisstreams-lambda', {
new KinesisstreamsToGluejob(stack, 'test-kinesisstreams-lambda', {
existingGlueJob: job,
fieldSchema: [{
name: "id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import { CfnTable } from '@aws-cdk/aws-glue';
import { Asset } from '@aws-cdk/aws-s3-assets';
import { App, Stack } from '@aws-cdk/core';
import { KinesisStreamGlueJob } from '../lib';
import { KinesisstreamsToGluejob } from '../lib';

// Setup
const app = new App();
Expand All @@ -40,7 +40,7 @@ const fieldSchema: CfnTable.ColumnProperty [] = [{
comment: "Some value associated with the record"
}];

new KinesisStreamGlueJob(stack, 'test-kinesisstreams-lambda', {
new KinesisstreamsToGluejob(stack, 'test-kinesisstreams-lambda', {
glueJobProps: {
command: {
name: 'gluestreaming',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import { Stream, StreamEncryption } from '@aws-cdk/aws-kinesis';
import { Duration, Stack } from "@aws-cdk/core";
import * as defaults from '@aws-solutions-constructs/core';
import { SinkStoreType } from '@aws-solutions-constructs/core';
import { KinesisStreamGlueJob, KinesisStreamGlueJobProps } from '../lib';
import { KinesisstreamsToGluejob, KinesisstreamsToGluejobProps } from '../lib';

// --------------------------------------------------------------
// Pattern minimal deployment
// --------------------------------------------------------------
test('Pattern minimal deployment', () => {
// Initial setup
const stack = new Stack();
const props: KinesisStreamGlueJobProps = {
const props: KinesisstreamsToGluejobProps = {
glueJobProps: {
command: {
name: 'glueetl',
Expand All @@ -52,7 +52,7 @@ test('Pattern minimal deployment', () => {
comment: "Some value associated with the record"
}],
};
new KinesisStreamGlueJob(stack, 'test-kinesisstreams-lambda', props);
new KinesisstreamsToGluejob(stack, 'test-kinesisstreams-lambda', props);
// Assertion 1
expect(SynthUtils.toCloudFormation(stack)).toMatchSnapshot();

Expand Down Expand Up @@ -271,7 +271,7 @@ test('Test if existing Glue Job is provided', () => {
securityConfiguration: 'testSecConfig'
});

new KinesisStreamGlueJob(stack, 'test-kinesisstreams-lambda', {
new KinesisstreamsToGluejob(stack, 'test-kinesisstreams-lambda', {
existingGlueJob: existingCfnJob,
fieldSchema: [{
name: "id",
Expand Down Expand Up @@ -315,7 +315,7 @@ test('When S3 bucket location for script exists', () => {
// Initial setup
const stack = new Stack();
const _s3ObjectUrl: string = 's3://fakelocation/etl/fakefile.py';
const props: KinesisStreamGlueJobProps = {
const props: KinesisstreamsToGluejobProps = {
glueJobProps: {
command: {
name: 'pythonshell',
Expand Down Expand Up @@ -344,7 +344,7 @@ test('When S3 bucket location for script exists', () => {
datastoreType: SinkStoreType.S3
}
};
new KinesisStreamGlueJob(stack, 'test-kinesisstreams-lambda', props);
new KinesisstreamsToGluejob(stack, 'test-kinesisstreams-lambda', props);
// Assertion 1
expect(SynthUtils.toCloudFormation(stack)).toMatchSnapshot();
expect(stack).toHaveResourceLike('AWS::Glue::Job', {
Expand All @@ -371,7 +371,7 @@ test('create glue job with existing kinesis stream', () => {
retentionPeriod: Duration.hours(30)
});

new KinesisStreamGlueJob(stack, 'existingStreamJob', {
new KinesisstreamsToGluejob(stack, 'existingStreamJob', {
glueJobProps: {
command: {
name: 'pythonshell',
Expand Down Expand Up @@ -420,7 +420,7 @@ test('Do not pass s3ObjectUrlForScript or scriptLocationPath, error out', () =>
const stack = new Stack();
try {
const _kinesisStream = defaults.buildKinesisStream(stack, {});
new KinesisStreamGlueJob(stack, 'existingStreamJob', {
new KinesisstreamsToGluejob(stack, 'existingStreamJob', {
glueJobProps: {
command: {
name: 'pythonshell',
Expand Down Expand Up @@ -461,7 +461,7 @@ test('Do not pass fieldSchame or table (CfnTable), error out', () => {
const stack = new Stack();

try {
const props: KinesisStreamGlueJobProps = {
const props: KinesisstreamsToGluejobProps = {
glueJobProps: {
command: {
name: 'glueetl',
Expand All @@ -473,7 +473,7 @@ test('Do not pass fieldSchame or table (CfnTable), error out', () => {
datastoreType: SinkStoreType.S3
}
};
new KinesisStreamGlueJob(stack, 'test-kinesisstreams-lambda', props);
new KinesisstreamsToGluejob(stack, 'test-kinesisstreams-lambda', props);
} catch (error) {
expect(error).toBeInstanceOf(Error);
}
Expand All @@ -491,7 +491,7 @@ test('When database and table are provided', () => {
description: 'a fake glue db'
}
});
const props: KinesisStreamGlueJobProps = {
const props: KinesisstreamsToGluejobProps = {
glueJobProps: {
command: {
name: 'glueetl',
Expand All @@ -518,7 +518,7 @@ test('When database and table are provided', () => {
comment: "Some value associated with the record"
}], 'kinesis', { STREAM_NAME: 'testStream' })
};
new KinesisStreamGlueJob(stack, 'test-kinesisstreams-lambda', props);
new KinesisstreamsToGluejob(stack, 'test-kinesisstreams-lambda', props);
// Assertion 1
expect(SynthUtils.toCloudFormation(stack)).toMatchSnapshot();
expect(stack).toHaveResourceLike('AWS::Glue::Database', {
Expand All @@ -538,7 +538,7 @@ test('When database and table are provided', () => {
test('When database and table are not provided', () => {
// Initial setup
const stack = new Stack();
const props: KinesisStreamGlueJobProps = {
const props: KinesisstreamsToGluejobProps = {
glueJobProps: {
command: {
name: 'glueetl',
Expand All @@ -564,7 +564,7 @@ test('When database and table are not provided', () => {
comment: "Some value associated with the record"
}]
};
new KinesisStreamGlueJob(stack, 'test-kinesisstreams-lambda', props);
new KinesisstreamsToGluejob(stack, 'test-kinesisstreams-lambda', props);
// Assertion 1
expect(SynthUtils.toCloudFormation(stack)).toMatchSnapshot();
expect(stack).toHaveResourceLike('AWS::Glue::Database', {
Expand Down
Loading

0 comments on commit cfd9836

Please sign in to comment.