sqs-consumer

Delete only certain messages received via handleMessageBatch

Open TylerMatteo opened this issue 5 years ago • 10 comments

Is it possible to delete only some of the messages received in a batch? I understand that using handleMessage with a batchSize greater than 1 results in the messages being processed one at a time, but handleMessageBatch seems to be all or nothing. For example, if I had an API that could save multiple messages at a time and return a list of those that were successfully saved, could I use that list to delete only the successfully saved messages from the queue?

TylerMatteo avatar Oct 23 '19 18:10 TylerMatteo

I have the same request.

ionutale avatar Nov 12 '19 10:11 ionutale

I also am wondering if this is possible. handleMessageBatch is almost useless if it doesn't have the ability to handle partial success/failure.

Looking at the source code, this is not currently possible from what I can tell.

I propose allowing handleMessageBatch to optionally return a list of successful messages, which would in turn be returned by executeBatchHandler. If such a list is returned, deleteMessageBatch would use it instead of the original full batch of messages.

Here is the existing code...

private async processMessageBatch(messages: SQSMessage[]): Promise<void> {
  messages.forEach((message) => {
    this.emit('message_received', message);
  });

  try {
    await this.executeBatchHandler(messages);
    await this.deleteMessageBatch(messages);
    messages.forEach((message) => {
      this.emit('message_processed', message);
    });
  } catch (err) {
    this.emit('error', err, messages);

    if (this.terminateVisibilityTimeout) {
      try {
        await this.terminateVisabilityTimeoutBatch(messages);
      } catch (err) {
        this.emit('error', err, messages);
      }
    }
  }
}

And here are (most of) the proposed changes. Please let me know if you'd be open to a proper pull request, including tests.

private async processMessageBatch(messages: SQSMessage[]): Promise<void> {
  messages.forEach((message) => {
    this.emit('message_received', message);
  });

  try {
    const successfulMessages = await this.executeBatchHandler(messages);
    if (successfulMessages) {
      await this.deleteMessageBatch(successfulMessages);
      successfulMessages.forEach((message) => {
        this.emit('message_processed', message);
      });
      const failedMessages = messages.filter((message) => {
        return !successfulMessages.some((successfulMessage) => successfulMessage.MessageId === message.MessageId);
      });
      if (failedMessages.length > 0) {
        // Narrow `messages` so the catch block only reports and resets the failed ones.
        messages = failedMessages;
        throw new Error('handleMessageBatch was partially successful.');
      }
    } else {
      await this.deleteMessageBatch(messages);
      messages.forEach((message) => {
        this.emit('message_processed', message);
      });
    }
  } catch (err) {
    this.emit('error', err, messages);

    if (this.terminateVisibilityTimeout) {
      try {
        await this.terminateVisabilityTimeoutBatch(messages);
      } catch (err) {
        this.emit('error', err, messages);
      }
    }
  }
}

johncmunson avatar Dec 20 '19 18:12 johncmunson

I like it.

ionutale avatar Jan 17 '20 06:01 ionutale

Is it still ongoing?

I worked around this by throwing a custom error object from handleMessageBatch. The custom error has properties that hold the succeeded and failed messages, so those messages can be handled properly in the error event listener.
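A minimal sketch of what such a workaround could look like. The names `QueueMessage`, `PartialBatchError`, and `processMessage` are hypothetical and not part of sqs-consumer, and the sketch assumes the library hands the thrown error object to the error listener unchanged:

```typescript
// Hypothetical message shape; real SQS messages carry more fields.
interface QueueMessage {
  MessageId: string;
  Body: string;
}

// Custom error carrying the outcome of a partially processed batch.
class PartialBatchError extends Error {
  readonly succeeded: QueueMessage[];
  readonly failed: QueueMessage[];

  constructor(succeeded: QueueMessage[], failed: QueueMessage[]) {
    super(`${failed.length} of ${succeeded.length + failed.length} messages failed`);
    this.name = 'PartialBatchError';
    this.succeeded = succeeded;
    this.failed = failed;
    // Keeps `instanceof` working when compiling to older targets.
    Object.setPrototypeOf(this, PartialBatchError.prototype);
  }
}

// Hypothetical per-message processor: treats an empty body as a failure.
async function processMessage(message: QueueMessage): Promise<boolean> {
  return message.Body.length > 0;
}

// Batch handler: throws PartialBatchError if any message fails.
async function handleMessageBatch(messages: QueueMessage[]): Promise<void> {
  const succeeded: QueueMessage[] = [];
  const failed: QueueMessage[] = [];
  for (const message of messages) {
    if (await processMessage(message)) {
      succeeded.push(message);
    } else {
      failed.push(message);
    }
  }
  if (failed.length > 0) {
    throw new PartialBatchError(succeeded, failed);
  }
}
```

An error listener could then check `err instanceof PartialBatchError` and delete `err.succeeded` itself, since the library skips its own batch delete when the handler throws.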

I know that is not a decent solution. I hope this issue gets the author's attention soon and a better solution is introduced.

fullc0de avatar Mar 31 '20 03:03 fullc0de

Just delete your own messages then within your batch handler:

handleMessageBatch: async (messages) => {
  // Use for...of: `await` is not allowed inside a non-async forEach callback.
  for (const message of messages) {
    // check success of message
    const success = await processMessage(message);
    if (success) {
      await instance.deleteMessage(message);
    }
  }
}

crh3675 avatar May 22 '20 21:05 crh3675

@crh3675 where does this instance come from?

jackbraj avatar Jun 04 '20 07:06 jackbraj

instance is the main variable you assign from const instance = Consumer.create({

crh3675 avatar Jun 04 '20 08:06 crh3675

@crh3675 thanks for replying

OK, so I can manually delete a message, but the problem is that the library doesn't take that into account and deletes all messages anyway. Look at this in consumer.ts:

try {
  await this.executeBatchHandler(messages);
  await this.deleteMessageBatch(messages);
  messages.forEach((message) => {
    this.emit('message_processed', message);
  });
}

So what's the use of deleting manually when the library wipes the whole batch?

jackbraj avatar Jun 04 '20 10:06 jackbraj

Unfortunately, this library is useless given the combination of this bug and another bug where message handlers are run in parallel even for FIFO queues.

trubi avatar Feb 11 '21 11:02 trubi

> Unfortunately, this library is useless given the combination of this bug and another bug where message handlers are run in parallel even for FIFO queues.

Oh good, I am not the only one seeing this problem. I am using the standard handleMessage and every time my consumer polls, it grabs a new message even if the previous one has not finished processing. Seems like the contributors of this project are not very active in it anymore though.

adamleemiller avatar Apr 10 '21 08:04 adamleemiller

Whoops! Wrong one sorry 😂

nicholasgriffintn avatar Dec 09 '22 12:12 nicholasgriffintn

The resolution to https://github.com/bbc/sqs-consumer/issues/245 will provide a list of successful messages. Once that's available, we can probably look at how messages are deleted.

nicholasgriffintn avatar Dec 16 '22 17:12 nicholasgriffintn

Now that 245 has been merged, I think this will be possible once that change has been released to NPM (probably after Christmas now).

The change will mean that you can return an array of the messages that you want to acknowledge; only the messages in this array will be deleted.
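As a rough sketch of how a handler might use this, assuming a bulk-save API like the one described in the original question: `QueueMessage` and `saveMessages` are hypothetical stand-ins, not part of sqs-consumer, and the exact behaviour should be checked against the released version.

```typescript
// Hypothetical message shape; real SQS messages carry more fields.
interface QueueMessage {
  MessageId: string;
  Body: string;
}

// Hypothetical bulk-save API: resolves with the IDs it managed to persist.
// In this sketch, any message with a non-empty body counts as saved.
async function saveMessages(messages: QueueMessage[]): Promise<string[]> {
  return messages.filter((m) => m.Body.length > 0).map((m) => m.MessageId);
}

// Batch handler that returns only the messages to acknowledge.
async function handleMessageBatch(messages: QueueMessage[]): Promise<QueueMessage[]> {
  const savedIds = new Set(await saveMessages(messages));
  return messages.filter((m) => savedIds.has(m.MessageId));
}
```

A function like this would be passed as the handleMessageBatch option. Messages left out of the returned array are not deleted, so SQS makes them visible again once their visibility timeout expires.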

If that does not resolve your issue then please feel free to open a new one :).

nicholasgriffintn avatar Dec 16 '22 22:12 nicholasgriffintn