
Feature request: Support Sequential Async Processing of Records for SqsFifoPartialProcessor #3140

Open
2 tasks done
amaral-ng opened this issue Sep 30, 2024 · 9 comments · May be fixed by #3160
Labels
  • batch — This item relates to the Batch Processing Utility
  • feature-request — This item refers to a feature request for an existing or new utility
  • help-wanted — We would really appreciate some support from the community for this one

Comments

@amaral-ng

Use case

I am working with a FIFO SQS queue whose batch records must be processed asynchronously. However, to preserve message ordering, the SqsFifoPartialProcessor currently supports only sequential synchronous processing. This limitation prevents me from using asynchronous processing in my FIFO queue handler, which is essential for my use case.

Solution/User Experience

I propose enhancing the SqsFifoPartialProcessor to support sequential asynchronous processing while maintaining message ordering. This approach would be similar to the solution implemented here, but tailored to work with the SqsFifoPartialProcessor. This would allow users to leverage asynchronous processing within FIFO queues without sacrificing the ordering guarantees.
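The core of the proposal can be sketched as a loop that awaits each record's handler before starting the next one (hypothetical names and types; not the actual Powertools implementation):

```typescript
// Minimal sketch of sequential async processing: each record is awaited
// before the next one starts, so FIFO ordering is preserved. Using
// Promise.all here instead would break the ordering guarantee.
type SqsRecordLike = { messageId: string; body: string };

const processSequentially = async (
  records: SqsRecordLike[],
  handler: (record: SqsRecordLike) => Promise<void>
): Promise<string[]> => {
  const processed: string[] = [];
  for (const record of records) {
    // Await the async handler for this record before moving on.
    await handler(record);
    processed.push(record.messageId);
  }
  return processed;
};
```

The handlers run one at a time, but each handler can itself be fully asynchronous (awaiting SDK calls, I/O, etc.), which is the capability the current sync-only processor lacks.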

Alternative solutions

No response

Acknowledgment

Future readers

Please react with 👍 and your use case to help us understand customer demand.

@amaral-ng amaral-ng added feature-request This item refers to a feature request for an existing or new utility triage This item has not been triaged by a maintainer, please wait labels Sep 30, 2024

boring-cyborg bot commented Sep 30, 2024

Thanks for opening your first issue here! We'll come back to you as soon as we can.
In the meantime, check out the #typescript channel on our Powertools for AWS Lambda Discord: Invite link

@dreamorosi dreamorosi added help-wanted We would really appreciate some support from community for this one batch This item relates to the Batch Processing Utility and removed triage This item has not been triaged by a maintainer, please wait labels Sep 30, 2024
@dreamorosi
Contributor

Hi @amaral-ng, thank you for opening this feature request.

I think it makes total sense to add this, as long as we process the items one by one and await each promise before moving on to the next one.

I've also added the help-wanted label; if anyone is interested in picking up the issue and contributing a PR, please leave a comment below so we can assign it to you.

@bml1g12

bml1g12 commented Oct 4, 2024

Ah yes, I wanted to plus 1 this as I also spent a fair bit of time on this one. I eventually came to the same conclusion - that it's not currently supported, so I ended up dropping powertools for my use case and writing my own boilerplate here.

In my use case, I wanted to not only use await but also apply a global rate limit across each message within the batch, as I'm calling AWS' SDK APIs on each message which have their own rate limits associated.

@dreamorosi
Contributor

Hi @bml1g12, thanks for the added context, this is very helpful.

May I ask how you'd be doing the rate limiting part? Do you maintain a separate persistence layer? How do you identify a request/operation? We're considering a rate limiting feature since we've had some other customers requesting it, and this info would be valuable.

@bml1g12

bml1g12 commented Oct 4, 2024

This is the approach I'm using currently, i.e. rate limiting within the handler:

import pThrottle from "p-throttle"
// ...
const handler: SQSHandler = async (event: SQSEvent, context: Context) => {
  // ...
  const throttle = pThrottle({
    limit: config.CallsPerSecondLimit,
    interval: 1000,
    strict: true,
  })
  const throttled = throttle(async (record: SqsRecord) => {
    log.debug("Processing a record from SQS", { local_context: { record } })
    await processRecord(record, sqsClient, "start")
  })
  for (const record of result.data.Records) {
    await throttled(record)
  }
}
It would be even better if there were a convenience tool for global rate limiting across all Lambdas, as it's a common problem we face when different Lambda execution contexts are running concurrently and hitting AWS-imposed API rate limits.

I appreciate that a better way might be to use DynamoDB to store the number of calls in the last minute and use that instead, to provide persistence between Lambda handlers, but that would also be a lot more complex to implement and maintain.
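The "calls in the last interval" idea can be sketched with an in-memory sliding window (hypothetical `SlidingWindowLimiter`, not a Powertools API); for the cross-invocation version described above, the in-memory timestamp array would be replaced by reads/writes to a DynamoDB item:

```typescript
// Sliding-window rate limiter sketch: allow at most `limit` calls per
// `windowMs` milliseconds. Timestamps of recent calls are kept in memory,
// so this only limits within one execution context; persisting them in
// DynamoDB would extend the limit across concurrent Lambda invocations.
class SlidingWindowLimiter {
  private calls: number[] = [];

  constructor(
    private readonly limit: number,
    private readonly windowMs: number
  ) {}

  // Returns true (and records the call) if the call is within the limit.
  tryAcquire(now: number = Date.now()): boolean {
    // Drop timestamps that have fallen out of the window.
    this.calls = this.calls.filter((t) => now - t < this.windowMs);
    if (this.calls.length >= this.limit) return false;
    this.calls.push(now);
    return true;
  }
}
```

A caller that gets `false` back would typically wait and retry, which is effectively what p-throttle automates.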

@arnabrahman
Contributor

I am interested in this. Currently, SqsFifoPartialProcessor extends BatchProcessorSync. To support asynchronous operations, a potential solution is for SqsFifoPartialProcessor to extend BasePartialBatchProcessor, where we can implement the async methods process and processRecord alongside processRecordSync. For async processing, the records will always be processed sequentially. Additionally, the function signature of processPartialResponse would need to be updated to match processPartialResponseSync.

There may be other solutions worth exploring, but this is the one that comes to mind. Let me know your thoughts, @dreamorosi.

@dreamorosi
Contributor

dreamorosi commented Oct 4, 2024

Hey @arnabrahman, ideally that would be the way to go, but unfortunately I think it would constitute a breaking change - even though I doubt many people use the SqsFifoPartialProcessor as-is today because of the sync nature.

I think adding this now will mean we have to do the opposite:

  • Create a SqsFifoPartialProcessorAsync that extends BasePartialBatchProcessor and implement the asynchronous logic there
  • Make the necessary changes to the processPartialResponse* method in that newly created class
  • Leave the current SqsFifoPartialProcessor as-is for now

We'll then add to the v3 backlog an action item to swap the two in the next major version. In v3, SqsFifoPartialProcessor will become the default and async, and SqsFifoPartialProcessorSync will be created.

Regarding the order of processing, yes, we'll need to always keep them sequential to avoid ordering issues.

What do you think?

@arnabrahman
Contributor

arnabrahman commented Oct 5, 2024

Why not extend BatchProcessor, the same way SqsFifoPartialProcessor extends BatchProcessorSync?

Since SqsFifoPartialProcessorAsync will have all the features of SqsFifoPartialProcessor, we could consider using Mixins to decouple some of the common logic between the two classes. I’m not entirely sure if this would be achievable, but I can give it a try. @dreamorosi
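For reference, a TypeScript mixin along these lines might look like the following (hypothetical names and logic, not the actual Powertools classes; the FIFO-specific behavior shown is skipping the rest of a message group once one of its records fails):

```typescript
// Mixin sketch: wrap any base class with shared FIFO bookkeeping so the
// same logic can be reused by both a sync and an async processor class.
type Constructor<T = object> = new (...args: any[]) => T;

function WithFifoLogic<TBase extends Constructor>(Base: TBase) {
  return class extends Base {
    // Message groups that have already seen a failure.
    failedGroupIds = new Set<string>();

    // Records in a failed group should be skipped to preserve ordering.
    shouldSkip(groupId: string): boolean {
      return this.failedGroupIds.has(groupId);
    }

    markGroupFailed(groupId: string): void {
      this.failedGroupIds.add(groupId);
    }
  };
}
```

Both `class SqsFifoProcessorSync extends WithFifoLogic(SomeSyncBase)` and `class SqsFifoProcessorAsync extends WithFifoLogic(SomeAsyncBase)` would then share the group-tracking logic without duplicating it.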

@dreamorosi
Contributor

Hey @arnabrahman, I'm not familiar with mixins but I'm open to trying. I'd say let's move forward and continue the discussion on the PR. I'm sure it'll be easier to talk once we have the code.

Thanks for the ideas!

Projects
Status: Working on it
4 participants