Skip to content

Commit

Permalink
refactor(@whook/aws-lambda): refactor lambda triggers
Browse files Browse the repository at this point in the history
Refactoring the wrappers to simplify them except for tricky parts like encoding/decoding of events.

BREAKING CHANGE: The consumers do not manage batchs anymore and simply pass the records to the
lambda implementation in order to avoid having to write custom wrappers for each event type. The
user are now free to handle the batchs the way they want.

fix #95 concerns #96
  • Loading branch information
nfroidure committed Dec 19, 2020
1 parent 78298d9 commit 863f9bc
Show file tree
Hide file tree
Showing 11 changed files with 13,988 additions and 4,645 deletions.
17,865 changes: 13,345 additions & 4,520 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions packages/whook-aws-lambda/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"webpack": "^4.44.1"
},
"dependencies": {
"@types/aws-lambda": "^8.10.66",
"@whook/cli": "^6.0.0",
"@whook/http-router": "^6.0.0",
"@whook/http-transaction": "^6.0.0",
Expand All @@ -65,6 +66,7 @@
"openapi-types": "^7.0.1",
"qs": "^6.9.4",
"strict-qs": "^6.1.3",
"type-fest": "^0.20.2",
"uuid": "^8.3.1",
"yerror": "^5.0.0",
"yhttperror": "^5.0.0"
Expand Down
39 changes: 21 additions & 18 deletions packages/whook-aws-lambda/src/commands/testHTTPLambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type {
} from '@whook/cli';
import type { OpenAPIV3 } from 'openapi-types';
import type { WhookAPIOperationAWSLambdaConfig } from '..';
import type { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda';

export const definition: WhookCommandDefinition = {
description: 'A command for testing AWS HTTP lambda',
Expand Down Expand Up @@ -103,7 +104,7 @@ async function initTestHTTPLambdaCommand({
).concat(OPERATION.parameters);
const hasBody = !!OPERATION.requestBody;
const parameters = JSON.parse(rawParameters);
const awsRequest = {
const awsRequest: APIGatewayProxyEvent = {
pathParameters: ammendedParameters
.filter((p) => p.in === 'path')
.reduce((pathParameters, p) => {
Expand Down Expand Up @@ -144,30 +145,32 @@ async function initTestHTTPLambdaCommand({
requestId: randomUUID(),
httpMethod: OPERATION.method.toUpperCase(),
},
};
} as APIGatewayProxyEvent;
if (hasBody) {
awsRequest.headers['content-type'] = `${contentType};charset=UTF-8`;
}
log('info', 'AWS_REQUEST:', awsRequest);

const result = await new Promise((resolve, reject) => {
const handlerPromise = handler(
awsRequest,
{
succeed: (...args) => {
const result: APIGatewayProxyResult = await new Promise(
(resolve, reject) => {
const handlerPromise = handler(
awsRequest,
{
succeed: (...args) => {
handlerPromise.then(resolve.bind(null, ...args));
},
fail: reject,
},
(err, ...args) => {
if (err) {
reject(err);
return;
}
handlerPromise.then(resolve.bind(null, ...args));
},
fail: reject,
},
(err, ...args) => {
if (err) {
reject(err);
return;
}
handlerPromise.then(resolve.bind(null, ...args));
},
).catch(reject);
});
).catch(reject);
},
);
log('info', 'SUCCESS:', result);
};
}
71 changes: 60 additions & 11 deletions packages/whook-aws-lambda/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,43 @@ import type {
WhookCompilerService,
WhookCompilerConfig,
} from './services/compiler';
import type { Autoloader } from 'knifecycle';
import type { Autoloader, Dependencies } from 'knifecycle';
import type { WhookOperation } from '@whook/whook';
import type { OpenAPIV3 } from 'openapi-types';
import type { LogService } from 'common-services';
import type { CprOptions } from 'cpr';

export type {
LambdaConsumerInput,
LambdaConsumerOutput,
} from './wrappers/awsConsumerLambda';
export type {
LambdaCronInput,
LambdaCronOutput,
} from './wrappers/awsCronLambda';
export type {
LambdaHTTPInput,
LambdaHTTPOutput,
} from './wrappers/awsHTTPLambda';
export type {
LambdaKafkaConsumerInput,
LambdaKafkaConsumerOutput,
} from './wrappers/awsKafkaConsumerLambda';
export type {
LambdaLogSubscriberInput,
LambdaLogSubscriberOutput,
} from './wrappers/awsLogSubscriberLambda';
export type { LambdaS3Input, LambdaS3Output } from './wrappers/awsS3Lambda';
export type {
LambdaTransformerInput,
LambdaTransformerOutput,
} from './wrappers/awsTransformerLambda';
export type { WhookCompilerConfig, WhookCompilerOptions, WhookCompilerService };

export { DEFAULT_COMPILER_OPTIONS };

export type WhookAPIOperationAWSLambdaConfig = {
type?: 'http' | 'cron' | 'consumer' | 'transformer';
type?: 'http' | 'cron' | 'consumer' | 'transformer' | 'kafka' | 's3' | 'log';
sourceOperationId?: string;
staticFiles?: string[];
compilerOptions?: WhookCompilerOptions;
Expand All @@ -50,16 +77,17 @@ const writeFileAsync = util.promisify(fs.writeFile) as (
const cprAsync = util.promisify(cpr) as (
source: string,
destination: string,
options: any,
) => Promise<any>;
options: CprOptions,
) => Promise<string[]>;

const BUILD_DEFINITIONS: {
[type: string]: {
const BUILD_DEFINITIONS: Record<
WhookAPIOperationAWSLambdaConfig['type'],
{
type: string;
wrapper: { name: string; path: string };
suffix?: string;
};
} = {
}
> = {
http: {
type: 'HTTP',
wrapper: {
Expand Down Expand Up @@ -89,11 +117,32 @@ const BUILD_DEFINITIONS: {
path: path.join(__dirname, 'wrappers', 'awsConsumerLambda'),
},
},
kafka: {
type: 'Kafka',
wrapper: {
name: 'wrapHandlerForAWSKafkaConsumerLambda',
path: path.join(__dirname, 'wrappers', 'awsKafkaConsumerLambda'),
},
},
s3: {
type: 'S3',
wrapper: {
name: 'wrapHandlerForAWSS3Lambda',
path: path.join(__dirname, 'wrappers', 'awsS3Lambda'),
},
},
log: {
type: 'Log',
wrapper: {
name: 'wrapHandlerForAWSLogSubscriberLambda',
path: path.join(__dirname, 'wrappers', 'awsLogSubscriberLambda'),
},
},
};

export async function prepareBuildEnvironment<T extends Knifecycle<any>>(
$: T = new Knifecycle() as T,
): Promise<T> {
export async function prepareBuildEnvironment<
T extends Knifecycle<Dependencies>
>($: T = new Knifecycle() as T): Promise<T> {
$.register(
constant('INITIALIZER_PATH_MAP', {
ENV: require.resolve('@whook/whook/dist/services/ProxyedENV'),
Expand Down
108 changes: 63 additions & 45 deletions packages/whook-aws-lambda/src/wrappers/awsConsumerLambda.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,40 @@
import { reuseSpecialProps, alsoInject } from 'knifecycle';
import { noop, identity } from '@whook/whook';
import { noop } from '@whook/whook';
import YError from 'yerror';
import YHTTPError from 'yhttperror';
import type { ServiceInitializer } from 'knifecycle';
import type { WhookOperation, APMService, WhookHandler } from '@whook/whook';
import type {
WhookOperation,
APMService,
WhookHandler,
WhookResponse,
} from '@whook/whook';
import type { TimeService, LogService } from 'common-services';
import type { OpenAPIV3 } from 'openapi-types';
import type {
KinesisStreamEvent,
SQSEvent,
SNSEvent,
Context,
SESEvent,
DynamoDBStreamEvent,
} from 'aws-lambda';

export type LambdaKinesisStreamConsumerInput = {
body: KinesisStreamEvent['Records'];
};
export type LambdaSQSConsumerInput = { body: SQSEvent['Records'] };
export type LambdaSNSConsumerInput = { body: SNSEvent['Records'] };
export type LambdaSESConsumerInput = { body: SESEvent['Records'] };
export type LambdaDynamoDBStreamConsumerInput = {
body: DynamoDBStreamEvent['Records'];
};
export type LambdaConsumerInput =
| LambdaKinesisStreamConsumerInput
| LambdaSQSConsumerInput
| LambdaSNSConsumerInput
| LambdaSESConsumerInput
| LambdaDynamoDBStreamConsumerInput;
export type LambdaConsumerOutput = void;

type ConsumerWrapperDependencies = {
NODE_ENV: string;
Expand All @@ -21,13 +50,16 @@ export default function wrapHandlerForAWSConsumerLambda<
>(
initHandler: ServiceInitializer<D, S>,
): ServiceInitializer<D & ConsumerWrapperDependencies, S> {
return alsoInject(
return alsoInject<ConsumerWrapperDependencies, D, S>(
['OPERATION_API', 'NODE_ENV', 'apm', '?time', '?log'],
reuseSpecialProps(
initHandler,
initHandlerForAWSConsumerLambda.bind(null, initHandler),
initHandlerForAWSConsumerLambda.bind(
null,
initHandler,
) as ServiceInitializer<D, S>,
),
) as any;
);
}

async function initHandlerForAWSConsumerLambda<D, S extends WhookHandler>(
Expand All @@ -47,10 +79,22 @@ async function handleForAWSConsumerLambda(
time = Date.now.bind(Date),
log = noop,
}: ConsumerWrapperDependencies,
handler: WhookHandler,
event: { Records: unknown[] },
context: unknown,
callback: (err: Error, result?: any) => void,
handler: WhookHandler<
LambdaConsumerInput,
WhookResponse<
number,
Record<string, string | string[]>,
LambdaConsumerOutput
>
>,
event:
| KinesisStreamEvent
| SQSEvent
| SNSEvent
| SESEvent
| DynamoDBStreamEvent,
context: Context,
callback: (err: Error) => void,
) {
const path = Object.keys(OPERATION_API.paths)[0];
const method = Object.keys(OPERATION_API.paths[path])[0];
Expand All @@ -60,53 +104,26 @@ async function handleForAWSConsumerLambda(
...OPERATION_API.paths[path][method],
};
const startTime = time();
const parameters = {
body: event.Records,
} as LambdaConsumerInput;

try {
log('info', 'EVENT', JSON.stringify(event));

const responses = await Promise.all(
event.Records.map(async (body, index) => {
try {
return await handler({ body }, OPERATION);
} catch (err) {
const castedError = YHTTPError.cast(err);

log('debug', `💥 - Could not process the record at index ${index}!`);
log('debug-stack', err.stack);

return {
status: castedError.httpCode,
body: {
code: castedError.code,
stack: 'test' === NODE_ENV ? castedError.stack : undefined,
params: castedError.params,
},
};
}
}),
);
const batchStats: any = {};
const failures = responses.filter((response) => response.status >= 500);
log('debug', 'EVENT', JSON.stringify(event));

batchStats.batchItems = responses.length;
batchStats.batchSuccesses = responses.length - failures.length;
batchStats.batchFailures = failures.length;
batchStats.batchStatuses = responses.map((response) => response.status);
batchStats.batchErrorCodes = responses
.map((response) => response.body && (response.body as any).code)
.filter(identity);
await handler(parameters, OPERATION);

apm('CONSUMER', {
environment: NODE_ENV,
triggerTime: startTime,
lambdaName: OPERATION.operationId,
type: batchStats.batchFailures ? 'fail' : 'success',
type: 'success',
startTime,
endTime: time(),
...batchStats,
recordsLength: event.Records.length,
});

callback(null, { status: 200 });
callback(null);
} catch (err) {
const castedErr = YError.cast(err);

Expand All @@ -120,6 +137,7 @@ async function handleForAWSConsumerLambda(
params: castedErr.params,
startTime,
endTime: time(),
recordsLength: event.Records.length,
});

callback(err);
Expand Down
Loading

0 comments on commit 863f9bc

Please sign in to comment.