
Acknowledgement failed when the application graceful shutdown

Open XUmeng96 opened this issue 1 year ago • 6 comments

Type: Bug

Component: SQS

Describe the bug

Versions:

JDK: 17
spring-cloud-aws-dependencies: 3.0.2
spring-cloud-dependencies: 2022.0.4
amazon-sqs-java-messaging-lib: 2.1.1
spring-boot-dependencies: 3.1.4
amazon.awssdk: 2.21.0

Error: I use a @SqsListener and a MessageListenerContainerFactory bean to create the MessageListenerContainer, with batching acknowledgement of messages. When I shut down the application I get some error messages, or sometimes no errors, but the messages stay in flight until the visibility timeout expires instead of being acknowledged. I would like the acknowledgements to complete even after the shutdown hook has been triggered.

Looking at the doStop() method in BatchingAcknowledgementProcessor, it only waits for runningAcks, up to the acknowledgementShutdownTimeout. If there are messages in the BlockingQueue acks that have not yet been moved to Map<String, BlockingQueue<Message<T>>> acksBuffer, those messages will not be acknowledged, right? I'm not sure why we only wait for runningAcks to finish. Is there a way to ensure that all messages that have been processed are acknowledged?

Sample MessageListenerContainerFactory config as following:

@Bean("sqsFactory")
SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {

       return SqsMessageListenerContainerFactory.builder().configure(
                       options -> options.acknowledgementMode(AcknowledgementMode.MANUAL)
                               .listenerMode(ListenerMode.SINGLE_MESSAGE)
                               .maxConcurrentMessages(100)
                               .maxMessagesPerPoll(100)
                               .listenerShutdownTimeout(Duration.of(25L, ChronoUnit.SECONDS))
                               .acknowledgementShutdownTimeout(Duration.of(20L, ChronoUnit.SECONDS))
                               .acknowledgementThreshold(5)
                               .acknowledgementInterval(Duration.of(50, ChronoUnit.MILLIS))
                               .queueNotFoundStrategy(QueueNotFoundStrategy.FAIL)
               ).sqsAsyncClient(sqsAsyncClient)
               .build();
   }

I simulate the business logic by sleeping for 15 seconds:

@SqsListener(value = "webhook-performance-test-sqs", factory = "sqsFactory")
public void queueListener(@Payload String sqsBatchMessage, Acknowledgement acknowledgement) {
    try {
        Thread.sleep(15000L);
        log.info("business process finished....");
    } catch (Exception ex) {
        log.error("Failed.", ex);
    } finally {
        acknowledgement.acknowledgeAsync();
        log.info("acknowledgement finished....");
    }
}

XUmeng96 avatar Oct 27 '23 14:10 XUmeng96

Hey @XUmeng96, sorry for the delay.

You're right that there's a race condition there, where the ack processor may stop before all acks have been added to the queue. Maybe it's worth adding logic to also wait for the queue to be empty.

Nevertheless, considering we have a dedicated thread that pretty much just waits on this queue and adds messages to the buffer, I'd think more often than not we wouldn't have this issue.

What you are seeing seems much more like an effect of this logic where we ignore new messages for acknowledgement after we receive the order to stop:

https://github.com/awspring/spring-cloud-aws/blob/2c39154a2f6ddbd82718d74f110fac777d7fdca2/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/AbstractOrderingAcknowledgementProcessor.java#L157-L164

Also, keep in mind that the message source only signals the acknowledgement processor to stop after all messages have completed processing - or after the shutdownTimeout, whichever comes first.

https://github.com/awspring/spring-cloud-aws/blob/2c39154a2f6ddbd82718d74f110fac777d7fdca2/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java#L295-L311

You can set your log level to TRACE on these classes to have a clear picture of what exactly happened to a specific message.

Overall, batching acks offers only weak guarantees that all messages will be acknowledged - the acknowledgement call itself might fail, for example.

If you need strong guarantees, you should probably use the ImmediateAcknowledgement processor to acknowledge messages right after they're processed, and also add an AcknowledgementResultCallback to act on failures.
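For illustration, here's a rough sketch of what that configuration could look like, based on the factory from this issue. This is an assumption-laden example rather than a recipe: per the reference docs, an acknowledgementInterval of Duration.ZERO together with an acknowledgementThreshold of 0 selects immediate acknowledgement, and the factory builder accepts an AcknowledgementResultCallback; the bean method name, callback logic, and log usage (Lombok @Slf4j, as in the snippets above) are illustrative only.

@Bean("sqsFactory")
SqsMessageListenerContainerFactory<Object> immediateAckFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory.builder()
            .configure(options -> options
                    .acknowledgementMode(AcknowledgementMode.ON_SUCCESS)
                    // Duration.ZERO + threshold 0 -> acknowledge each message right after processing
                    .acknowledgementInterval(Duration.ZERO)
                    .acknowledgementThreshold(0))
            // react to acknowledgement failures, e.g. log them or emit a metric
            // (Message here is org.springframework.messaging.Message, Collection is java.util.Collection)
            .acknowledgementResultCallback(new AcknowledgementResultCallback<Object>() {
                @Override
                public void onFailure(Collection<Message<Object>> messages, Throwable t) {
                    log.error("Failed to acknowledge {} messages", messages.size(), t);
                }
            })
            .sqsAsyncClient(sqsAsyncClient)
            .build();
}

With a setup along these lines, a successfully processed message that cannot be acknowledged (for example, during shutdown) at least surfaces in the callback instead of silently reappearing after the visibility timeout.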

Does this make sense to you? Thanks.

tomazfernandes avatar Jan 04 '24 02:01 tomazfernandes

I'm also experiencing this issue now: a message is added via BatchingAcknowledgementProcessor#doOnAcknowledge, but the application is already in the process of shutting down, so the message is never moved from acks to acksBuffer. This means that even though we still wait 10 seconds for the acknowledgements to finish, the entries remaining in acks will never be acknowledged.
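To make that window concrete, here is a minimal, generic Java sketch of the pattern - a blocking queue drained by a background thread that is stopped without waiting for the queue to empty. It is an illustration only, not the library's actual code, and all names are hypothetical.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class AckShutdownRaceSketch {

    private final BlockingQueue<String> acks = new LinkedBlockingQueue<>(); // stands in for 'acks'
    private final List<String> acksBuffer = new ArrayList<>();              // stands in for 'acksBuffer'
    private volatile boolean running = true;

    // Background thread that moves entries from the queue into the buffer.
    private final Thread drainer = new Thread(() -> {
        while (running) {
            try {
                String messageId = acks.poll(100, TimeUnit.MILLISECONDS);
                if (messageId != null) {
                    acksBuffer.add(messageId);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    });

    void start() {
        drainer.start();
    }

    // Called when a message finishes processing.
    void onAcknowledge(String messageId) {
        acks.offer(messageId);
    }

    // Shutdown stops the drainer without waiting for the queue to be empty:
    // whatever is still sitting in 'acks' at this point is never moved to the buffer.
    void stop() throws InterruptedException {
        running = false;
        drainer.interrupt();
        drainer.join();
        System.out.println("buffered (would be acknowledged): " + acksBuffer);
        System.out.println("left in queue (never acknowledged): " + acks);
    }

    public static void main(String[] args) throws InterruptedException {
        AckShutdownRaceSketch sketch = new AckShutdownRaceSketch();
        sketch.start();
        sketch.onAcknowledge("message-1");
        Thread.sleep(200);                 // message-1 is drained into the buffer
        sketch.onAcknowledge("message-2"); // arrives just as shutdown begins
        sketch.stop();                     // depending on timing, message-2 may never reach the buffer
    }
}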

In my opinion it would make sense to try to acknowledge all events that have been received, as I don't see why we can only have a weak guarantee here. Also, if the defaults allow a situation where a message is processed successfully on the application side but is not necessarily acknowledged in SQS, then either the documentation should be updated to reflect that important information (imho), or the default should ensure a strong guarantee.

MarcelLorke6666 avatar Jan 26 '24 08:01 MarcelLorke6666

Hey @MarcelLorke6666, thanks for your report.

I should have time to look into this this week.

as I don't see why we can only have a weak guarantee here.

Standard SQS queues provide an at-least-once guarantee, which the framework respects. So while fixing this race condition might minimize duplicate processing at application shutdown, it would not change the guarantee itself.

or the default should ensure a strong guarantee

FIFO queues have strong guarantees - a message is guaranteed to be delivered exactly once and in order within a time frame, provided both producer and consumer are correctly configured. The framework also respects that by defaulting to IMMEDIATE (sync) acknowledgement for FIFO queues, and it gives the user tools to handle acknowledgement failures so as to avoid duplicate or out-of-order processing.

Defaulting Standard SQS queues to IMMEDIATE acknowledgement, though, would incur a significant performance and cost hit while doing nothing for the user in terms of guarantees - though users can configure that if they so wish.

tomazfernandes avatar Jan 28 '24 01:01 tomazfernandes

Thank you for taking the time!

You are right, and generally we also have measures in place. In our specific scenario the deduplication logic had already evicted this event, and we were surprised to see it again a few hours later, since from the application's perspective everything had gone just fine.

mlork avatar Jan 29 '24 08:01 mlork

Thanks for the info in previous comments.

We are also experiencing this issue. Messages are being received multiple times (max receive count == 2) and completing successfully, but never being acknowledged. This results in the maxReceiveCount being hit and messages going to the DLQ for our service, which triggers false alerts for service issues. This is actually quite likely to happen in Kubernetes environments, where the scheduler frequently stops pods and your message is unlucky enough to be received each time by a pod that is about to be shut down.

Standard SQS queues provide an at-least-once guarantee, which the framework respects. So while fixing this racing condition might minimize duplicate processing at application shutdown, it would not change the guarantee itself.

You could argue that since this issue breaks message acknowledgements, the at-least-once guarantee has effectively been broken.

We are going to try immediate acknowledgement as a workaround for now.

mgrundie-r7 avatar Feb 06 '24 17:02 mgrundie-r7

Thanks @mgrundie-r7, let us know how that works for you.

I'm looking into another ack issue and should be able to look into this one this weekend or so.

It's a good thing that, with increased usage, these issues are surfacing so we can fix them.

We'll get there :)

tomazfernandes avatar Feb 06 '24 17:02 tomazfernandes

Hey everyone, I haven't had the time to work on this yet.

The fix itself doesn't look too complex, but it should be well tested so we don't have a regression later.

If anyone would like to contribute a PR for this I'd be happy to review and merge it.

Thanks.

tomazfernandes avatar Mar 02 '24 18:03 tomazfernandes

Hey everyone, finally had the time for this fix. There are no other acknowledgement problems on the radar, so hopefully all is well now 🙌🏼

Here's the PR with the fix

Reviews are appreciated, and feel free to check out the branch and test it if you'd like.

Thanks!

tomazfernandes avatar Mar 10 '24 19:03 tomazfernandes

Fixed in https://github.com/awspring/spring-cloud-aws/pull/1082.

tomazfernandes avatar Mar 12 '24 02:03 tomazfernandes