sqs-consumer
Delete only certain messages received via handleMessageBatch
Is it possible to only delete some of the messages received in a batch? I understand that using handleMessage in combination with a batchSize greater than 1 results in them being processed one at a time but handleMessageBatch seems to be all or nothing. For example, if I had an API that could save multiple messages at a time and return a list of those that were successfully saved, could I use that to delete only the successfully saved messages from the queue?
I have the same request.
I also am wondering if this is possible. handleMessageBatch is almost useless if it doesn't have the ability to handle partial success/failure.
Looking at the source code, this is not currently possible from what I can tell.
I propose allowing handleMessageBatch to optionally return a list of successful messages, which would therefore be returned by executeBatchHandler. If a modified list of messages is returned, then deleteMessageBatch would use it instead of the original full batch of messages.
Here is the existing code...
  private async processMessageBatch(messages: SQSMessage[]): Promise<void> {
    messages.forEach((message) => {
      this.emit('message_received', message);
    });
    try {
      await this.executeBatchHandler(messages);
      await this.deleteMessageBatch(messages);
      messages.forEach((message) => {
        this.emit('message_processed', message);
      });
    } catch (err) {
      this.emit('error', err, messages);
      if (this.terminateVisibilityTimeout) {
        try {
          await this.terminateVisabilityTimeoutBatch(messages);
        } catch (err) {
          this.emit('error', err, messages);
        }
      }
    }
  }
And here are (most of) the proposed changes. Please let me know if you'd be open to a proper pull request, including tests.
  private async processMessageBatch(messages: SQSMessage[]): Promise<void> {
    messages.forEach((message) => {
      this.emit('message_received', message);
    });
    try {
      const successfulMessages = await this.executeBatchHandler(messages);
      if (successfulMessages) {
        // Delete and acknowledge only the messages the handler reported as successful
        await this.deleteMessageBatch(successfulMessages);
        successfulMessages.forEach((message) => {
          this.emit('message_processed', message);
        });
        const failedMessages = messages.filter(message => {
          return successfulMessages.findIndex(successfulMessage => successfulMessage.MessageId === message.MessageId) === -1;
        });
        // Only treat the batch as partially successful if something actually failed
        if (failedMessages.length > 0) {
          messages = failedMessages;
          throw new Error('handleMessageBatch was partially successful.');
        }
      } else {
        // No list returned: keep the existing all-or-nothing behaviour
        await this.deleteMessageBatch(messages);
        messages.forEach((message) => {
          this.emit('message_processed', message);
        });
      }
    } catch (err) {
      this.emit('error', err, messages);
      if (this.terminateVisibilityTimeout) {
        try {
          await this.terminateVisabilityTimeoutBatch(messages);
        } catch (err) {
          this.emit('error', err, messages);
        }
      }
    }
  }
I like it.
Is it still ongoing?
I dealt with this issue with my own workaround: throwing a custom error object in handleMessageBatch. The custom error has properties that hold the succeeded and failed messages, so that those messages can be handled properly in the error event listener.
I know that is not a decent solution. I hope this issue gets the author's attention soon and that a better solution is introduced.
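For readers who want to see the shape of that workaround, here is a minimal sketch. It is an illustration only: PartialBatchError, QueueMessage, saveOne and processBatchOrThrow are hypothetical names that are not part of sqs-consumer, and the "save" step is faked by treating an empty body as a failure.

```typescript
// Minimal stand-in for the SQS message shape; only these two fields are used.
interface QueueMessage {
  MessageId: string;
  Body: string;
}

// Hypothetical error type that carries the outcome of a partially failed batch.
class PartialBatchError extends Error {
  readonly succeeded: QueueMessage[];
  readonly failed: QueueMessage[];

  constructor(succeeded: QueueMessage[], failed: QueueMessage[]) {
    super(`${failed.length} of ${succeeded.length + failed.length} messages failed`);
    this.name = "PartialBatchError";
    this.succeeded = succeeded;
    this.failed = failed;
    // Keeps instanceof checks working when compiling to older targets.
    Object.setPrototypeOf(this, PartialBatchError.prototype);
  }
}

// Hypothetical per-message work: a message "fails" when its body is empty.
function saveOne(message: QueueMessage): boolean {
  return message.Body.length > 0;
}

// Intended to be called from inside the batch handler: processes every
// message, then throws if any of them failed so the error listener can react.
function processBatchOrThrow(messages: QueueMessage[]): void {
  const succeeded: QueueMessage[] = [];
  const failed: QueueMessage[] = [];
  for (const message of messages) {
    (saveOne(message) ? succeeded : failed).push(message);
  }
  if (failed.length > 0) {
    throw new PartialBatchError(succeeded, failed);
  }
}
```

An error listener could then check `err instanceof PartialBatchError` and read `err.succeeded` and `err.failed` to decide what to delete or retry. How those messages are actually removed from the queue is left open here, as it is in the comment above.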
Just delete your own messages then within your batch handler:
  handleMessageBatch: async (messages) => {
    // forEach does not wait for async callbacks, so iterate with for...of
    for (const message of messages) {
      // check success of message
      const success = await processMessage(message);
      if (success) {
        await instance.deleteMessage(message);
      }
    }
  }
@crh3675 where does this instance come from?
instance is the main variable you assign from const instance = Consumer.create({
@crh3675 thanks for replying
OK, so I can manually delete a message, but the problem is that the library doesn't take that into account and deletes all the messages anyway. Look at this in consumer.ts:
  try {
    await this.executeBatchHandler(messages);
    await this.deleteMessageBatch(messages);
    messages.forEach((message) => {
      this.emit('message_processed', message);
    });
  }
So what's the use of deleting manually when the library wipes the whole batch?
Unfortunately, this library is useless given the combination of this bug and another bug where message handlers are run in parallel even for a FIFO queue.
Oh good, I am not the only one seeing this problem. I am using the standard handleMessage and every time my consumer polls, it grabs a new message even if the previous one has not finished processing. Seems like the contributors of this project are not very active in it anymore though.
Whoops! Wrong one sorry 😂
The resolution to https://github.com/bbc/sqs-consumer/issues/245 will provide a list of successful messages. Once that's available, we can probably look at how messages are deleted.
Now that #245 has been merged, I think this will be possible once that change has been released to NPM (probably after Christmas now).
The change will mean that you can return an array of the messages that you want to acknowledge; only the messages in this array will be deleted.
If that does not resolve your issue, then please feel free to open a new one :).
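For the use case in the original question, a handler along these lines might be what is meant. This is a sketch under assumptions, not confirmed library behaviour: it relies on the statement above that returning an array from handleMessageBatch causes only those messages to be deleted, so check the released version before depending on it. QueueMessage, saveAll and selectAcknowledged are hypothetical names; saveAll stands in for an API that saves several messages and reports which ones succeeded.

```typescript
// Minimal stand-in for the SQS message shape; only these two fields are used.
interface QueueMessage {
  MessageId: string;
  Body: string;
}

// Hypothetical bulk-save API: resolves with the IDs of the messages it saved.
// Here a message is "saved" when its body is not blank.
async function saveAll(messages: QueueMessage[]): Promise<string[]> {
  return messages
    .filter((message) => message.Body.trim() !== "")
    .map((message) => message.MessageId);
}

// Pure helper: keep only the messages whose IDs were reported as saved.
function selectAcknowledged(
  messages: QueueMessage[],
  savedIds: string[],
): QueueMessage[] {
  const saved = new Set(savedIds);
  return messages.filter((message) => saved.has(message.MessageId));
}

// Batch handler: returns the subset of messages that should be acknowledged.
// Messages left out of the returned array are assumed to stay on the queue.
async function handleMessageBatch(
  messages: QueueMessage[],
): Promise<QueueMessage[]> {
  const savedIds = await saveAll(messages);
  return selectAcknowledged(messages, savedIds);
}
```

The handler would be passed as the handleMessageBatch option to Consumer.create. Messages that are not returned should become visible again after their visibility timeout and be retried, provided the library behaves as described above.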