diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts new file mode 100644 index 0000000000..703476c6c5 --- /dev/null +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -0,0 +1,67 @@ +import { BatchProcessor, EventType, FailureResponse, SuccessResponse } from '.'; + +/** + * Process native partial responses from SQS FIFO queues + * Stops processing records when the first record fails + * The remaining records are reported as failed items + */ +class SqsFifoPartialProcessor extends BatchProcessor { + public constructor() { + super(EventType.SQS); + } + + /** + * Call instance's handler for each record. + * When the first failed message is detected, the process is short-circuited + * And the remaining messages are reported as failed items + * TODO: change to synchronous execution if possible + */ + public async process(): Promise<(SuccessResponse | FailureResponse)[]> { + this.prepare(); + + const processedRecords: (SuccessResponse | FailureResponse)[] = []; + let currentIndex = 0; + for (const record of this.records) { + // If we have any failed messages, it means the last message failed + // We should then short circuit the process and fail remaining messages + if (this.failureMessages.length != 0) { + return this.shortCircuitProcessing(currentIndex, processedRecords); + } + + processedRecords.push(await this.processRecord(record)); + currentIndex++; + } + + this.clean(); + + return processedRecords; + } + + /** + * Starting from the first failure index, fail all remaining messages and append them to the result list + * @param firstFailureIndex Index of first message that failed + * @param result List of success and failure responses with remaining messages failed + */ + public shortCircuitProcessing( + firstFailureIndex: number, + processedRecords: (SuccessResponse | FailureResponse)[] + ): (SuccessResponse | FailureResponse)[] { + const remainingRecords = this.records.slice(firstFailureIndex); + + for (const record of remainingRecords) { + const data = this.toBatchType(record, this.eventType); + processedRecords.push( + this.failureHandler( + data, + new Error('A previous record failed processing') + ) + ); + } + + this.clean(); + + return processedRecords; + } +} + +export { SqsFifoPartialProcessor }; diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts index d18b197b9c..82abac172a 100644 --- a/packages/batch/src/index.ts +++ b/packages/batch/src/index.ts @@ -5,3 +5,4 @@ export * from './BasePartialProcessor'; export * from './BasePartialBatchProcessor'; export * from './BatchProcessor'; export * from './processPartialResponse'; +export * from './SqsFifoPartialProcessor'; diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index 4c700d3e79..a29a01e903 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -5,6 +5,13 @@ import { PartialItemFailureResponse, } from '.'; +/** + * Higher level function to handle batch event processing + * @param event Lambda's original event + * @param recordHandler Callable function to process each record from the batch + * @param processor Batch processor to handle partial failure cases + * @returns Lambda Partial Batch Response + */ const processPartialResponse = async ( event: { Records: BaseRecord[] }, recordHandler: CallableFunction, diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts new file mode 100644 index 0000000000..b49e3fbdf1 --- /dev/null +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -0,0 +1,116 @@ +/** + * Test SqsFifoBatchProcessor class + * + * @group unit/batch/class/sqsfifobatchprocessor + */ + +import { SqsFifoPartialProcessor, processPartialResponse } from '../../src'; +import { sqsRecordFactory } from '../../tests/helpers/factories'; +import { + asyncSqsRecordHandler, + sqsRecordHandler, +} from '../../tests/helpers/handlers'; + +describe('Class: SqsFifoBatchProcessor', () => { + const ENVIRONMENT_VARIABLES = process.env; + + beforeEach(() => { + jest.clearAllMocks(); + jest.resetModules(); + process.env = { ...ENVIRONMENT_VARIABLES }; + }); + + afterAll(() => { + process.env = ENVIRONMENT_VARIABLES; + }); + + describe('Synchronous SQS FIFO batch processing', () => { + test('SQS FIFO Batch processor with no failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const event = { Records: [firstRecord, secondRecord] }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = await processPartialResponse( + event, + sqsRecordHandler, + processor + ); + + // Assess + expect(result['batchItemFailures']).toStrictEqual([]); + }); + + test('SQS FIFO Batch processor with failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('fail'); + const thirdRecord = sqsRecordFactory('success'); + const event = { Records: [firstRecord, secondRecord, thirdRecord] }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = await processPartialResponse( + event, + sqsRecordHandler, + processor + ); + + // Assess + expect(result['batchItemFailures'].length).toBe(2); + expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( + secondRecord.messageId + ); + expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( + thirdRecord.messageId + ); + }); + }); + + describe('Asynchronous SQS FIFO batch processing', () => { + test('SQS FIFO Batch processor with no failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const event = { Records: [firstRecord, secondRecord] }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = await processPartialResponse( + event, + asyncSqsRecordHandler, + processor + ); + + // Assess + expect(result['batchItemFailures']).toStrictEqual([]); + }); + + test('SQS FIFO Batch processor with failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('fail'); + const thirdRecord = sqsRecordFactory('success'); + const event = { Records: [firstRecord, secondRecord, thirdRecord] }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = await processPartialResponse( + event, + asyncSqsRecordHandler, + processor + ); + + // Assess + expect(result['batchItemFailures'].length).toBe(2); + expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( + secondRecord.messageId + ); + expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( + thirdRecord.messageId + ); + }); + }); +});