From e31279af90446050a7974fbe25c34758f64915f9 Mon Sep 17 00:00:00 2001 From: Asifur Rahman Arnab Date: Fri, 27 Sep 2024 20:41:20 +0600 Subject: [PATCH] feat(batch): sequential async processing of records for `BatchProcessor` (#3109) Co-authored-by: Andrea Amorosi --- docs/utilities/batch.md | 15 +- .../batch/sequentialAsyncProcessing.ts | 18 + packages/batch/src/BasePartialProcessor.ts | 40 +- packages/batch/src/types.ts | 7 + .../batch/tests/unit/BatchProcessor.test.ts | 353 ++++++------ .../tests/unit/processPartialResponse.test.ts | 542 +++++++++--------- 6 files changed, 533 insertions(+), 442 deletions(-) create mode 100644 examples/snippets/batch/sequentialAsyncProcessing.ts diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index e49ab6b98e..ec0e4ba108 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -416,7 +416,10 @@ For such cases we recommend to use the `BatchProcessorSync` and `processPartialR *If your function is `async` returning a `Promise`, use `BatchProcessor` and `processPartialResponse` * If your function is not `async`, use `BatchProcessorSync` and `processPartialResponseSync` -The difference between the two processors in implementation is that `BatchProcessor` uses `Promise.all()` while `BatchProcessorSync` loops through each record to preserve the order. +The difference between the two processors is in how they handle record processing: + +* **`BatchProcessor`**: By default, it processes records in parallel using `Promise.all()`. However, it also offers an [option](#sequential-async-processing) to process records sequentially, preserving the order. +* **`BatchProcessorSync`**: Always processes records sequentially, ensuring the order is preserved by looping through each record one by one. ???+ question "When is this useful?" @@ -477,6 +480,16 @@ Let's suppose you'd like to add a metric named `BatchRecordFailures` for each ba --8<-- "examples/snippets/batch/extendingFailure.ts" ``` +### Sequential async processing + +By default, the `BatchProcessor` processes records in parallel using `Promise.all()`. However, if you need to preserve the order of records, you can set the `processInParallel` option to `false` to process records sequentially. + +!!! important "If the `processInParallel` option is not provided, the `BatchProcessor` will process records in parallel." + +```typescript hl_lines="8 17" title="Sequential async processing" +--8<-- "examples/snippets/batch/sequentialAsyncProcessing.ts" +``` + ### Create your own partial processor You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing the `prepare()`, `clean()`, `processRecord()` and `processRecordSync()` abstract methods. diff --git a/examples/snippets/batch/sequentialAsyncProcessing.ts b/examples/snippets/batch/sequentialAsyncProcessing.ts new file mode 100644 index 0000000000..10ae24262e --- /dev/null +++ b/examples/snippets/batch/sequentialAsyncProcessing.ts @@ -0,0 +1,18 @@ +import { + BatchProcessor, + EventType, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import type { SQSHandler, SQSRecord } from 'aws-lambda'; + +const processor = new BatchProcessor(EventType.SQS); + +const recordHandler = async (_record: SQSRecord): Promise => { + // Process the record +}; + +export const handler: SQSHandler = async (event, context) => + processPartialResponse(event, recordHandler, processor, { + context, + processInParallel: false, + }); diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index 2e7e9bf8ac..68bcac467d 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -98,7 +98,7 @@ abstract class BasePartialProcessor { public abstract prepare(): void; /** - * Process all records with an asyncronous handler + * Process all records with an asynchronous handler * * Once called, the processor will create an array of promises to process each record * and wait for all of them to settle before returning the results. @@ -122,11 +122,11 @@ abstract class BasePartialProcessor { } this.prepare(); - const processingPromises: Promise[] = - this.records.map((record) => this.processRecord(record)); - - const processedRecords: (SuccessResponse | FailureResponse)[] = - await Promise.all(processingPromises); + // Default to `true` if `processInParallel` is not specified. + const processInParallel = this.options?.processInParallel ?? true; + const processedRecords = processInParallel + ? await this.#processRecordsInParallel() + : await this.#processRecordsSequentially(); this.clean(); @@ -134,9 +134,9 @@ abstract class BasePartialProcessor { } /** - * Process a record with an asyncronous handler + * Process a record with an asynchronous handler * - * An implementation of this method is required for asyncronous processors. + * An implementation of this method is required for asynchronous processors. * * When implementing this method, you should at least call the successHandler method * when a record succeeds processing and the failureHandler method when a record @@ -249,6 +249,30 @@ abstract class BasePartialProcessor { return entry; } + + /** + * Processes records in parallel using `Promise.all`. + */ + async #processRecordsInParallel(): Promise< + (SuccessResponse | FailureResponse)[] + > { + return Promise.all( + this.records.map((record) => this.processRecord(record)) + ); + } + + /** + * Processes records sequentially, ensuring that each record is processed one after the other. + */ + async #processRecordsSequentially(): Promise< + (SuccessResponse | FailureResponse)[] + > { + const processedRecords: (SuccessResponse | FailureResponse)[] = []; + for (const record of this.records) { + processedRecords.push(await this.processRecord(record)); + } + return processedRecords; + } } export { BasePartialProcessor }; diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index 7e60704535..3748c03e95 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -14,6 +14,7 @@ import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js'; * @property context The context object provided by the AWS Lambda runtime * @property skipGroupOnError The option to group on error during processing * @property throwOnFullBatchFailure The option to throw an error if the entire batch fails + * @property processInParallel Indicates whether the records should be processed in parallel */ type BatchProcessingOptions = { /** @@ -30,6 +31,12 @@ type BatchProcessingOptions = { * Set this to false to prevent throwing an error if the entire batch fails. */ throwOnFullBatchFailure?: boolean; + /** + * Indicates whether the records should be processed in parallel. + * When set to `true`, the records will be processed in parallel using `Promise.all`. + * When set to `false`, the records will be processed sequentially. + */ + processInParallel?: T extends SqsFifoPartialProcessor ? never : boolean; }; /** diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index e49ee4c30f..650354d968 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -35,198 +35,211 @@ describe('Class: AsyncBatchProcessor', () => { process.env = ENVIRONMENT_VARIABLES; }); - describe('Asynchronously processing SQS Records', () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.body, firstRecord], - ['success', secondRecord.body, secondRecord], - ]); - }); - - it('completes processing with with some failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('success'); - const thirdRecord = sqsRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.body, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.messageId }, - { itemIdentifier: thirdRecord.messageId }, - ], + describe('Asynchronously processing', () => { + const cases = [ + { + description: 'in parallel', + options: { processInParallel: true }, + }, + { + description: 'sequentially', + options: { processInParallel: false }, + }, + ]; + + describe.each(cases)('SQS Records $description', ({ options }) => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); }); - }); - - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('failure'); - const thirdRecord = sqsRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - }); - - describe('Asynchronously processing Kinesis Records', () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('success'); - const secondRecord = kinesisRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler); - const processedMessages = await processor.process(); + it('completes processing with with some failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('success'); + const thirdRecord = sqsRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.body, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.messageId }, + { itemIdentifier: thirdRecord.messageId }, + ], + }); + }); - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.kinesis.data, firstRecord], - ['success', secondRecord.kinesis.data, secondRecord], - ]); - }); + it('completes processing with all failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('failure'); + const thirdRecord = sqsRecordFactory('fail'); - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('success'); - const thirdRecord = kinesisRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.SQS); - // Act - processor.register(records, asyncKinesisRecordHandler); - const processedMessages = await processor.process(); + // Act + processor.register(records, asyncSqsRecordHandler, options); - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.kinesis.data, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.kinesis.sequenceNumber }, - { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, - ], + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); }); }); - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('failure'); - const thirdRecord = kinesisRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); + describe.each(cases)('Kinesis Records $description', ({ options }) => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('success'); + const secondRecord = kinesisRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.kinesis.data, firstRecord], + ['success', secondRecord.kinesis.data, secondRecord], + ]); + }); - // Act - processor.register(records, asyncKinesisRecordHandler); + it('completes processing with some failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('success'); + const thirdRecord = kinesisRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.kinesis.data, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.kinesis.sequenceNumber }, + { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, + ], + }); + }); - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - }); + it('completes processing with all failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('failure'); + const thirdRecord = kinesisRecordFactory('fail'); - describe('Asynchronously processing DynamoDB Records', () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('success'); - const secondRecord = dynamodbRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); - // Act - processor.register(records, asyncDynamodbRecordHandler); - const processedMessages = await processor.process(); + // Act + processor.register(records, asyncKinesisRecordHandler, options); - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], - ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], - ]); + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); }); - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('success'); - const thirdRecord = dynamodbRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler); - const processedMessages = await processor.process(); + describe.each(cases)('DynamoDB Records $description', ({ options }) => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('success'); + const secondRecord = dynamodbRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], + ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], + ]); + }); - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.dynamodb?.NewImage?.Message, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, - { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, - ], + it('completes processing with some failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('success'); + const thirdRecord = dynamodbRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.dynamodb?.NewImage?.Message, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, + { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, + ], + }); }); - }); - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('failure'); - const thirdRecord = dynamodbRecordFactory('fail'); + it('completes processing with all failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('failure'); + const thirdRecord = dynamodbRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); - // Act - processor.register(records, asyncDynamodbRecordHandler); + // Act + processor.register(records, asyncDynamodbRecordHandler, options); - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); }); }); diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts index 2b2ec185ad..54d0ef6fec 100644 --- a/packages/batch/tests/unit/processPartialResponse.test.ts +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -36,6 +36,62 @@ describe('Function: processPartialResponse()', () => { context, }; + const handlerWithSqsEvent = async ( + event: SQSEvent, + options: BatchProcessingOptions + ) => { + const processor = new BatchProcessor(EventType.SQS); + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => + processPartialResponse(event, asyncSqsRecordHandler, processor, options); + + return handler(event, context); + }; + + const handlerWithKinesisEvent = async ( + event: KinesisStreamEvent, + options: BatchProcessingOptions + ) => { + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + const handler = async ( + event: KinesisStreamEvent, + _context: Context + ): Promise => + processPartialResponse( + event, + asyncKinesisRecordHandler, + processor, + options + ); + + return handler(event, context); + }; + + const handlerWithDynamoDBEvent = async ( + event: DynamoDBStreamEvent, + options: BatchProcessingOptions + ) => { + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + const handler = async ( + event: DynamoDBStreamEvent, + _context: Context + ): Promise => { + return await processPartialResponse( + event, + asyncDynamodbRecordHandler, + processor, + options + ); + }; + + return handler(event, context); + }; + beforeEach(() => { vi.clearAllMocks(); process.env = { ...ENVIRONMENT_VARIABLES }; @@ -46,304 +102,264 @@ describe('Function: processPartialResponse()', () => { }); describe('Process partial response function call tests', () => { - it('Process partial response function call with asynchronous handler', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const batch = { Records: records }; - const processor = new BatchProcessor(EventType.SQS); - - // Act - const ret = await processPartialResponse( - batch, - asyncSqsRecordHandler, - processor - ); + const cases = [ + { + description: 'in parallel', + processingOptions: { processInParallel: true }, + }, + { + description: 'sequentially', + processingOptions: { processInParallel: false }, + }, + ]; + + describe.each(cases)('$description', ({ processingOptions }) => { + it('Process partial response function call with asynchronous handler', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); - // Assess - expect(ret).toStrictEqual({ batchItemFailures: [] }); - }); + // Act + const ret = await processPartialResponse( + batch, + asyncSqsRecordHandler, + processor, + processingOptions + ); - it('Process partial response function call with context provided', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const batch = { Records: records }; - const processor = new BatchProcessor(EventType.SQS); - - // Act - const ret = await processPartialResponse( - batch, - asyncHandlerWithContext, - processor, - options - ); + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); - // Assess - expect(ret).toStrictEqual({ batchItemFailures: [] }); - }); + it('Process partial response function call with context provided', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); - it('Process partial response function call with asynchronous handler for full batch failure', async () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const batch = { Records: records }; - const processor = new BatchProcessor(EventType.SQS); + // Act + const ret = await processPartialResponse( + batch, + asyncHandlerWithContext, + processor, + { + ...processingOptions, + ...options, + } + ); - // Act & Assess - await expect( - processPartialResponse(batch, asyncSqsRecordHandler, processor) - ).rejects.toThrow(FullBatchFailureError); - }); + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); - it('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const batch = { Records: records }; - const processor = new BatchProcessor(EventType.SQS); + it('Process partial response function call with asynchronous handler for full batch failure', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act & Assess + await expect( + processPartialResponse( + batch, + asyncSqsRecordHandler, + processor, + processingOptions + ) + ).rejects.toThrow(FullBatchFailureError); + }); - // Act & Assess - await expect( - processPartialResponse(batch, asyncSqsRecordHandler, processor, { - ...options, - throwOnFullBatchFailure: true, - }) - ).rejects.toThrow(FullBatchFailureError); - }); + it('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act & Assess + await expect( + processPartialResponse(batch, asyncSqsRecordHandler, processor, { + ...processingOptions, + ...options, + throwOnFullBatchFailure: true, + }) + ).rejects.toThrow(FullBatchFailureError); + }); - it('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const batch = { Records: records }; - const processor = new BatchProcessor(EventType.SQS); + it('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); - // Act - const response = await processPartialResponse( - batch, - asyncSqsRecordHandler, - processor, - { - ...options, - throwOnFullBatchFailure: false, - } - ); + // Act + const response = await processPartialResponse( + batch, + asyncSqsRecordHandler, + processor, + { + ...processingOptions, + ...options, + throwOnFullBatchFailure: false, + } + ); - // Assess - expect(response).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: records[0].messageId }, - { itemIdentifier: records[1].messageId }, - ], + // Assess + expect(response).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: records[0].messageId }, + { itemIdentifier: records[1].messageId }, + ], + }); }); }); }); describe('Process partial response function call through handler', () => { - it('Process partial response through handler with SQS event', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const processor = new BatchProcessor(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return processPartialResponse(event, asyncSqsRecordHandler, processor); - }; - - // Act - const result = await handler(event, context); - - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); - - it('Process partial response through handler with Kinesis event', async () => { - // Prepare - const records = [ - kinesisRecordFactory('success'), - kinesisRecordFactory('success'), - ]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - const event: KinesisStreamEvent = { Records: records }; - - const handler = async ( - event: KinesisStreamEvent, - _context: Context - ): Promise => { - return await processPartialResponse( - event, - asyncKinesisRecordHandler, - processor - ); - }; - - // Act - const result = await handler(event, context); + const cases = [ + { + description: 'in parallel', + processingOptions: { processInParallel: true }, + }, + { + description: 'sequentially', + processingOptions: { processInParallel: false }, + }, + ]; + + describe.each(cases)('$description', ({ processingOptions }) => { + it('Process partial response through handler with SQS event', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const event: SQSEvent = { Records: records }; - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); + // Act + const result = await handlerWithSqsEvent(event, processingOptions); - it('Process partial response through handler with DynamoDB event', async () => { - // Prepare - const records = [ - dynamodbRecordFactory('success'), - dynamodbRecordFactory('success'), - ]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - const event: DynamoDBStreamEvent = { Records: records }; - - const handler = async ( - event: DynamoDBStreamEvent, - _context: Context - ): Promise => { - return await processPartialResponse( - event, - asyncDynamodbRecordHandler, - processor - ); - }; + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); - // Act - const result = await handler(event, context); + it('Process partial response through handler with Kinesis event', async () => { + // Prepare + const records = [ + kinesisRecordFactory('success'), + kinesisRecordFactory('success'), + ]; + const event: KinesisStreamEvent = { Records: records }; - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); + // Act + const result = await handlerWithKinesisEvent(event, processingOptions); - it('Process partial response through handler for SQS records with incorrect event type', async () => { - // Prepare - const processor = new BatchProcessor(EventType.SQS); + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return await processPartialResponse( - event, - asyncSqsRecordHandler, - processor - ); - }; + it('Process partial response through handler with DynamoDB event', async () => { + // Prepare + const records = [ + dynamodbRecordFactory('success'), + dynamodbRecordFactory('success'), + ]; + const event: DynamoDBStreamEvent = { Records: records }; - try { // Act - await handler({} as unknown as SQSEvent, context); - } catch (error) { + const result = await handlerWithDynamoDBEvent(event, processingOptions); + // Assess - assert(error instanceof UnexpectedBatchTypeError); - expect(error.message).toBe( - `Unexpected batch type. Possible values are: ${Object.keys( - EventType - ).join(', ')}` - ); - } - }); + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); - it('Process partial response through handler with context provided', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const processor = new BatchProcessor(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = async ( - event: SQSEvent, - context: Context - ): Promise => { - const options: BatchProcessingOptions = { context: context }; - - return await processPartialResponse( - event, - asyncHandlerWithContext, - processor, - options - ); - }; + it('Process partial response through handler for SQS records with incorrect event type', async () => { + try { + // Act + await handlerWithSqsEvent( + {} as unknown as SQSEvent, + processingOptions + ); + } catch (error) { + // Assess + assert(error instanceof UnexpectedBatchTypeError); + expect(error.message).toBe( + `Unexpected batch type. Possible values are: ${Object.keys( + EventType + ).join(', ')}` + ); + } + }); - // Act - const result = await handler(event, context); + it('Process partial response through handler with context provided', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const event: SQSEvent = { Records: records }; - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); + // Act + const result = await handlerWithSqsEvent(event, { + context, + ...processingOptions, + }); - it('Process partial response through handler for full batch failure', async () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const processor = new BatchProcessor(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return processPartialResponse(event, asyncSqsRecordHandler, processor); - }; - - // Act & Assess - await expect(handler(event, context)).rejects.toThrow( - FullBatchFailureError - ); - }); + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); - it('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const processor = new BatchProcessor(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return processPartialResponse(event, asyncSqsRecordHandler, processor, { - ...options, - throwOnFullBatchFailure: true, - }); - }; + it('Process partial response through handler for full batch failure', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const event: SQSEvent = { Records: records }; - // Act & Assess - await expect(handler(event, context)).rejects.toThrow( - FullBatchFailureError - ); - }); + // Act & Assess + await expect( + handlerWithSqsEvent(event, processingOptions) + ).rejects.toThrow(FullBatchFailureError); + }); + + it('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const event: SQSEvent = { Records: records }; + + // Act & Assess + await expect( + handlerWithSqsEvent(event, { + ...options, + ...processingOptions, + throwOnFullBatchFailure: true, + }) + ).rejects.toThrow(FullBatchFailureError); + }); - it('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const processor = new BatchProcessor(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return processPartialResponse(event, asyncSqsRecordHandler, processor, { + it('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const event: SQSEvent = { Records: records }; + + // Act + const response = await handlerWithSqsEvent(event, { ...options, + ...processingOptions, throwOnFullBatchFailure: false, }); - }; - - // Act - const response = await handler(event, context); - // Assess - expect(response).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: records[0].messageId }, - { itemIdentifier: records[1].messageId }, - ], + // Assess + expect(response).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: records[0].messageId }, + { itemIdentifier: records[1].messageId }, + ], + }); }); }); });