Skip to content

Commit

Permalink
feat(batch): sequential async processing of records for `BatchProcess…
Browse files Browse the repository at this point in the history
…or` (#3109)

Co-authored-by: Andrea Amorosi <dreamorosi@gmail.com>
  • Loading branch information
arnabrahman and dreamorosi authored Sep 27, 2024
1 parent 39c8608 commit e31279a
Show file tree
Hide file tree
Showing 6 changed files with 533 additions and 442 deletions.
15 changes: 14 additions & 1 deletion docs/utilities/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,10 @@ For such cases we recommend to use the `BatchProcessorSync` and `processPartialR
*If your function is `async` returning a `Promise`, use `BatchProcessor` and `processPartialResponse`
* If your function is not `async`, use `BatchProcessorSync` and `processPartialResponseSync`

The difference between the two processors in implementation is that `BatchProcessor` uses `Promise.all()` while `BatchProcessorSync` loops through each record to preserve the order.
The difference between the two processors is in how they handle record processing:

* **`BatchProcessor`**: By default, it processes records in parallel using `Promise.all()`. However, it also offers an [option](#sequential-async-processing) to process records sequentially, preserving the order.
* **`BatchProcessorSync`**: Always processes records sequentially, ensuring the order is preserved by looping through each record one by one.

???+ question "When is this useful?"

Expand Down Expand Up @@ -477,6 +480,16 @@ Let's suppose you'd like to add a metric named `BatchRecordFailures` for each ba
--8<-- "examples/snippets/batch/extendingFailure.ts"
```

### Sequential async processing

By default, the `BatchProcessor` processes records in parallel using `Promise.all()`. However, if you need to preserve the order of records, you can set the `processInParallel` option to `false` to process records sequentially.

!!! important "If the `processInParallel` option is not provided, the `BatchProcessor` will process records in parallel."

```typescript hl_lines="8 17" title="Sequential async processing"
--8<-- "examples/snippets/batch/sequentialAsyncProcessing.ts"
```

### Create your own partial processor

You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing the `prepare()`, `clean()`, `processRecord()` and `processRecordSync()` abstract methods.
Expand Down
18 changes: 18 additions & 0 deletions examples/snippets/batch/sequentialAsyncProcessing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (_record: SQSRecord): Promise<void> => {
// Process the record
};

export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, {
context,
processInParallel: false,
});
40 changes: 32 additions & 8 deletions packages/batch/src/BasePartialProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ abstract class BasePartialProcessor {
public abstract prepare(): void;

/**
* Process all records with an asyncronous handler
* Process all records with an asynchronous handler
*
* Once called, the processor will create an array of promises to process each record
* and wait for all of them to settle before returning the results.
Expand All @@ -122,21 +122,21 @@ abstract class BasePartialProcessor {
}
this.prepare();

const processingPromises: Promise<SuccessResponse | FailureResponse>[] =
this.records.map((record) => this.processRecord(record));

const processedRecords: (SuccessResponse | FailureResponse)[] =
await Promise.all(processingPromises);
// Default to `true` if `processInParallel` is not specified.
const processInParallel = this.options?.processInParallel ?? true;
const processedRecords = processInParallel
? await this.#processRecordsInParallel()
: await this.#processRecordsSequentially();

this.clean();

return processedRecords;
}

/**
* Process a record with an asyncronous handler
* Process a record with an asynchronous handler
*
* An implementation of this method is required for asyncronous processors.
* An implementation of this method is required for asynchronous processors.
*
* When implementing this method, you should at least call the successHandler method
* when a record succeeds processing and the failureHandler method when a record
Expand Down Expand Up @@ -249,6 +249,30 @@ abstract class BasePartialProcessor {

return entry;
}

/**
* Processes records in parallel using `Promise.all`.
*/
async #processRecordsInParallel(): Promise<
(SuccessResponse | FailureResponse)[]
> {
return Promise.all(
this.records.map((record) => this.processRecord(record))
);
}

/**
* Processes records sequentially, ensuring that each record is processed one after the other.
*/
async #processRecordsSequentially(): Promise<
(SuccessResponse | FailureResponse)[]
> {
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
for (const record of this.records) {
processedRecords.push(await this.processRecord(record));
}
return processedRecords;
}
}

export { BasePartialProcessor };
7 changes: 7 additions & 0 deletions packages/batch/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
* @property context The context object provided by the AWS Lambda runtime
* @property skipGroupOnError The option to group on error during processing
* @property throwOnFullBatchFailure The option to throw an error if the entire batch fails
* @property processInParallel Indicates whether the records should be processed in parallel
*/
type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
/**
Expand All @@ -30,6 +31,12 @@ type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
* Set this to false to prevent throwing an error if the entire batch fails.
*/
throwOnFullBatchFailure?: boolean;
/**
* Indicates whether the records should be processed in parallel.
* When set to `true`, the records will be processed in parallel using `Promise.all`.
* When set to `false`, the records will be processed sequentially.
*/
processInParallel?: T extends SqsFifoPartialProcessor ? never : boolean;
};

/**
Expand Down
Loading

0 comments on commit e31279a

Please sign in to comment.