feat(batch): Implement SQS FIFO processor class (#1606)
* Added SQS FIFO processor and unit tests

* Added docstring for batch processing function
erikayao93 authored Jul 13, 2023
1 parent cf499e1 commit 691917d
Showing 4 changed files with 191 additions and 0 deletions.
67 changes: 67 additions & 0 deletions packages/batch/src/SqsFifoPartialProcessor.ts
@@ -0,0 +1,67 @@
import { BatchProcessor, EventType, FailureResponse, SuccessResponse } from '.';

/**
* Process native partial responses from SQS FIFO queues.
* Stops processing records when the first record fails;
* the remaining records are reported as failed items.
*/
class SqsFifoPartialProcessor extends BatchProcessor {
public constructor() {
super(EventType.SQS);
}

/**
* Call the instance's handler for each record.
* When the first failed message is detected, processing is short-circuited
* and the remaining messages are reported as failed items.
* @returns List of processed records (successes and failures)
* TODO: change to synchronous execution if possible
*/
public async process(): Promise<(SuccessResponse | FailureResponse)[]> {
this.prepare();

const processedRecords: (SuccessResponse | FailureResponse)[] = [];
let currentIndex = 0;
for (const record of this.records) {
// If we have any failed messages, it means the last message failed
// We should then short circuit the process and fail remaining messages
if (this.failureMessages.length !== 0) {
return this.shortCircuitProcessing(currentIndex, processedRecords);
}

processedRecords.push(await this.processRecord(record));
currentIndex++;
}

this.clean();

return processedRecords;
}

/**
* Starting from the first failure index, fail all remaining messages and append them to the result list.
* @param firstFailureIndex Index of the first message that failed
* @param processedRecords List of success and failure responses processed so far; remaining messages are appended as failures
* @returns List of success and failure responses with the remaining messages failed
*/
public shortCircuitProcessing(
firstFailureIndex: number,
processedRecords: (SuccessResponse | FailureResponse)[]
): (SuccessResponse | FailureResponse)[] {
const remainingRecords = this.records.slice(firstFailureIndex);

for (const record of remainingRecords) {
const data = this.toBatchType(record, this.eventType);
processedRecords.push(
this.failureHandler(
data,
new Error('A previous record failed processing')
)
);
}

this.clean();

return processedRecords;
}
}

export { SqsFifoPartialProcessor };
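
For illustration, a minimal usage sketch of the new processor inside a Lambda handler. This is not part of the commit: it assumes the utility is published as '@aws-lambda-powertools/batch', that '@types/aws-lambda' is installed, and the record handler body is a placeholder.

import { SqsFifoPartialProcessor, processPartialResponse } from '@aws-lambda-powertools/batch';
import type { SQSEvent, SQSRecord } from 'aws-lambda';

const processor = new SqsFifoPartialProcessor();

// Illustrative handler for a single record; throwing marks it as failed
const recordHandler = (record: SQSRecord): void => {
  const payload = JSON.parse(record.body);
  console.debug('processing', payload);
};

export const handler = async (event: SQSEvent) => {
  // Resolves to a partial batch response ({ batchItemFailures: [...] });
  // with SqsFifoPartialProcessor the first failure short-circuits the rest
  // of the batch so that FIFO ordering is preserved
  return processPartialResponse(event, recordHandler, processor);
};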
1 change: 1 addition & 0 deletions packages/batch/src/index.ts
@@ -5,3 +5,4 @@ export * from './BasePartialProcessor';
export * from './BasePartialBatchProcessor';
export * from './BatchProcessor';
export * from './processPartialResponse';
export * from './SqsFifoPartialProcessor';
7 changes: 7 additions & 0 deletions packages/batch/src/processPartialResponse.ts
@@ -5,6 +5,13 @@ import {
PartialItemFailureResponse,
} from '.';

/**
* Higher-level function to handle batch event processing
* @param event Lambda's original event
* @param recordHandler Callable function to process each record from the batch
* @param processor Batch processor to handle partial failure cases
* @returns Lambda Partial Batch Response
*/
const processPartialResponse = async (
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
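The rest of this file is collapsed in the diff. To make the "Lambda Partial Batch Response" mentioned in the docstring concrete, this is a sketch of the value the function resolves to, using the short-circuit scenario exercised in the tests below (three FIFO records where the second one fails); the messageId values are placeholders, not output from this commit.

// Sketch of the resolved partial batch response
const partialBatchResponse = {
  batchItemFailures: [
    { itemIdentifier: 'second-message-id' }, // the record whose handler threw
    { itemIdentifier: 'third-message-id' }, // short-circuited, never handed to the handler
  ],
};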
116 changes: 116 additions & 0 deletions packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts
@@ -0,0 +1,116 @@
/**
* Test SqsFifoPartialProcessor class
*
* @group unit/batch/class/sqsfifobatchprocessor
*/

import { SqsFifoPartialProcessor, processPartialResponse } from '../../src';
import { sqsRecordFactory } from '../../tests/helpers/factories';
import {
asyncSqsRecordHandler,
sqsRecordHandler,
} from '../../tests/helpers/handlers';

describe('Class: SqsFifoBatchProcessor', () => {
const ENVIRONMENT_VARIABLES = process.env;

beforeEach(() => {
jest.clearAllMocks();
jest.resetModules();
process.env = { ...ENVIRONMENT_VARIABLES };
});

afterAll(() => {
process.env = ENVIRONMENT_VARIABLES;
});

describe('Synchronous SQS FIFO batch processing', () => {
test('SQS FIFO Batch processor with no failures', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('success');
const event = { Records: [firstRecord, secondRecord] };
const processor = new SqsFifoPartialProcessor();

// Act
const result = await processPartialResponse(
event,
sqsRecordHandler,
processor
);

// Assess
expect(result['batchItemFailures']).toStrictEqual([]);
});

test('SQS FIFO Batch processor with failures', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('fail');
const thirdRecord = sqsRecordFactory('success');
const event = { Records: [firstRecord, secondRecord, thirdRecord] };
const processor = new SqsFifoPartialProcessor();

// Act
const result = await processPartialResponse(
event,
sqsRecordHandler,
processor
);

// Assess
expect(result['batchItemFailures'].length).toBe(2);
expect(result['batchItemFailures'][0]['itemIdentifier']).toBe(
secondRecord.messageId
);
expect(result['batchItemFailures'][1]['itemIdentifier']).toBe(
thirdRecord.messageId
);
});
});

describe('Asynchronous SQS FIFO batch processing', () => {
test('SQS FIFO Batch processor with no failures', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('success');
const event = { Records: [firstRecord, secondRecord] };
const processor = new SqsFifoPartialProcessor();

// Act
const result = await processPartialResponse(
event,
asyncSqsRecordHandler,
processor
);

// Assess
expect(result['batchItemFailures']).toStrictEqual([]);
});

test('SQS FIFO Batch processor with failures', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('fail');
const thirdRecord = sqsRecordFactory('success');
const event = { Records: [firstRecord, secondRecord, thirdRecord] };
const processor = new SqsFifoPartialProcessor();

// Act
const result = await processPartialResponse(
event,
asyncSqsRecordHandler,
processor
);

// Assess
expect(result['batchItemFailures'].length).toBe(2);
expect(result['batchItemFailures'][0]['itemIdentifier']).toBe(
secondRecord.messageId
);
expect(result['batchItemFailures'][1]['itemIdentifier']).toBe(
thirdRecord.messageId
);
});
});
});
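
The factories and handlers imported by these tests are not part of this diff. Below is a plausible minimal sketch of them, inferred only from how the tests use them; any field, name, or error message not exercised above is an assumption.

import { randomUUID } from 'node:crypto';
import type { SQSRecord } from 'aws-lambda';

// Build a bare-bones SQS record whose body drives success or failure;
// the real helpers presumably fill in the remaining SQS record fields
const sqsRecordFactory = (body: string): Partial<SQSRecord> => ({
  messageId: randomUUID(),
  body,
});

// Throw when the body says 'fail', otherwise return it
const sqsRecordHandler = (record: SQSRecord): string => {
  if (record.body.includes('fail')) {
    throw new Error('Failed to process record');
  }

  return record.body;
};

const asyncSqsRecordHandler = async (record: SQSRecord): Promise<string> =>
  sqsRecordHandler(record);

export { sqsRecordFactory, sqsRecordHandler, asyncSqsRecordHandler };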
