Skip to content

Commit

Permalink
refactor(@whook/aws-lambda): refactor lambda triggers
Browse files Browse the repository at this point in the history
Refactoring the wrappers to simplify them except for tricky parts like encoding/decoding of events.

BREAKING CHANGE: The consumers do not manage batchs anymore and simply pass the records to the
lambda implementation in order to avoid having to write custom wrappers for each event type. The
user are now free to handle the batchs the way they want.

fix #95 concerns #96
  • Loading branch information
nfroidure committed Dec 22, 2020
1 parent b06da51 commit f948220
Show file tree
Hide file tree
Showing 17 changed files with 14,441 additions and 4,655 deletions.
17,865 changes: 13,345 additions & 4,520 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions packages/whook-aws-lambda/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"webpack": "^4.44.1"
},
"dependencies": {
"@types/aws-lambda": "^8.10.66",
"@whook/cli": "^6.0.0",
"@whook/http-router": "^6.0.0",
"@whook/http-transaction": "^6.0.0",
Expand All @@ -65,6 +66,7 @@
"openapi-types": "^7.0.1",
"qs": "^6.9.4",
"strict-qs": "^6.1.3",
"type-fest": "^0.20.2",
"uuid": "^8.3.1",
"yerror": "^5.0.0",
"yhttperror": "^5.0.0"
Expand Down
6 changes: 3 additions & 3 deletions packages/whook-aws-lambda/src/commands/testConsumerLambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export const definition: WhookCommandDefinition = {
default: 'index',
},
event: {
description: 'The consumer event',
description: 'The event batch',
type: 'string',
},
},
Expand Down Expand Up @@ -60,12 +60,12 @@ async function initTestConsumerLambdaCommand({
const handlerPromise = handler(
parsedEvent,
{
succeed: (...args) => {
succeed: (...args: unknown[]) => {
handlerPromise.then(resolve.bind(null, ...args));
},
fail: reject,
},
(err, ...args) => {
(err: Error, ...args: unknown[]) => {
if (err) {
reject(err);
return;
Expand Down
4 changes: 2 additions & 2 deletions packages/whook-aws-lambda/src/commands/testCronLambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ async function initTestCronLambdaCommand({
time: date,
},
{
succeed: (...args) => {
succeed: (...args: unknown[]) => {
handlerPromise.then(resolve.bind(null, ...args));
},
fail: reject,
},
(err, ...args) => {
(err: Error, ...args: unknown[]) => {
if (err) {
reject(err);
return;
Expand Down
39 changes: 21 additions & 18 deletions packages/whook-aws-lambda/src/commands/testHTTPLambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type {
} from '@whook/cli';
import type { OpenAPIV3 } from 'openapi-types';
import type { WhookAPIOperationAWSLambdaConfig } from '..';
import type { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda';

export const definition: WhookCommandDefinition = {
description: 'A command for testing AWS HTTP lambda',
Expand Down Expand Up @@ -103,7 +104,7 @@ async function initTestHTTPLambdaCommand({
).concat(OPERATION.parameters);
const hasBody = !!OPERATION.requestBody;
const parameters = JSON.parse(rawParameters);
const awsRequest = {
const awsRequest: APIGatewayProxyEvent = {
pathParameters: ammendedParameters
.filter((p) => p.in === 'path')
.reduce((pathParameters, p) => {
Expand Down Expand Up @@ -144,30 +145,32 @@ async function initTestHTTPLambdaCommand({
requestId: randomUUID(),
httpMethod: OPERATION.method.toUpperCase(),
},
};
} as APIGatewayProxyEvent;
if (hasBody) {
awsRequest.headers['content-type'] = `${contentType};charset=UTF-8`;
}
log('info', 'AWS_REQUEST:', awsRequest);

const result = await new Promise((resolve, reject) => {
const handlerPromise = handler(
awsRequest,
{
succeed: (...args) => {
const result: APIGatewayProxyResult = await new Promise(
(resolve, reject) => {
const handlerPromise = handler(
awsRequest,
{
succeed: (...args: unknown[]) => {
handlerPromise.then(resolve.bind(null, ...args));
},
fail: reject,
},
(err: Error, ...args: unknown[]) => {
if (err) {
reject(err);
return;
}
handlerPromise.then(resolve.bind(null, ...args));
},
fail: reject,
},
(err, ...args) => {
if (err) {
reject(err);
return;
}
handlerPromise.then(resolve.bind(null, ...args));
},
).catch(reject);
});
).catch(reject);
},
);
log('info', 'SUCCESS:', result);
};
}
115 changes: 115 additions & 0 deletions packages/whook-aws-lambda/src/commands/testKafkaConsumerLambda.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { loadLambda } from '../libs/utils';
import { extra, autoService } from 'knifecycle';
import { readArgs } from '@whook/cli';
import type { WhookCommandArgs, WhookCommandDefinition } from '@whook/cli';
import type { LogService } from 'common-services';
import type { MSKEvent } from 'aws-lambda';

const DEFAULT_EVENT: MSKEvent = {
eventSource: 'aws:kafka',
eventSourceArn:
'arn:aws:kafka:eu-west-3:765225263528:cluster/production/abbacaca-abba-caca-abba-cacaabbacaca-2',
records: {
'ingestion-bench-1': [
{
key: 'none',
topic: 'tropic',
partition: 1,
offset: 0,
timestamp: 1608321344592,
timestampType: 'CREATE_TIME',
value:
'WyJERy1TUi0wMDAxIiwieCIsIjIwMjAtMTAtMTVUMDg6MjE6MTAuMzA4WiIsIi0yNzIuMCJd',
},
{
key: 'none',
topic: 'tropic',
partition: 1,
offset: 1,
timestamp: 1608321344801,
timestampType: 'CREATE_TIME',
value:
'WyJERy1TUi0wMDAxIiwieCIsIjIwMjAtMTAtMTVUMDg6MjE6MTEuMjE1WiIsIi0xOTIuMCJd',
},
],
},
};

export const definition: WhookCommandDefinition = {
description: 'A command for testing AWS lambda Kafka consumers',
example: `whook KafkaConsumer --name handleKafkaConsumerLambda`,
arguments: {
type: 'object',
additionalProperties: false,
required: ['name'],
properties: {
name: {
description: 'Name of the lamda to run',
type: 'string',
},
type: {
description: 'Type of lambda to test',
type: 'string',
enum: ['main', 'index'],
default: 'index',
},
event: {
description: 'The Kafka batch event',
type: 'string',
default: JSON.stringify(DEFAULT_EVENT),
},
},
},
};

export default extra(
definition,
autoService(initTestKafkaConsumerLambdaCommand),
);

async function initTestKafkaConsumerLambdaCommand({
NODE_ENV,
PROJECT_DIR,
log,
args,
}: {
NODE_ENV: string;
PROJECT_DIR: string;
log: LogService;
args: WhookCommandArgs;
}) {
return async () => {
const { name, type, event } = readArgs(definition.arguments, args) as {
name: string;
type: string;
event: string;
};
const handler = await loadLambda(
{ PROJECT_DIR, log },
NODE_ENV,
name,
type,
);
const parsedEvent = JSON.parse(event);
const result = await new Promise((resolve, reject) => {
const handlerPromise = handler(
parsedEvent,
{
succeed: (...args: unknown[]) => {
handlerPromise.then(resolve.bind(null, ...args));
},
fail: reject,
},
(err: Error, ...args: unknown[]) => {
if (err) {
reject(err);
return;
}
handlerPromise.then(resolve.bind(null, ...args));
},
).catch(reject);
});

log('info', 'SUCCESS:', result);
};
}
108 changes: 108 additions & 0 deletions packages/whook-aws-lambda/src/commands/testLogSubscriberLambda.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { loadLambda } from '../libs/utils';
import { extra, autoService } from 'knifecycle';
import { readArgs } from '@whook/cli';
import { encodePayload } from '../wrappers/awsLogSubscriberLambda';
import type { WhookCommandArgs, WhookCommandDefinition } from '@whook/cli';
import type { LogService } from 'common-services';
import type {
CloudWatchLogsDecodedData,
CloudWatchLogsEvent,
} from 'aws-lambda';

// Event example from:
// https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchlogs.html
const DEFAULT_EVENT: CloudWatchLogsDecodedData = {
messageType: 'DATA_MESSAGE',
owner: '123456789012',
logGroup: '/aws/lambda/echo-nodejs',
logStream: '2019/03/13/[$LATEST]94fa867e5374431291a7fc14e2f56ae7',
subscriptionFilters: ['LambdaStream_cloudwatchlogs-node'],
logEvents: [
{
id: '34622316099697884706540976068822859012661220141643892546',
timestamp: 1552518348220,
message:
'REPORT RequestId: 6234bffe-149a-b642-81ff-2e8e376d8aff\tDuration: 46.84 ms\tBilled Duration: 47 ms \tMemory Size: 192 MB\tMax Memory Used: 72 MB\t\n',
},
],
};

export const definition: WhookCommandDefinition = {
description: 'A command for testing AWS consumer lambda',
example: `whook testS3Lambda --name handleS3Lambda`,
arguments: {
type: 'object',
additionalProperties: false,
required: ['name'],
properties: {
name: {
description: 'Name of the lamda to run',
type: 'string',
},
type: {
description: 'Type of lambda to test',
type: 'string',
enum: ['main', 'index'],
default: 'index',
},
event: {
description: 'The S3 actions batch event',
type: 'string',
default: JSON.stringify(DEFAULT_EVENT),
},
},
},
};

export default extra(definition, autoService(initTestS3LambdaCommand));

async function initTestS3LambdaCommand({
NODE_ENV,
PROJECT_DIR,
log,
args,
}: {
NODE_ENV: string;
PROJECT_DIR: string;
log: LogService;
args: WhookCommandArgs;
}) {
return async () => {
const { name, type, event } = readArgs(definition.arguments, args) as {
name: string;
type: string;
event: string;
};
const handler = await loadLambda(
{ PROJECT_DIR, log },
NODE_ENV,
name,
type,
);
const parsedEvent: CloudWatchLogsEvent = {
awslogs: {
data: await encodePayload(JSON.parse(event)),
},
};
const result = await new Promise((resolve, reject) => {
const handlerPromise = handler(
parsedEvent,
{
succeed: (...args: unknown[]) => {
handlerPromise.then(resolve.bind(null, ...args));
},
fail: reject,
},
(err: Error, ...args: unknown[]) => {
if (err) {
reject(err);
return;
}
handlerPromise.then(resolve.bind(null, ...args));
},
).catch(reject);
});

log('info', 'SUCCESS:', result);
};
}
Loading

0 comments on commit f948220

Please sign in to comment.