powertools-lambda-typescript
Feature request: Support Sequential Async Processing of Records for SqsFifoPartialProcessor
Use case
I am working with a FIFO SQS queue whose batch records need to be processed asynchronously. However, to maintain message order, the SqsFifoPartialProcessor currently supports only sequential synchronous processing. This limitation prevents me from using asynchronous processing in my FIFO queue handler, which is essential for my use case.
Solution/User Experience
I propose enhancing the SqsFifoPartialProcessor to support sequential asynchronous processing while maintaining message ordering. This approach would be similar to the solution implemented here, but tailored to work with the SqsFifoPartialProcessor. This would allow users to leverage asynchronous processing within FIFO queues without sacrificing the ordering guarantees.
Alternative solutions
No response
Acknowledgment
- [X] This feature request meets Powertools for AWS Lambda (TypeScript) Tenets
- [X] Should this be considered in other Powertools for AWS Lambda languages? i.e. Python, Java, and .NET
Future readers
Please react with 👍 and your use case to help us understand customer demand.
Thanks for opening your first issue here! We'll come back to you as soon as we can. In the meantime, check out the #typescript channel on our Powertools for AWS Lambda Discord: Invite link
Hi @amaral-ng, thank you for opening this feature request.
I think it makes total sense to add this, as long as we process the items one by one and await each promise before moving on to the next one.
I've also added the help wanted label. If anyone is interested in picking up the issue and contributing a PR, please leave a comment below so we can assign it to you.
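The one-by-one awaiting described above is the core of the idea. A minimal illustration (not the actual Powertools internals; the types and names here are hypothetical) of why sequential awaiting preserves FIFO ordering, where Promise.all would not:

```typescript
// Hypothetical message shape; the real handler would receive SQS records.
type SqsMessage = { messageId: string; body: string };

async function processSequentially(
  messages: SqsMessage[],
  handler: (m: SqsMessage) => Promise<void>
): Promise<string[]> {
  const processedIds: string[] = [];
  for (const message of messages) {
    // The next message is not started until this one settles,
    // so completion order always matches queue order.
    await handler(message);
    processedIds.push(message.messageId);
  }
  return processedIds;
}
```

With Promise.all, a slow first message could finish after a fast second one, breaking the ordering guarantee that FIFO queues exist to provide.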
Ah yes, I wanted to plus 1 this as I also spent a fair bit of time on this one. I eventually came to the same conclusion - that it's not currently supported, so I ended up dropping powertools for my use case and writing my own boilerplate here.
In my use case, I wanted not only to use await but also to apply a global rate limit across each message within the batch, as I'm calling AWS SDK APIs on each message, which have their own associated rate limits.
Hi @bml1g12, thanks for the added context, this is very helpful.
May I ask how you'd be doing the rate limiting part? Do you maintain a separate persistence layer? How do you identify a request/operation? We're considering a rate limiting feature since we've had some other customers requesting it, and this info would be valuable.
This is the approach I'm using currently, i.e. rate limiting within the handler:
import pThrottle from "p-throttle"
// ...
const handler: SQSHandler = async (event: SQSEvent, context: Context) => {
  // ...
  const throttle = pThrottle({
    limit: config.CallsPerSecondLimit,
    interval: 1000,
    strict: true,
  })
  const throttled = throttle(async (record: SqsRecord) => {
    log.debug("Processing a record from SQS", { local_context: { record } })
    await processRecord(record, sqsClient, "start")
  })
  for (const record of result.data.Records) {
    await throttled(record)
  }
}
It would be even better if there were a convenience tool for global rate limiting across all Lambdas, as it's a common problem we face when different Lambda execution contexts are running and hitting AWS-imposed API rate limits.
I appreciate that a better way might be to use DynamoDB to store the number of calls in the last minute and use that instead, to provide persistence between Lambda handlers, but that would also be a lot more complex to implement and maintain.
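The DynamoDB idea above could be abstracted behind a store interface so the limiter logic stays the same while the counter backend changes. A hypothetical sketch of a fixed-window limiter (all names here are illustrative, not a Powertools API); a DynamoDB-backed store could implement the same interface with an atomic ADD on a per-window counter item to share the limit across execution contexts:

```typescript
interface CounterStore {
  // Atomically increment the counter for a window key; return the new count.
  increment(windowKey: string): Promise<number>;
}

// In-memory store: only limits within a single execution context.
class InMemoryStore implements CounterStore {
  private counts = new Map<string, number>();
  async increment(windowKey: string): Promise<number> {
    const next = (this.counts.get(windowKey) ?? 0) + 1;
    this.counts.set(windowKey, next);
    return next;
  }
}

class FixedWindowLimiter {
  constructor(
    private store: CounterStore,
    private limit: number,
    private windowMs: number
  ) {}

  // Returns true while the current window is under the limit.
  async tryAcquire(name: string): Promise<boolean> {
    const window = Math.floor(Date.now() / this.windowMs);
    const count = await this.store.increment(`${name}:${window}`);
    return count <= this.limit;
  }
}
```

A fixed window is simpler than a token bucket but allows bursts at window boundaries; the trade-off may be acceptable when the goal is staying under a coarse AWS API quota.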
I am interested in this. Currently, SqsFifoPartialProcessor extends BatchProcessorSync. To support asynchronous operations, a potential solution is for SqsFifoPartialProcessor to extend BasePartialBatchProcessor, where we can implement the async methods process and processRecord alongside processRecordSync. For async processing, the records will always be processed sequentially. Additionally, the function signature of processPartialResponse would need to be updated to match processPartialResponseSync.
There may be other solutions worth exploring, but this is the one that comes to mind. Let me know your thoughts, @dreamorosi.
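A simplified, self-contained sketch of the shape being proposed (these are not the real Powertools classes; the names and result types are stand-ins): a base class exposes async processing, and a FIFO subclass runs records strictly in order and short-circuits on the first failure so later records never execute out of order.

```typescript
abstract class BaseBatchProcessor<R> {
  abstract processRecord(record: R): Promise<"ok" | "fail">;

  // Default behaviour: process every record sequentially.
  async process(records: R[]): Promise<Array<["ok" | "fail", R]>> {
    const results: Array<["ok" | "fail", R]> = [];
    for (const record of records) {
      results.push([await this.processRecord(record), record]);
    }
    return results;
  }
}

class FifoAsyncProcessor extends BaseBatchProcessor<string> {
  constructor(private handler: (r: string) => Promise<void>) {
    super();
  }

  async processRecord(record: string): Promise<"ok" | "fail"> {
    try {
      await this.handler(record);
      return "ok";
    } catch {
      return "fail";
    }
  }

  // FIFO semantics: after the first failure, the rest of the batch is
  // marked failed without being processed, so ordering is never violated.
  override async process(records: string[]): Promise<Array<["ok" | "fail", string]>> {
    const results: Array<["ok" | "fail", string]> = [];
    let failed = false;
    for (const record of records) {
      if (failed) {
        results.push(["fail", record]);
        continue;
      }
      const status = await this.processRecord(record);
      if (status === "fail") failed = true;
      results.push([status, record]);
    }
    return results;
  }
}
```

The short-circuit mirrors what the synchronous SqsFifoPartialProcessor does today: once a record fails, continuing would risk processing messages out of order.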
Hey @arnabrahman, ideally that would be the way to go, but unfortunately I think it would constitute a breaking change - even though I doubt many people use the SqsFifoPartialProcessor as-is today because of the sync nature.
I think adding this now will mean we have to do the opposite:
- Create a SqsFifoPartialProcessorAsync that extends BasePartialBatchProcessor and implement the asynchronous logic there
- Make the necessary changes to the processPartialResponse* method in that newly created class
- Leave the current SqsFifoPartialProcessor as-is for now
We'll then add to the v3 backlog an action item to swap the two in the next major version. In v3, SqsFifoPartialProcessor will become the default and async, and SqsFifoPartialProcessorSync will be created.
Regarding the order of processing, yes, we'll need to always keep them sequential to avoid ordering issues.
What do you think?
Why not extend BatchProcessor, the same way SqsFifoPartialProcessor extends BatchProcessorSync?
Since SqsFifoPartialProcessorAsync will have all the features of SqsFifoPartialProcessor, we could consider using Mixins to decouple some of the common logic between the two classes. I’m not entirely sure if this would be achievable, but I can give it a try. @dreamorosi
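The mixin idea mentioned above is achievable in TypeScript with the standard class-factory pattern. A rough sketch, with purely illustrative names (the real shared logic would be things like tracking failed message group IDs): one mixin augments both a sync and an async base class without duplicating the code.

```typescript
type Constructor<T = {}> = new (...args: any[]) => T;

// Shared FIFO bookkeeping, applied to any base class.
function FifoGroupTracking<TBase extends Constructor>(Base: TBase) {
  return class extends Base {
    failedGroupIds = new Set<string>();

    markGroupFailed(groupId: string): void {
      this.failedGroupIds.add(groupId);
    }

    shouldSkip(groupId: string): boolean {
      return this.failedGroupIds.has(groupId);
    }
  };
}

// Stand-ins for the sync and async processor hierarchies.
class SyncBase {
  processSync(record: string): string {
    return `sync:${record}`;
  }
}

class AsyncBase {
  async process(record: string): Promise<string> {
    return `async:${record}`;
  }
}

// The same mixin augments both hierarchies without duplicating the logic.
class FifoSync extends FifoGroupTracking(SyncBase) {}
class FifoAsync extends FifoGroupTracking(AsyncBase) {}
```

One caveat of this pattern is that mixins complicate instanceof checks and type declarations, which may be worth weighing against plain duplication for a small amount of shared code.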
Hey @arnabrahman, I'm not familiar with mixins but I'm open to try. I'd say let's move forward and continue the discussion on the PR. I'm sure it'll be easier to talk once we have the code.
Thanks for the ideas!
⚠️ COMMENT VISIBILITY WARNING ⚠️
This issue is now closed. Please be mindful that future comments are hard for our team to see.
If you need more assistance, please either tag a team member or open a new issue that references this one.
If you wish to keep having a conversation with other community members under this issue, feel free to do so.
This is now released in version v2.11.0!