spring-cloud-aws
Add BatchVisibility to SQS
Type: Bug
I am using spring-boot 3.0.6 and spring-cloud-aws 3.0.1. An SQS queue consumes messages from SNS, and we process each message. If processing succeeds, we acknowledge the message; otherwise, we want to retry it after 5 minutes. I believe we can achieve this with "Visibility", so that the message is picked up again after 5 minutes. But "Visibility" doesn't have such an option.
The AWS SDK has an option to do this [here].
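For context, the SQS ChangeMessageVisibilityBatch API accepts at most 10 entries per request, so any batch-visibility feature has to split the failed messages into groups of 10. A minimal plain-Java sketch of that splitting step, assuming receipt handles are available as strings; `VisibilityBatches` and `chunk` are hypothetical names, not part of spring-cloud-aws or the AWS SDK:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits receipt handles into groups that each fit
// one ChangeMessageVisibilityBatch request (SQS allows at most 10 entries).
class VisibilityBatches {
    static final int MAX_ENTRIES_PER_BATCH = 10;

    static <T> List<List<T>> chunk(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            // Copy so each chunk is independent of the source list
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> receiptHandles = new ArrayList<>();
        for (int i = 0; i < 23; i++) {
            receiptHandles.add("handle-" + i);
        }
        // Each inner list would become one ChangeMessageVisibilityBatch call
        for (List<String> batch : chunk(receiptHandles, MAX_ENTRIES_PER_BATCH)) {
            System.out.println(batch.size()); // prints 10, 10, 3
        }
    }
}
```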
Component: "SNS", "SQS"
Please find my code below for reference:
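import java.util.ArrayList;
import java.util.List;
import org.springframework.messaging.Message;
import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.listener.acknowledgement.BatchAcknowledgement;

@SqsListener(value = "dev-sample-job-v1", pollTimeoutSeconds = "20", messageVisibilitySeconds = "60", maxMessagesPerPoll = "10")
public void processEvent(List<Message<String>> messages,
                         BatchAcknowledgement<String> batchAcknowledgement) throws Exception {
    List<Message<String>> ackMessages = new ArrayList<>();
    for (Message<String> message : messages) {
        Event event = parseEvent(message); // parse and get the payload (hypothetical helper)
        if (eventService.process(event)) {
            ackMessages.add(message);
        } else {
            // How to set the visibility timeout of this unacknowledged message to 5 minutes here?
        }
    }
    // Acknowledge the successful messages once, after the whole batch has been processed
    batchAcknowledgement.acknowledgeAsync(ackMessages);
}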
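The listener above is essentially splitting one polled batch into messages to acknowledge and messages to retry later. A framework-free sketch of that bookkeeping, assuming a predicate that stands in for `eventService.process(event)`; the `BatchOutcome` class is illustrative only:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative only: splits a batch by processing result.
class BatchOutcome {
    // Key true -> processed successfully (acknowledge); false -> retry later
    static <T> Map<Boolean, List<T>> split(List<T> batch, Predicate<T> process) {
        return batch.stream().collect(Collectors.partitioningBy(process));
    }

    public static void main(String[] args) {
        List<String> batch = List.of("ok-1", "fail-2", "ok-3");
        Map<Boolean, List<String>> outcome = split(batch, m -> m.startsWith("ok"));
        System.out.println("acknowledge: " + outcome.get(true));  // [ok-1, ok-3]
        System.out.println("retry later: " + outcome.get(false)); // [fail-2]
    }
}
```

The `true` list is what goes to `batchAcknowledgement.acknowledgeAsync(...)`; the `false` list is exactly the set a batch visibility change would need to target.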
Any help would be really appreciated.
@maciejwalkowiak @tomazfernandes If this sounds valid to you, please let me know whether I can contribute to this issue.
Hi @VelDeveloper, thanks for bringing this up.
For the use case you're describing, ideally you would set the visibility timeout to 5 minutes on the queue itself.
This way, if the message is processed successfully, it's deleted from the queue immediately.
If message processing is not successful, the message won't be acknowledged and SQS will make the message visible again after 5 minutes.
This way no further configuration or intervention is necessary in the code.
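To make that queue-level behaviour concrete, here is a small in-memory model of SQS visibility semantics on a logical clock in seconds, assuming unique message bodies: receiving a message hides it for the visibility timeout, acknowledging deletes it, and an unacknowledged message becomes receivable again once the timeout elapses. `VisibilityQueue` is a toy for illustration, not a real client.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Toy model of SQS visibility semantics (logical clock in seconds).
class VisibilityQueue {
    private final int visibilityTimeoutSeconds;
    // message body -> time at which it becomes visible again
    private final Map<String, Long> visibleAt = new LinkedHashMap<>();

    VisibilityQueue(int visibilityTimeoutSeconds) {
        this.visibilityTimeoutSeconds = visibilityTimeoutSeconds;
    }

    void send(String body) {
        visibleAt.put(body, 0L);
    }

    // Returns the first visible message and hides it for the visibility timeout.
    Optional<String> receive(long now) {
        for (Map.Entry<String, Long> entry : visibleAt.entrySet()) {
            if (entry.getValue() <= now) {
                entry.setValue(now + visibilityTimeoutSeconds);
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty();
    }

    // Acknowledging deletes the message for good.
    void acknowledge(String body) {
        visibleAt.remove(body);
    }

    public static void main(String[] args) {
        VisibilityQueue queue = new VisibilityQueue(300); // 5-minute queue timeout
        queue.send("event-1");
        queue.receive(0);                       // processing fails, no acknowledgement
        System.out.println(queue.receive(120)); // Optional.empty: still hidden
        System.out.println(queue.receive(300)); // Optional[event-1]: redelivered
    }
}
```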
Also, I'm not sure what you're trying to accomplish by fetching a batch of messages and processing them one at a time. While you process message 1, messages 2-10 are already using up their visibility timeout, which can be an issue.
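A quick back-of-the-envelope check of that concern, assuming all messages in the batch are received at the same moment, each takes the same time, and they are processed strictly one after another: message k (0-based) only finishes at (k + 1) * t seconds. With `messageVisibilitySeconds = "60"`, 10 messages and 8 seconds each, the last three finish after their timeout has expired and may be redelivered while still being processed. The class and method names are hypothetical.

```java
// Counts how many messages in a sequentially processed batch finish after
// their visibility timeout has already expired.
class SequentialBatchCheck {
    static int messagesPastVisibility(int batchSize, int secondsPerMessage, int visibilitySeconds) {
        int late = 0;
        for (int k = 0; k < batchSize; k++) {
            // Message k finishes once it and all earlier messages are processed
            long finishedAt = (long) (k + 1) * secondsPerMessage;
            if (finishedAt > visibilitySeconds) {
                late++;
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // 10 messages, 8 s each, 60 s visibility timeout
        System.out.println(messagesPastVisibility(10, 8, 60)); // prints 3
    }
}
```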
It might make more sense to process with a regular single-message listener. That way you don't need to worry about manually acknowledging, and your code could be as simple as:
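import org.springframework.messaging.Message;
import io.awspring.cloud.sqs.annotation.SqsListener;

@SqsListener(value = "dev-sample-job-v1", pollTimeoutSeconds = "20", messageVisibilitySeconds = "60", maxMessagesPerPoll = "10")
public void processEvent(Message<String> message) throws Exception {
    Event event = parseEvent(message); // parse and get the payload (hypothetical helper)
    // If this throws, the message is not acknowledged and SQS makes it visible again
    // once the visibility timeout expires
    eventService.process(event);
}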
That assumes eventService.process(event) throws an exception in case of failure.
This way you can process many messages in parallel and not have the problem with each message's visibility.
Does this make sense?
Thanks.
@tomazfernandes Thanks a lot for your response. The reason I want to keep the visibility timeout in the code is that, if I want to update it in the future, I don't need to touch my Terraform or CloudFormation script. I can update the Spring configuration and redeploy the instance to pick up the new visibility timeout.
Also, if we update the visibility timeout in a CloudFormation or Terraform script, it may be necessary to destroy and recreate the queues. I am not 100% sure about it.
Regarding the batches, I thought long polling was good because we can consume the messages in batches and process them.
If there is an error while processing some of the batch items, we would change the visibility timeout of all those failed messages at once; otherwise, we send a batch acknowledgement. However, spring-cloud-aws doesn't have a BatchVisibility option at the moment.
Also, I understand your point: since we are processing the messages one by one, the consumer can consume a single message instead of a batch. It makes sense to remove maxMessagesPerPoll = "10" from the annotation so that it consumes a single message.
So the final code would be:
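import org.springframework.messaging.Message;
import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.listener.Visibility;

@SqsListener(value = "dev-sample-job-v1", pollTimeoutSeconds = "20", messageVisibilitySeconds = "60")
public void processEvent(Message<String> message, Visibility visibility) throws Exception {
    Event event = parseEvent(message); // parse and get the payload (hypothetical helper)
    try {
        eventService.process(event);
    } catch (Exception e) {
        visibility.changeToAsync(300); // retry the message in 300 seconds
        // Rethrow so the failed message is not acknowledged (deleted) as a success
        throw e;
    }
}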
Does this make sense? Can we consider the option of updating the visibility timeout through Spring configuration?
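One way to keep the retry delay out of Terraform/CloudFormation is to read it from application configuration and validate it before passing it to `visibility.changeToAsync(...)`. A plain-Java sketch; the property name `app.sqs.retry-visibility-seconds` and the `RetryDelayConfig` class are hypothetical, and in a Spring application the value would more naturally be injected with `@Value` or a `@ConfigurationProperties` class. SQS accepts visibility timeouts from 0 to 43,200 seconds (12 hours), which the check enforces.

```java
import java.util.Properties;

// Hypothetical: resolves the retry visibility timeout from configuration.
class RetryDelayConfig {
    static final String PROPERTY = "app.sqs.retry-visibility-seconds"; // hypothetical key
    static final int DEFAULT_SECONDS = 300;       // 5 minutes, as in this issue
    static final int MAX_SQS_VISIBILITY = 43_200; // SQS upper limit (12 hours)

    static int retryVisibilitySeconds(Properties config) {
        String raw = config.getProperty(PROPERTY, String.valueOf(DEFAULT_SECONDS));
        int seconds = Integer.parseInt(raw.trim());
        if (seconds < 0 || seconds > MAX_SQS_VISIBILITY) {
            throw new IllegalArgumentException(
                    PROPERTY + " must be between 0 and " + MAX_SQS_VISIBILITY + ", got " + seconds);
        }
        return seconds;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(retryVisibilitySeconds(config)); // 300 (default)
        config.setProperty(PROPERTY, "600");
        System.out.println(retryVisibilitySeconds(config)); // 600
    }
}
```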
> @tomazfernandes Thanks a lot for your response. The reason I want to keep the visibility timeout in the code is that, if I want to update it in the future, I don't need to touch my Terraform or CloudFormation script. I can update the Spring configuration and redeploy the instance to pick up the new visibility timeout.
Sure, the framework should enable you to do that.
> Also, if we update the visibility timeout in a CloudFormation or Terraform script, it may be necessary to destroy and recreate the queues. I am not 100% sure about it.
You need to check the Terraform / CloudFormation documentation on this, but I'm pretty confident you can change visibility settings without deleting the queue. Also, I personally prefer to keep this configuration in one place, such as the queue itself, rather than having a value set on the queue that will always be overridden. But that's only my opinion.
> Regarding the batches, I thought long polling was good because we can consume the messages in batches and process them.
Batch listeners have no influence on how polling is done. The framework always polls for a batch; the difference is that with a single-message listener you'll usually have many messages processed in parallel, while with a batch listener you'll receive a list. So for a Standard SQS queue, it's usually much more performant to have a single-message listener than a batch listener that iterates over the list.
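The performance difference can be estimated with simple arithmetic, assuming each message takes the same time t and up to c messages are processed concurrently: iterating over a batch of n messages inside one listener call takes n * t, while single-message listeners take roughly ceil(n / c) * t. The concurrency value below is an illustrative assumption, not a statement about spring-cloud-aws defaults.

```java
// Rough wall-clock estimates for processing one polled batch.
class ListenerThroughput {
    // Batch listener iterating over the messages one by one
    static long sequentialSeconds(int messages, int secondsPerMessage) {
        return (long) messages * secondsPerMessage;
    }

    // Single-message listener with up to `concurrency` messages in flight
    static long concurrentSeconds(int messages, int secondsPerMessage, int concurrency) {
        if (concurrency <= 0) {
            throw new IllegalArgumentException("concurrency must be positive");
        }
        long rounds = (messages + concurrency - 1) / concurrency; // ceiling division
        return rounds * secondsPerMessage;
    }

    public static void main(String[] args) {
        System.out.println(sequentialSeconds(10, 2));     // 20
        System.out.println(concurrentSeconds(10, 2, 10)); // 2
        System.out.println(concurrentSeconds(10, 2, 4));  // 6
    }
}
```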
All that being said, I think we could indeed have a BatchVisibility object in the same way we have the BatchAcknowledgement. If you want to give it a shot, please go ahead.
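A possible shape for such an object, sketched by analogy with BatchAcknowledgement: one call changes the visibility of a chosen subset of the batch. Everything here is hypothetical: the BatchVisibility name comes from this issue, but the method signature and the `RecordingBatchVisibility` test double are assumptions, and the real API, if added, may differ. To keep the sketch dependency-free, messages are a generic type `M` instead of Spring's `Message<T>`.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical API proposed in this issue (not part of spring-cloud-aws 3.0.x).
interface BatchVisibility<M> {
    // Change the visibility timeout of the given messages from the current batch.
    CompletableFuture<Void> changeToAsync(Collection<M> messages, int seconds);
}

// Test double that records the requested changes instead of calling SQS.
class RecordingBatchVisibility<M> implements BatchVisibility<M> {
    final Map<M, Integer> requested = new LinkedHashMap<>();

    @Override
    public CompletableFuture<Void> changeToAsync(Collection<M> messages, int seconds) {
        for (M message : messages) {
            requested.put(message, seconds);
        }
        return CompletableFuture.completedFuture(null);
    }
}

class BatchVisibilityDemo {
    public static void main(String[] args) {
        List<String> failed = List.of("m2", "m5");
        RecordingBatchVisibility<String> visibility = new RecordingBatchVisibility<>();
        visibility.changeToAsync(failed, 300).join();
        System.out.println(visibility.requested); // {m2=300, m5=300}
    }
}
```

In a batch listener, the failed messages from the loop would be passed to `changeToAsync(failed, 300)` and the successful ones to `batchAcknowledgement.acknowledgeAsync(...)`.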
Thanks!