Skip to content

Commit

Permalink
feat(batch): add batch processing utility (#1625)
Browse files Browse the repository at this point in the history
* chore: init workspace

* chore: init workspace

* feat(batch): Implementation of base batch processing classes (#1588)

* chore: init workspace

* chore: init workspace

* Initial base class implementation

* Added BatchProcessor implementation, attempted fix for async

* Added unit tests

* Refactoring unit tests

* Lint fix, updated docstrings

* Added response and identifier typings

* test(idempotency): improve integration tests for utility (#1591)

* docs: new name

* chore: rename e2e files

* tests(idempotency): expand integration tests

* chore(idempotency): remove unreachable code

* Removed unnecessary type casting

* Moved exports for handlers and factories

* Updated imports, refactored randomization in factories

* Refactored EventType to be const instead of enum

* Refactored and added documentation for errors

* Removed debugging line

* chore(ci): add canary to layer deployment (#1593)

* docs(idempotency): write utility docs (#1592)

* docs: base docs

* wip

* chore: added paths to snippets tsconfig

* chore: added page to docs menu

* docs(idempotency): utility docs

* highlights

* chore: remove CDK mention

* build(internal): bump semver from 5.7.1 to 5.7.2 (#1594)

Bumps [semver](https://github.com/npm/node-semver) from 5.7.1 to 5.7.2.
- [Release notes](https://github.com/npm/node-semver/releases)
- [Changelog](https://github.com/npm/node-semver/blob/v5.7.2/CHANGELOG.md)
- [Commits](npm/node-semver@v5.7.1...v5.7.2)

---
updated-dependencies:
- dependency-name: semver
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(idempotency): mark the utility ready public beta (#1595)

* chore(idempotency): mark utility as public beta

* chore: manually increment version in commons

* docs(internal): update AWS SDK links to new docs (#1597)

* chore(maintenance): remove parameters utility from layer bundling and layers e2e tests (#1599)

* remove parameter from e2e tests

* remove parameters from canary stack as well

* chore(release): v1.11.1 [skip ci]

* fix canary deploy in ci with correct workspace name (#1601)

* chore: update layer ARN on documentation

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Andrea Amorosi <dreamorosi@gmail.com>
Co-authored-by: Alexander Schueren <amelnyk@amazon.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Release bot[bot] <aws-devax-open-source@amazon.com>

* feat(batch): Batch processing wrapper function (#1605)

* Refactored some types, added function wrapper and base test

* Added record check and tests, renamed factories

* Refactored type check logic in function

* Refactor test to remove error ignore

* feat(batch): Implement SQS FIFO processor class (#1606)

* Added SQS FIFO processor and unit tests

* Added docstring for pbatch processing function

* feat(batch): Support for Lambda context access in batch processing (#1609)

* Added types and parameter for lambda context, added unit tests

* Refactor parameter checking

* Added test for malformed context handling

* docs: created utility docs

* docs: fixed white spaces

* feat(batch): add async processor (#1616)

* feat(batch): add async processor

* tests: improved unit tests

* chore: removed docstring + edited test handler

* chore: fix typos

* docs: added README

* chore: added package to beta release

* chore: marked package as public

* chore: added new batch page to docs

* chore: added utility to lerna workspace

* chore: added utility to main readme

* chore: added utility to CI

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Erika Yao <71943596+erikayao93@users.noreply.github.com>
Co-authored-by: Alexander Schueren <amelnyk@amazon.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Release bot[bot] <aws-devax-open-source@amazon.com>
  • Loading branch information
6 people authored Jul 25, 2023
1 parent 0b575a1 commit c4e6b19
Show file tree
Hide file tree
Showing 55 changed files with 3,367 additions and 23 deletions.
5 changes: 4 additions & 1 deletion .github/scripts/release_patch_package_json.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ if (process.argv.length < 3) {
const basePath = resolve(process.argv[2]);
const packageJsonPath = join(basePath, 'package.json');
const alphaPackages = [];
const betaPackages = ['@aws-lambda-powertools/idempotency'];
const betaPackages = [
'@aws-lambda-powertools/idempotency',
'@aws-lambda-powertools/batch',
];

(() => {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ jobs:
with:
nodeVersion: ${{ matrix.version }}
- name: Run linting
run: npm run lint -w packages/commons -w packages/logger -w packages/tracer -w packages/metrics -w packages/parameters -w packages/idempotency
run: npm run lint -w packages/commons -w packages/logger -w packages/tracer -w packages/metrics -w packages/parameters -w packages/idempotency -w packages/batch
- name: Run unit tests
run: npm t -w packages/commons -w packages/logger -w packages/tracer -w packages/metrics -w packages/parameters -w packages/idempotency
run: npm t -w packages/commons -w packages/logger -w packages/tracer -w packages/metrics -w packages/parameters -w packages/idempotency -w packages/batch
check-examples:
runs-on: ubuntu-latest
env:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ You can use the library in both TypeScript and JavaScript code bases.
* **[Metrics](https://docs.powertools.aws.dev/lambda-typescript/latest/core/metrics/)** - Custom Metrics created asynchronously via CloudWatch Embedded Metric Format (EMF)
* **[Parameters](https://docs.powertools.aws.dev/lambda-typescript/latest/utilities/parameters/)** - High-level functions to retrieve one or more parameters from AWS SSM Parameter Store, AWS Secrets Manager, AWS AppConfig, and Amazon DynamoDB
* **[Idempotency (beta)](https://docs.powertools.aws.dev/lambda-typescript/latest/utilities/idempotency/)** - Class method decorator, Middy middleware, and function wrapper to make your Lambda functions idempotent and prevent duplicate execution based on payload content
* **[Batch Processing (beta)](https://docs.powertools.aws.dev/lambda-typescript/latest/utilities/batch/)** - Utility to handle partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.

## Getting started

Expand Down
37 changes: 37 additions & 0 deletions docs/snippets/batch/accessLambdaContext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
SQSEvent,
SQSRecord,
Context,
SQSBatchResponse,
} from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord, lambdaContext?: Context): void => {
const payload = record.body;
if (payload) {
const item = JSON.parse(payload);
logger.info('Processed item', { item });
}
if (lambdaContext) {
logger.info('Remaining time', {
time: lambdaContext.getRemainingTimeInMillis(),
});
}
};

export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
context,
});
};
38 changes: 38 additions & 0 deletions docs/snippets/batch/accessProcessedMessages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
SQSEvent,
SQSRecord,
Context,
SQSBatchResponse,
} from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
const payload = record.body;
if (payload) {
const item = JSON.parse(payload);
logger.info('Processed item', { item });
}
};

export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
const batch = event.Records;

processor.register(batch, recordHandler, { context });
const processedMessages = processor.process();

for (const message of processedMessages) {
const status: 'success' | 'fail' = message[0];
const record = message[2];

logger.info('Processed record', { status, record });
}

return processor.response();
};
98 changes: 98 additions & 0 deletions docs/snippets/batch/customPartialProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { randomInt } from 'node:crypto';
import {
DynamoDBClient,
BatchWriteItemCommand,
} from '@aws-sdk/client-dynamodb';
import { marshall } from '@aws-sdk/util-dynamodb';
import {
BasePartialProcessor,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type {
SuccessResponse,
FailureResponse,
EventSourceType,
} from '@aws-lambda-powertools/batch';
import type { SQSEvent, Context, SQSBatchResponse } from 'aws-lambda';

const tableName = process.env.TABLE_NAME || 'table-not-found';

class MyPartialProcessor extends BasePartialProcessor {
#tableName: string;
#client?: DynamoDBClient;

public constructor(tableName: string) {
super();
this.#tableName = tableName;
}

public async asyncProcessRecord(
_record: EventSourceType
): Promise<SuccessResponse | FailureResponse> {
throw new Error('Not implemented');
}

/**
* It's called once, **after** processing the batch.
*
* Here we are writing all the processed messages to DynamoDB.
*/
public clean(): void {
// We know that the client is defined because clean() is called after prepare()
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.#client!.send(
new BatchWriteItemCommand({
RequestItems: {
[this.#tableName]: this.successMessages.map((message) => ({
PutRequest: {
Item: marshall(message),
},
})),
},
})
);
}

/**
* It's called once, **before** processing the batch.
*
* It initializes a new client and cleans up any existing data.
*/
public prepare(): void {
this.#client = new DynamoDBClient({});
this.successMessages = [];
}

/**
* It handles how your record is processed.
*
* Here we are keeping the status of each run, `this.handler` is
* the function that is passed when calling `processor.register()`.
*/
public processRecord(
record: EventSourceType
): SuccessResponse | FailureResponse {
try {
const result = this.handler(record);

return this.successHandler(record, result);
} catch (error) {
return this.failureHandler(record, error as Error);
}
}
}

const processor = new MyPartialProcessor(tableName);

const recordHandler = (): number => {
return Math.floor(randomInt(1, 10));
};

export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
context,
});
};
53 changes: 53 additions & 0 deletions docs/snippets/batch/extendingFailure.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { Metrics, MetricUnits } from '@aws-lambda-powertools/metrics';
import {
BatchProcessor,
EventType,
FailureResponse,
EventSourceType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
SQSEvent,
SQSRecord,
Context,
SQSBatchResponse,
} from 'aws-lambda';

class MyProcessor extends BatchProcessor {
#metrics: Metrics;

public constructor(eventType: keyof typeof EventType) {
super(eventType);
this.#metrics = new Metrics({ namespace: 'test' });
}

public failureHandler(
record: EventSourceType,
error: Error
): FailureResponse {
this.#metrics.addMetric('BatchRecordFailures', MetricUnits.Count, 1);

return super.failureHandler(record, error);
}
}

const processor = new MyProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
const payload = record.body;
if (payload) {
const item = JSON.parse(payload);
logger.info('Processed item', { item });
}
};

export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
context,
});
};
31 changes: 31 additions & 0 deletions docs/snippets/batch/gettingStartedAsync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import {
AsyncBatchProcessor,
EventType,
asyncProcessPartialResponse,
} from '@aws-lambda-powertools/batch';
import axios from 'axios'; // axios is an external dependency
import type {
SQSEvent,
SQSRecord,
Context,
SQSBatchResponse,
} from 'aws-lambda';

const processor = new AsyncBatchProcessor(EventType.SQS);

const recordHandler = async (record: SQSRecord): Promise<number> => {
const res = await axios.post('https://httpbin.org/anything', {
message: record.body,
});

return res.status;
};

export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return await asyncProcessPartialResponse(event, recordHandler, processor, {
context,
});
};
35 changes: 35 additions & 0 deletions docs/snippets/batch/gettingStartedDynamoDBStreams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
DynamoDBStreamEvent,
DynamoDBRecord,
Context,
DynamoDBBatchResponse,
} from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

const recordHandler = (record: DynamoDBRecord): void => {
if (record.dynamodb && record.dynamodb.NewImage) {
logger.info('Processing record', { record: record.dynamodb.NewImage });
const message = record.dynamodb.NewImage.Message.S;
if (message) {
const payload = JSON.parse(message);
logger.info('Processed item', { item: payload });
}
}
};

export const handler = async (
event: DynamoDBStreamEvent,
context: Context
): Promise<DynamoDBBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
context,
});
};
30 changes: 30 additions & 0 deletions docs/snippets/batch/gettingStartedKinesis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
KinesisStreamEvent,
KinesisStreamRecord,
Context,
KinesisStreamBatchResponse,
} from 'aws-lambda';

const processor = new BatchProcessor(EventType.KinesisDataStreams);
const logger = new Logger();

const recordHandler = (record: KinesisStreamRecord): void => {
logger.info('Processing record', { record: record.kinesis.data });
const payload = JSON.parse(record.kinesis.data);
logger.info('Processed item', { item: payload });
};

export const handler = async (
event: KinesisStreamEvent,
context: Context
): Promise<KinesisStreamBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
context,
});
};
33 changes: 33 additions & 0 deletions docs/snippets/batch/gettingStartedSQS.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
SQSEvent,
SQSRecord,
Context,
SQSBatchResponse,
} from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
const payload = record.body;
if (payload) {
const item = JSON.parse(payload);
logger.info('Processed item', { item });
}
};

export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
context,
});
};
export { processor };
Loading

0 comments on commit c4e6b19

Please sign in to comment.