spring-cloud-aws
Acknowledgement fails when the application shuts down gracefully
Type: Bug
Component: SQS
Describe the bug
Versions:
JDK:17
spring-cloud-aws-dependencies : 3.0.2
spring-cloud-dependencies:2022.0.4
amazon-sqs-java-messaging-lib:2.1.1
spring-boot-dependencies : 3.1.4
amazon.awssdk:2.21.0
Error: I use a @SqsListener and a MessageListenerContainerFactory bean to create the MessageListenerContainer, with batching acknowledgement of messages. When I shut down the application, I get some error messages - or sometimes no error at all - but the messages stay in flight until the visibility timeout expires instead of being acknowledged. I would expect the acknowledgements to complete even after the shutdown hook has been triggered.
Looking at the doStop() method in BatchingAcknowledgementProcessor, it only waits for runningAcks until the acknowledgementShutdownTimeout. If there are messages in the BlockingQueue acks that are not yet in the Map<String, BlockingQueue<Message<T>>> acksBuffer, those messages will never be acknowledged, right? I'm not sure why we only wait for runningAcks to finish. Is there a way to ensure that all messages that have been processed are acknowledged?
Sample MessageListenerContainerFactory config as follows:
@Bean("sqsFactory")
SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
return SqsMessageListenerContainerFactory.builder().configure(
options -> options.acknowledgementMode(AcknowledgementMode.MANUAL)
.listenerMode(ListenerMode.SINGLE_MESSAGE)
.maxConcurrentMessages(100)
.maxMessagesPerPoll(100)
.listenerShutdownTimeout(Duration.of(25L, ChronoUnit.SECONDS))
.acknowledgementShutdownTimeout(Duration.of(20L, ChronoUnit.SECONDS))
.acknowledgementThreshold(5)
.acknowledgementInterval(Duration.of(50, ChronoUnit.MILLIS))
.queueNotFoundStrategy(QueueNotFoundStrategy.FAIL)
).sqsAsyncClient(sqsAsyncClient)
.build();
}
I simulate the business logic by sleeping for 15 seconds:
@SqsListener(value = "webhook-performance-test-sqs", factory = "sqsFactory")
public void queueListener(@Payload String sqsBatchMessage, Acknowledgement acknowledgement) {
    try {
        Thread.sleep(15000L);
        log.info("business process finished....");
    } catch (Exception ex) {
        log.error("Failed.", ex);
    } finally {
        acknowledgement.acknowledgeAsync();
        log.info("acknowledgment finish ....");
    }
}
Hey @XUmeng96, sorry for the delay.
You're right that there's a race condition where the ack processor may stop before all acks have been added to the queue. Maybe it's worth adding logic to also wait for the queue to be empty.
Nevertheless, considering we have a dedicated thread that pretty much just waits on this queue and adds messages to the buffer, I'd think more often than not we wouldn't have this issue.
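For illustration only, here's a minimal, generic sketch of that drain-on-shutdown idea. This is not the actual BatchingAcknowledgementProcessor code; the class and field names below are made up, and the 50 ms poll interval is arbitrary:

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Generic drain-on-shutdown sketch, not the framework's code: wait for the
// pending-ack queue to empty (a dedicated consumer thread keeps moving entries
// to the buffer) before moving on to waiting for in-flight acknowledgement calls.
class PendingAckDrainer {

    private final BlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();

    // Returns true if the queue was fully drained within the timeout.
    boolean awaitDrained(Duration timeout) throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (!pendingAcks.isEmpty() && Instant.now().isBefore(deadline)) {
            Thread.sleep(50); // poll while the consumer thread drains the queue
        }
        return pendingAcks.isEmpty();
    }
}

In the real processor, such a wait would have to cover both the acks queue and the acksBuffer, before the runningAcks wait that doStop() already performs.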
What you are seeing seems much more like an effect of this logic where we ignore new messages for acknowledgement after we receive the order to stop:
https://github.com/awspring/spring-cloud-aws/blob/2c39154a2f6ddbd82718d74f110fac777d7fdca2/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/AbstractOrderingAcknowledgementProcessor.java#L157-L164
Also, keep in mind that the message source only signals the acknowledgement processor to stop after all messages have completed processing - or after the shutdownTimeout if that comes first.
https://github.com/awspring/spring-cloud-aws/blob/2c39154a2f6ddbd82718d74f110fac777d7fdca2/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java#L295-L311
You can set your log level to TRACE on these classes to have a clear picture of what exactly happened to a specific message.
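For example, in a Spring Boot application that would be something like the following in application.properties (assuming the classes sit under the io.awspring.cloud.sqs.listener package, as in the links above):

logging.level.io.awspring.cloud.sqs.listener=TRACE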
Overall, batching acks offers weak guarantees that all messages will necessarily be acknowledged - an acknowledgement might fail, for example.
If you need strong guarantees, you should probably use the ImmediateAcknowledgement processor to acknowledge messages right after they're processed, and also add an AcknowledgementResultCallback to act on failures.
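As a rough sketch of that suggestion, adapting the factory from earlier in this issue (the bean name here is made up, and the acknowledgementResultCallback builder method and callback signatures should be double-checked against your spring-cloud-aws version):

@Bean("sqsImmediateFactory")
SqsMessageListenerContainerFactory<Object> immediateAckFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory.<Object>builder()
            // acknowledge each message right after it is successfully processed
            .configure(options -> options.acknowledgementMode(AcknowledgementMode.IMMEDIATE))
            .acknowledgementResultCallback(new AcknowledgementResultCallback<Object>() {
                @Override
                public void onSuccess(Collection<Message<Object>> messages) {
                    // messages were acknowledged (deleted from the queue)
                }

                @Override
                public void onFailure(Collection<Message<Object>> messages, Throwable t) {
                    // acknowledgement failed: log, alert, or feed your deduplication logic
                }
            })
            .sqsAsyncClient(sqsAsyncClient)
            .build();
}

With IMMEDIATE mode the listener no longer needs to call acknowledgeAsync() itself, and the callback gives you a hook to react when an acknowledgement fails.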
Does this make sense to you? Thanks.
I'm also experiencing this issue now, where a message is added via BatchingAcknowledgementProcessor#doOnAcknowledge, but the application is already in the process of shutting down, so the message is never moved from acks to acksBuffer. However, this means that even though we still wait 10 seconds for the acknowledgements to finish, the entries in acks will never be acknowledged.
In my opinion it would make sense to try to acknowledge all events that have been received, as I don't see why we can only have a weak guarantee here. Also, if the defaults allow a situation where a message is processed successfully on the application side but is not necessarily acknowledged in SQS, then either the documentation should be updated to reflect that important information (imho), or the default should ensure a strong guarantee.
Hey @MarcelLorke6666, thanks for your report.
I should have time to look into this this week.
as I don't see why we can only have a weak guarantee here.
Standard SQS queues provide an at-least-once guarantee, which the framework respects. So while fixing this race condition might minimize duplicate processing at application shutdown, it would not change the guarantee itself.
or the default should ensure a strong guarantee
FIFO queues have strong guarantees - they guarantee that a message will be served exactly once and in order within a timeframe, provided both producer and consumer are correctly configured. The framework also respects that by defaulting to IMMEDIATE (sync) acknowledgement for FIFO queues, and it gives the user tools to handle acknowledgement failures so as to avoid duplicate / out-of-order processing.
Defaulting Standard SQS queues to IMMEDIATE acknowledgement, though, would incur a significant performance and cost hit while doing very little for the user in terms of guarantees - though users can configure that if they so wish.
Thank you for taking the time!
You are right, and generally we also have measures in place. In our specific scenario, the deduplication logic had already evicted this event, and we were surprised to see it again a few hours later, since from the application's perspective everything had gone just fine.
Thanks for the info in previous comments.
We are also experiencing this issue. Messages are being received multiple times (maxReceiveCount == 2), completing successfully, but never being acknowledged. This results in the maxReceiveCount being hit and messages going to the DLQ for our service, which triggers false alerts for service issues. This is actually quite likely to happen in k8s environments, where the scheduler frequently stops pods and a message can be unlucky enough to be received each time by a pod that is about to be shut down.
Standard SQS queues provide an at-least-once guarantee, which the framework respects. So while fixing this racing condition might minimize duplicate processing at application shutdown, it would not change the guarantee itself.
You could argue that since this issue breaks message acknowledgements, the at-least-once guarantee has been broken.
We are going to try the immediate acknowledge as a workaround for now.
Thanks @mgrundie-r7, let us know how that works for you.
I'm looking into another ack issue and should be able to look into this one this weekend or so.
It's a good thing that, with increased usage, issues are surfacing so we can fix them.
We'll get there :)
Hey everyone, I haven't had the time to work on this yet.
The fix itself doesn't look too complex, but it should be well tested so we don't have a regression later.
If anyone would like to contribute a PR for this I'd be happy to review and merge it.
Thanks.
Hey everyone, finally had the time for this fix. There are no other acknowledgement problems on the radar, so hopefully all is well now 🙌🏼
Here's the PR with the fix
Reviews are appreciated, and feel free to check out the branch and test it if you'd like.
Thanks!
Fixed in https://github.com/awspring/spring-cloud-aws/pull/1082.