sqs-consumer
Call handleMessage in sequence, instead of using Promise.all()
Description
I removed `await Promise.all()` in favor of `for (...) { await }`, because `Promise.all()` could result in messages not being handled in order.
Motivation and Context
Message ordering is important. The same can be said about the order in which messages are processed. In many cases, if not most, messages need to be processed one after the other, in the order they arrive.
`Promise.all()` runs handlers for all messages "in parallel". Strictly speaking, each handler runs synchronously until its first `await`; but since `handleMessage` is declared as an async function, I presume it will usually contain one. In practice, that means messages are not processed in sequence, each waiting for the previous one to finish, but all at once.
My code fixes the issue by eliminating the parallelism: a `for` loop awaits the completion of each message handler before continuing to the next.
This preserves the order in which `handleMessage` is called, but changes when each call finishes relative to the others. It is therefore potentially a breaking change.
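A minimal, self-contained sketch of the behavioral difference (this is not the library's code; the simulated handlers and delays are purely illustrative):

```javascript
// Shows why Promise.all() can complete handlers out of order,
// while a for...await loop preserves arrival order.
const finished = [];

// Simulated async handler: records its id once its "work" is done.
const handle = (id, delayMs) =>
  new Promise((resolve) =>
    setTimeout(() => {
      finished.push(id);
      resolve();
    }, delayMs)
  );

// All handlers start immediately; the faster one finishes first.
async function parallel() {
  finished.length = 0;
  await Promise.all([handle('msg-1', 30), handle('msg-2', 10)]);
  return [...finished]; // msg-2 completes before msg-1
}

// Each handler is awaited before the next one starts.
async function sequential() {
  finished.length = 0;
  for (const [id, delay] of [['msg-1', 30], ['msg-2', 10]]) {
    await handle(id, delay);
  }
  return [...finished]; // arrival order is preserved
}
```

Even though `msg-1` arrives first, the `parallel` variant reports `msg-2` as finished first; the `sequential` variant keeps arrival order.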
Types of changes
- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Breaking change (fix or feature that would cause existing functionality to change)
Checklist:
- [x] My code follows the code style of this project.
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [x] I have read the CONTRIBUTING document.
- [ ] I have added tests to cover my changes.
- [x] All new and existing tests passed.
The library by default only polls and processes one message at a time, making this a non-issue. The user actively has to choose a higher `batchSize` for this to become an "issue". Even in that case, the library allows the user to pass in a custom `handleMessageBatch` function to handle the batch, in which the user can process the messages in whatever order they want.
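For illustration, a self-contained sketch of what such an in-order batch handler could look like (the function name and shape here are my own; in sqs-consumer you would pass a function like this as the `handleMessageBatch` option):

```javascript
// Processes a batch of messages strictly in arrival order:
// each message is fully handled before the next one is started.
async function handleMessageBatchInOrder(messages, processOne) {
  const processed = [];
  for (const message of messages) {
    // Awaiting here prevents the out-of-order completion that
    // Promise.all() over all handlers would allow.
    await processOne(message);
    processed.push(message);
  }
  return processed;
}
```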
Not a bug.
In our codebase, we initialize the Consumer with a `batchSize` of 10. I can't imagine we're alone in this: if you use a FIFO queue, which has a limit of 300 API calls per second, and poll for just one message at a time, you cut your queue's throughput by a factor of 10.
Initially, we tried using `handleMessage` to handle the messages in sequence. We quickly switched to `handleMessageBatch`, as you suggest, because `handleMessage` didn't do what we assumed it would.
But now we have another problem. If handling one of the messages in our batch throws an error, all messages from the batch, even the already successfully processed ones, are invalidated. This issue is mentioned in #189 and #245, along with a workaround.
I see multiple ways of resolving issues #189 and #245:
- Document the workaround: deleting messages manually from the queue in `handleMessageBatch` becomes the official way to indicate message handling success.
- Design and implement a way to tell which messages have been handled successfully, like in johncmunson's proposed patch.
- Consider merging this pull request: as you say, by default the library only polls and processes one message at a time. That's the default, i.e. the expected behavior. Let's say I use `handleMessage` to process one message at a time. When I raise the `batchSize` to 10 to increase the throughput from 300 to 3000, messages are suddenly processed out of order. Since I expect `handleMessage` to run in order, like it did before, I'd say that is an issue.
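To make the first option concrete, here is a sketch of the workaround's shape (all names are illustrative, not the library's API; the delete callback stands in for an SQS delete call): handle each message in order and acknowledge it immediately on success, so a failure later in the batch does not invalidate already-processed messages.

```javascript
// Handle messages in order, deleting each one from the queue as soon
// as it succeeds. If a later message throws, the earlier ones have
// already been deleted and will not be redelivered with the batch.
async function processBatchWithEarlyDelete(messages, processOne, deleteOne) {
  for (const message of messages) {
    await processOne(message); // may throw; stops the batch here
    await deleteOne(message);  // acknowledge this message right away
  }
}
```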
If there is an issue only when you change some parameter, that does not mean it's not a bug. It's a bug that emerges under specific conditions, but it's still a bug, and a pretty critical one. I've been struggling with this as well and it caused a lot of trouble. If you don't want to merge this, then you should at least clearly state in your documentation that the `batchSize` parameter does not work as expected. If I am using a FIFO queue but this library shuffles all events during processing, I can't imagine a single use case where that wouldn't be considered a critical issue.
Can you give a status on whether this will be merged or not? At least users will get a clear view of whether they have to fork it. Thanks.
Reading this comment: https://github.com/bbc/sqs-consumer/pull/248#issuecomment-769693807
I think this was sufficiently answered, and given the age, I'm unsure if the requirement still exists.
Should you still think this is an issue / requires better documentation, please open a new PR.
This pull request has already been merged/closed. If you experience issues related to these changes, please open a new issue referencing this pull request.