diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts index fac2e253dc..806331d6fc 100644 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -9,14 +9,14 @@ import { DEFAULT_RESPONSE, EventSourceDataClassTypes, EventType, - ItemIdentifier, - BatchResponse, + PartialItemFailures, + PartialItemFailureResponse, } from '.'; abstract class BasePartialBatchProcessor extends BasePartialProcessor { public COLLECTOR_MAPPING; - public batchResponse: BatchResponse; + public batchResponse: PartialItemFailureResponse; public eventType: keyof typeof EventType; @@ -52,7 +52,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { ); } - const messages: ItemIdentifier[] = this.getMessagesToReport(); + const messages: PartialItemFailures[] = this.getMessagesToReport(); this.batchResponse = { batchItemFailures: messages }; } @@ -60,8 +60,8 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { * Collects identifiers of failed items for a DynamoDB stream * @returns list of identifiers for failed items */ - public collectDynamoDBFailures(): ItemIdentifier[] { - const failures: ItemIdentifier[] = []; + public collectDynamoDBFailures(): PartialItemFailures[] { + const failures: PartialItemFailures[] = []; for (const msg of this.failureMessages) { const msgId = (msg as DynamoDBRecord).dynamodb?.SequenceNumber; @@ -77,8 +77,8 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { * Collects identifiers of failed items for a Kinesis stream * @returns list of identifiers for failed items */ - public collectKinesisFailures(): ItemIdentifier[] { - const failures: ItemIdentifier[] = []; + public collectKinesisFailures(): PartialItemFailures[] { + const failures: PartialItemFailures[] = []; for (const msg of this.failureMessages) { const msgId = (msg as KinesisStreamRecord).kinesis.sequenceNumber; @@ -92,8 +92,8 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { * Collects identifiers of failed items for an SQS batch * @returns list of identifiers for failed items */ - public collectSqsFailures(): ItemIdentifier[] { - const failures: ItemIdentifier[] = []; + public collectSqsFailures(): PartialItemFailures[] { + const failures: PartialItemFailures[] = []; for (const msg of this.failureMessages) { const msgId = (msg as SQSRecord).messageId; @@ -115,7 +115,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { * Collects identifiers for failed batch items * @returns formatted messages to use in batch deletion */ - public getMessagesToReport(): ItemIdentifier[] { + public getMessagesToReport(): PartialItemFailures[] { return this.COLLECTOR_MAPPING[this.eventType](); } @@ -146,7 +146,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { /** * @returns Batch items that failed processing, if any */ - public response(): BatchResponse { + public response(): PartialItemFailureResponse { return this.batchResponse; } diff --git a/packages/batch/src/constants.ts b/packages/batch/src/constants.ts index f7be1aa447..b707a79ca5 100644 --- a/packages/batch/src/constants.ts +++ b/packages/batch/src/constants.ts @@ -2,7 +2,7 @@ * Constants for batch processor classes */ import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; -import type { BatchResponse, EventSourceDataClassTypes } from '.'; +import type { PartialItemFailureResponse, EventSourceDataClassTypes } from '.'; const EventType = { SQS: 'SQS', @@ -10,7 +10,7 @@ const EventType = { DynamoDBStreams: 'DynamoDBStreams', } as const; -const DEFAULT_RESPONSE: BatchResponse = { +const DEFAULT_RESPONSE: PartialItemFailureResponse = { batchItemFailures: [], }; diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts index abb05d6952..d18b197b9c 100644 --- a/packages/batch/src/index.ts +++ b/packages/batch/src/index.ts @@ -3,3 +3,5 @@ export * from './errors'; export * from './types'; export * from './BasePartialProcessor'; export * from './BasePartialBatchProcessor'; +export * from './BatchProcessor'; +export * from './processPartialResponse'; diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts new file mode 100644 index 0000000000..4c700d3e79 --- /dev/null +++ b/packages/batch/src/processPartialResponse.ts @@ -0,0 +1,30 @@ +import { + BasePartialBatchProcessor, + BaseRecord, + EventType, + PartialItemFailureResponse, +} from '.'; + +const processPartialResponse = async ( + event: { Records: BaseRecord[] }, + recordHandler: CallableFunction, + processor: BasePartialBatchProcessor +): Promise => { + if (!event.Records) { + const eventTypes: string = Object.values(EventType).toString(); + throw new Error( + 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + + eventTypes + + ' event.' + ); + } + + const records = event['Records']; + + processor.register(records, recordHandler); + await processor.process(); + + return processor.response(); +}; + +export { processPartialResponse }; diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index a89129199d..38065a3d66 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -17,8 +17,8 @@ type SuccessResponse = [string, ResultType, EventSourceDataClassTypes]; type FailureResponse = [string, string, EventSourceDataClassTypes]; -type ItemIdentifier = { [key: string]: string }; -type BatchResponse = { [key: string]: ItemIdentifier[] }; +type PartialItemFailures = { itemIdentifier: string }; +type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] }; export type { BaseRecord, @@ -26,6 +26,6 @@ export type { ResultType, SuccessResponse, FailureResponse, - ItemIdentifier, - BatchResponse, + PartialItemFailures, + PartialItemFailureResponse, }; diff --git a/packages/batch/tests/helpers/factories.ts b/packages/batch/tests/helpers/factories.ts index 883983a849..b55e401474 100644 --- a/packages/batch/tests/helpers/factories.ts +++ b/packages/batch/tests/helpers/factories.ts @@ -2,7 +2,7 @@ import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; import { randomInt } from 'crypto'; import { v4 } from 'uuid'; -const sqsEventFactory = (body: string): SQSRecord => { +const sqsRecordFactory = (body: string): SQSRecord => { return { messageId: v4(), receiptHandle: 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a', @@ -21,7 +21,7 @@ const sqsEventFactory = (body: string): SQSRecord => { }; }; -const kinesisEventFactory = (body: string): KinesisStreamRecord => { +const kinesisRecordFactory = (body: string): KinesisStreamRecord => { let seq = ''; for (let i = 0; i < 52; i++) { seq = seq + randomInt(10); @@ -46,7 +46,7 @@ const kinesisEventFactory = (body: string): KinesisStreamRecord => { }; }; -const dynamodbEventFactory = (body: string): DynamoDBRecord => { +const dynamodbRecordFactory = (body: string): DynamoDBRecord => { let seq = ''; for (let i = 0; i < 10; i++) { seq = seq + randomInt(10); @@ -69,4 +69,4 @@ const dynamodbEventFactory = (body: string): DynamoDBRecord => { }; }; -export { sqsEventFactory, kinesisEventFactory, dynamodbEventFactory }; +export { sqsRecordFactory, kinesisRecordFactory, dynamodbRecordFactory }; diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index d8989b45d6..4080196af5 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -6,9 +6,9 @@ import { BatchProcessingError, BatchProcessor, EventType } from '../../src'; import { - sqsEventFactory, - kinesisEventFactory, - dynamodbEventFactory, + sqsRecordFactory, + kinesisRecordFactory, + dynamodbRecordFactory, } from '../../tests/helpers/factories'; import { sqsRecordHandler, @@ -35,8 +35,8 @@ describe('Class: BatchProcessor', () => { describe('Synchronously processing SQS Records', () => { test('Batch processing SQS records with no failures', async () => { // Prepare - const firstRecord = sqsEventFactory('success'); - const secondRecord = sqsEventFactory('success'); + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); const records = [firstRecord, secondRecord]; const processor = new BatchProcessor(EventType.SQS); @@ -53,9 +53,9 @@ describe('Class: BatchProcessor', () => { test('Batch processing SQS records with some failures', async () => { // Prepare - const firstRecord = sqsEventFactory('failure'); - const secondRecord = sqsEventFactory('success'); - const thirdRecord = sqsEventFactory('fail'); + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('success'); + const thirdRecord = sqsRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.SQS); @@ -80,9 +80,9 @@ describe('Class: BatchProcessor', () => { test('Batch processing SQS records with all failures', async () => { // Prepare - const firstRecord = sqsEventFactory('failure'); - const secondRecord = sqsEventFactory('failure'); - const thirdRecord = sqsEventFactory('fail'); + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('failure'); + const thirdRecord = sqsRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.SQS); @@ -98,8 +98,8 @@ describe('Class: BatchProcessor', () => { describe('Asynchronously processing SQS Records', () => { test('Batch processing SQS records with no failures', async () => { // Prepare - const firstRecord = sqsEventFactory('success'); - const secondRecord = sqsEventFactory('success'); + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); const records = [firstRecord, secondRecord]; const processor = new BatchProcessor(EventType.SQS); @@ -116,9 +116,9 @@ describe('Class: BatchProcessor', () => { test('Batch processing SQS records with some failures', async () => { // Prepare - const firstRecord = sqsEventFactory('failure'); - const secondRecord = sqsEventFactory('success'); - const thirdRecord = sqsEventFactory('fail'); + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('success'); + const thirdRecord = sqsRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.SQS); @@ -143,9 +143,9 @@ describe('Class: BatchProcessor', () => { test('Batch processing SQS records with all failures', async () => { // Prepare - const firstRecord = sqsEventFactory('failure'); - const secondRecord = sqsEventFactory('failure'); - const thirdRecord = sqsEventFactory('fail'); + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('failure'); + const thirdRecord = sqsRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.SQS); @@ -163,8 +163,8 @@ describe('Class: BatchProcessor', () => { describe('Synchronously processing Kinesis Records', () => { test('Batch processing Kinesis records with no failures', async () => { // Prepare - const firstRecord = kinesisEventFactory('success'); - const secondRecord = kinesisEventFactory('success'); + const firstRecord = kinesisRecordFactory('success'); + const secondRecord = kinesisRecordFactory('success'); const records = [firstRecord, secondRecord]; const processor = new BatchProcessor(EventType.KinesisDataStreams); @@ -181,9 +181,9 @@ describe('Class: BatchProcessor', () => { test('Batch processing Kinesis records with some failures', async () => { // Prepare - const firstRecord = kinesisEventFactory('failure'); - const secondRecord = kinesisEventFactory('success'); - const thirdRecord = kinesisEventFactory('fail'); + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('success'); + const thirdRecord = kinesisRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.KinesisDataStreams); @@ -207,9 +207,9 @@ describe('Class: BatchProcessor', () => { }); test('Batch processing Kinesis records with all failures', async () => { - const firstRecord = kinesisEventFactory('failure'); - const secondRecord = kinesisEventFactory('failure'); - const thirdRecord = kinesisEventFactory('fail'); + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('failure'); + const thirdRecord = kinesisRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.KinesisDataStreams); @@ -227,8 +227,8 @@ describe('Class: BatchProcessor', () => { describe('Asynchronously processing Kinesis Records', () => { test('Batch processing Kinesis records with no failures', async () => { // Prepare - const firstRecord = kinesisEventFactory('success'); - const secondRecord = kinesisEventFactory('success'); + const firstRecord = kinesisRecordFactory('success'); + const secondRecord = kinesisRecordFactory('success'); const records = [firstRecord, secondRecord]; const processor = new BatchProcessor(EventType.KinesisDataStreams); @@ -245,9 +245,9 @@ describe('Class: BatchProcessor', () => { test('Batch processing Kinesis records with some failures', async () => { // Prepare - const firstRecord = kinesisEventFactory('failure'); - const secondRecord = kinesisEventFactory('success'); - const thirdRecord = kinesisEventFactory('fail'); + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('success'); + const thirdRecord = kinesisRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.KinesisDataStreams); @@ -272,9 +272,9 @@ describe('Class: BatchProcessor', () => { test('Batch processing Kinesis records with all failures', async () => { // Prepare - const firstRecord = kinesisEventFactory('failure'); - const secondRecord = kinesisEventFactory('failure'); - const thirdRecord = kinesisEventFactory('fail'); + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('failure'); + const thirdRecord = kinesisRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.KinesisDataStreams); @@ -292,8 +292,8 @@ describe('Class: BatchProcessor', () => { describe('Synchronously processing DynamoDB Records', () => { test('Batch processing DynamoDB records with no failures', async () => { // Prepare - const firstRecord = dynamodbEventFactory('success'); - const secondRecord = dynamodbEventFactory('success'); + const firstRecord = dynamodbRecordFactory('success'); + const secondRecord = dynamodbRecordFactory('success'); const records = [firstRecord, secondRecord]; const processor = new BatchProcessor(EventType.DynamoDBStreams); @@ -308,11 +308,11 @@ describe('Class: BatchProcessor', () => { ]); }); - test('Batch processing DynamoDB records with failures', async () => { + test('Batch processing DynamoDB records with some failures', async () => { // Prepare - const firstRecord = dynamodbEventFactory('failure'); - const secondRecord = dynamodbEventFactory('success'); - const thirdRecord = dynamodbEventFactory('fail'); + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('success'); + const thirdRecord = dynamodbRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.DynamoDBStreams); @@ -337,9 +337,9 @@ describe('Class: BatchProcessor', () => { test('Batch processing DynamoDB records with all failures', async () => { // Prepare - const firstRecord = dynamodbEventFactory('failure'); - const secondRecord = dynamodbEventFactory('failure'); - const thirdRecord = dynamodbEventFactory('fail'); + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('failure'); + const thirdRecord = dynamodbRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.DynamoDBStreams); @@ -357,8 +357,8 @@ describe('Class: BatchProcessor', () => { describe('Asynchronously processing DynamoDB Records', () => { test('Batch processing DynamoDB records with no failures', async () => { // Prepare - const firstRecord = dynamodbEventFactory('success'); - const secondRecord = dynamodbEventFactory('success'); + const firstRecord = dynamodbRecordFactory('success'); + const secondRecord = dynamodbRecordFactory('success'); const records = [firstRecord, secondRecord]; const processor = new BatchProcessor(EventType.DynamoDBStreams); @@ -373,11 +373,11 @@ describe('Class: BatchProcessor', () => { ]); }); - test('Batch processing DynamoDB records with failures', async () => { + test('Batch processing DynamoDB records with some failures', async () => { // Prepare - const firstRecord = dynamodbEventFactory('failure'); - const secondRecord = dynamodbEventFactory('success'); - const thirdRecord = dynamodbEventFactory('fail'); + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('success'); + const thirdRecord = dynamodbRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.DynamoDBStreams); @@ -402,9 +402,9 @@ describe('Class: BatchProcessor', () => { test('Batch processing DynamoDB records with all failures', async () => { // Prepare - const firstRecord = dynamodbEventFactory('failure'); - const secondRecord = dynamodbEventFactory('failure'); - const thirdRecord = dynamodbEventFactory('fail'); + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('failure'); + const thirdRecord = dynamodbRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.DynamoDBStreams); diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts new file mode 100644 index 0000000000..ea70123b2c --- /dev/null +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -0,0 +1,192 @@ +/** + * Test processPartialResponse function + * + * @group unit/batch/function/processpartialresponse + */ + +import { + Context, + DynamoDBStreamEvent, + KinesisStreamEvent, + SQSEvent, +} from 'aws-lambda'; +import { + BatchProcessor, + EventType, + PartialItemFailureResponse, + processPartialResponse, +} from '../../src'; +import { + dynamodbRecordFactory, + kinesisRecordFactory, + sqsRecordFactory, +} from '../../tests/helpers/factories'; +import { + asyncSqsRecordHandler, + dynamodbRecordHandler, + kinesisRecordHandler, + sqsRecordHandler, +} from '../../tests/helpers/handlers'; +import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; +import { Custom as dummyEvent } from '../../../commons/src/samples/resources/events'; + +describe('Function: processPartialResponse()', () => { + const ENVIRONMENT_VARIABLES = process.env; + const context = dummyContext; + + beforeEach(() => { + jest.clearAllMocks(); + jest.resetModules(); + process.env = { ...ENVIRONMENT_VARIABLES }; + }); + + afterAll(() => { + process.env = ENVIRONMENT_VARIABLES; + }); + + describe('Process partial response function call tests', () => { + test('Process partial response function call with synchronous handler', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act + const ret = await processPartialResponse( + batch, + sqsRecordHandler, + processor + ); + + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response function call with asynchronous handler', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act + const ret = await processPartialResponse( + batch, + asyncSqsRecordHandler, + processor + ); + + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); + }); + + describe('Process partial response function call through handler', () => { + test('Process partial response through handler with SQS event', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return await processPartialResponse(event, sqsRecordHandler, processor); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler with Kinesis event', async () => { + // Prepare + const records = [ + kinesisRecordFactory('success'), + kinesisRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + const event: KinesisStreamEvent = { Records: records }; + + const handler = async ( + event: KinesisStreamEvent, + _context: Context + ): Promise => { + return await processPartialResponse( + event, + kinesisRecordHandler, + processor + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler with DynamoDB event', async () => { + // Prepare + const records = [ + dynamodbRecordFactory('success'), + dynamodbRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + const event: DynamoDBStreamEvent = { Records: records }; + + const handler = async ( + event: DynamoDBStreamEvent, + _context: Context + ): Promise => { + return await processPartialResponse( + event, + dynamodbRecordHandler, + processor + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler for SQS records with incorrect event type', async () => { + // Prepare + const processor = new BatchProcessor(EventType.SQS); + const event = dummyEvent; + const eventTypes: string = Object.values(EventType).toString(); + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return await processPartialResponse(event, sqsRecordHandler, processor); + }; + + // Act & Assess + await expect( + handler(event as unknown as SQSEvent, context) + ).rejects.toThrowError( + new Error( + 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + + eventTypes + + ' event.' + ) + ); + }); + }); +});