Skip to content

Commit

Permalink
feat(batch): Batch processing wrapper function (#1605)
Browse files Browse the repository at this point in the history
* Refactored some types, added function wrapper and base test

* Added record check and tests, renamed factories

* Refactored type check logic in function

* Refactor test to remove error ignore
  • Loading branch information
erikayao93 authored Jul 13, 2023
1 parent 9d8df3d commit cf499e1
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 75 deletions.
24 changes: 12 additions & 12 deletions packages/batch/src/BasePartialBatchProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import {
DEFAULT_RESPONSE,
EventSourceDataClassTypes,
EventType,
ItemIdentifier,
BatchResponse,
PartialItemFailures,
PartialItemFailureResponse,
} from '.';

abstract class BasePartialBatchProcessor extends BasePartialProcessor {
public COLLECTOR_MAPPING;

public batchResponse: BatchResponse;
public batchResponse: PartialItemFailureResponse;

public eventType: keyof typeof EventType;

Expand Down Expand Up @@ -52,16 +52,16 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
);
}

const messages: ItemIdentifier[] = this.getMessagesToReport();
const messages: PartialItemFailures[] = this.getMessagesToReport();
this.batchResponse = { batchItemFailures: messages };
}

/**
* Collects identifiers of failed items for a DynamoDB stream
* @returns list of identifiers for failed items
*/
public collectDynamoDBFailures(): ItemIdentifier[] {
const failures: ItemIdentifier[] = [];
public collectDynamoDBFailures(): PartialItemFailures[] {
const failures: PartialItemFailures[] = [];

for (const msg of this.failureMessages) {
const msgId = (msg as DynamoDBRecord).dynamodb?.SequenceNumber;
Expand All @@ -77,8 +77,8 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
* Collects identifiers of failed items for a Kinesis stream
* @returns list of identifiers for failed items
*/
public collectKinesisFailures(): ItemIdentifier[] {
const failures: ItemIdentifier[] = [];
public collectKinesisFailures(): PartialItemFailures[] {
const failures: PartialItemFailures[] = [];

for (const msg of this.failureMessages) {
const msgId = (msg as KinesisStreamRecord).kinesis.sequenceNumber;
Expand All @@ -92,8 +92,8 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
* Collects identifiers of failed items for an SQS batch
* @returns list of identifiers for failed items
*/
public collectSqsFailures(): ItemIdentifier[] {
const failures: ItemIdentifier[] = [];
public collectSqsFailures(): PartialItemFailures[] {
const failures: PartialItemFailures[] = [];

for (const msg of this.failureMessages) {
const msgId = (msg as SQSRecord).messageId;
Expand All @@ -115,7 +115,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
* Collects identifiers for failed batch items
* @returns formatted messages to use in batch deletion
*/
public getMessagesToReport(): ItemIdentifier[] {
public getMessagesToReport(): PartialItemFailures[] {
return this.COLLECTOR_MAPPING[this.eventType]();
}

Expand Down Expand Up @@ -146,7 +146,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
/**
* @returns Batch items that failed processing, if any
*/
public response(): BatchResponse {
public response(): PartialItemFailureResponse {
return this.batchResponse;
}

Expand Down
4 changes: 2 additions & 2 deletions packages/batch/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
* Constants for batch processor classes
*/
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
import type { BatchResponse, EventSourceDataClassTypes } from '.';
import type { PartialItemFailureResponse, EventSourceDataClassTypes } from '.';

const EventType = {
SQS: 'SQS',
KinesisDataStreams: 'KinesisDataStreams',
DynamoDBStreams: 'DynamoDBStreams',
} as const;

const DEFAULT_RESPONSE: BatchResponse = {
const DEFAULT_RESPONSE: PartialItemFailureResponse = {
batchItemFailures: [],
};

Expand Down
2 changes: 2 additions & 0 deletions packages/batch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ export * from './errors';
export * from './types';
export * from './BasePartialProcessor';
export * from './BasePartialBatchProcessor';
export * from './BatchProcessor';
export * from './processPartialResponse';
30 changes: 30 additions & 0 deletions packages/batch/src/processPartialResponse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import {
BasePartialBatchProcessor,
BaseRecord,
EventType,
PartialItemFailureResponse,
} from '.';

const processPartialResponse = async (
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
processor: BasePartialBatchProcessor
): Promise<PartialItemFailureResponse> => {
if (!event.Records) {
const eventTypes: string = Object.values(EventType).toString();
throw new Error(
'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' +
eventTypes +
' event.'
);
}

const records = event['Records'];

processor.register(records, recordHandler);
await processor.process();

return processor.response();
};

export { processPartialResponse };
8 changes: 4 additions & 4 deletions packages/batch/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ type SuccessResponse = [string, ResultType, EventSourceDataClassTypes];

type FailureResponse = [string, string, EventSourceDataClassTypes];

type ItemIdentifier = { [key: string]: string };
type BatchResponse = { [key: string]: ItemIdentifier[] };
type PartialItemFailures = { itemIdentifier: string };
type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] };

export type {
BaseRecord,
EventSourceDataClassTypes,
ResultType,
SuccessResponse,
FailureResponse,
ItemIdentifier,
BatchResponse,
PartialItemFailures,
PartialItemFailureResponse,
};
8 changes: 4 additions & 4 deletions packages/batch/tests/helpers/factories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
import { randomInt } from 'crypto';
import { v4 } from 'uuid';

const sqsEventFactory = (body: string): SQSRecord => {
const sqsRecordFactory = (body: string): SQSRecord => {
return {
messageId: v4(),
receiptHandle: 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a',
Expand All @@ -21,7 +21,7 @@ const sqsEventFactory = (body: string): SQSRecord => {
};
};

const kinesisEventFactory = (body: string): KinesisStreamRecord => {
const kinesisRecordFactory = (body: string): KinesisStreamRecord => {
let seq = '';
for (let i = 0; i < 52; i++) {
seq = seq + randomInt(10);
Expand All @@ -46,7 +46,7 @@ const kinesisEventFactory = (body: string): KinesisStreamRecord => {
};
};

const dynamodbEventFactory = (body: string): DynamoDBRecord => {
const dynamodbRecordFactory = (body: string): DynamoDBRecord => {
let seq = '';
for (let i = 0; i < 10; i++) {
seq = seq + randomInt(10);
Expand All @@ -69,4 +69,4 @@ const dynamodbEventFactory = (body: string): DynamoDBRecord => {
};
};

export { sqsEventFactory, kinesisEventFactory, dynamodbEventFactory };
export { sqsRecordFactory, kinesisRecordFactory, dynamodbRecordFactory };
Loading

0 comments on commit cf499e1

Please sign in to comment.