SQS message consumption freezes from time to time, resulting in messages getting lost
Type: Bug
Component: "SQS"
Describe the bug
I run two identical consumers (potentially more) to distribute the requests among unique third-party license profiles. During message processing, the consumer updates a Job status (pending, in progress, finished, etc.) in a DynamoDB table by the job's id, so I can see what is actually happening. The critical issue I face is that from time to time all the jobs get stuck in the Pending state, which means they are not even received by the SQS consumers, which would immediately set the state to in progress. During some periods all of the requests are ignored; during others only a part of them. There are no errors in the logs.
The current message traffic is about 5 messages per minute. I run the consumer apps on on-premises servers. Each server's consumer has to process one message at a time, no more.
```xml
<aws.java.sdk.version>2.20.43</aws.java.sdk.version>

<artifactId>spring-cloud-aws-dependencies</artifactId>
<version>3.1.0</version>
```
```java
@Bean
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient) {
    return SqsTemplate.builder()
            .sqsAsyncClient(sqsAsyncClient)
            .configureDefaultConverter(converter -> {
                converter.setPayloadTypeHeaderValueFunction(m -> null);
            })
            .build();
}
```
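For reference, this is roughly how a message would be sent with that template (a minimal sketch; the publisher class, queue name, and `JobRequest` payload shape are illustrative, and the actual producer may differ):

```java
import io.awspring.cloud.sqs.operations.SqsTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobRequestPublisher {

    private final SqsTemplate sqsTemplate;

    public JobRequestPublisher(SqsTemplate sqsTemplate) {
        this.sqsTemplate = sqsTemplate;
    }

    // Sends a job request to the given queue. The payload is serialized by the
    // template's default converter; the payload type header is suppressed by the
    // setPayloadTypeHeaderValueFunction(m -> null) customization above.
    public void publish(String queueName, JobRequest request) {
        sqsTemplate.send(to -> to.queue(queueName).payload(request));
    }
}
```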
Sample
The consumer's definition:
```java
@SqsListener(value = "${requests-queue.endpoint}",
        acknowledgementMode = "ON_SUCCESS",
        maxConcurrentMessages = "1",
        maxMessagesPerPoll = "1"
)
public void receive(@Valid JobRequest request) throws IOException, InterruptedException {
    log.info("Received message: " + request);
    service.process(request);
}
```
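For completeness, the same per-listener limits could also be configured once on the container factory instead of on each annotation; a rough sketch of that alternative (not my actual configuration) would be:

```java
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import org.springframework.context.annotation.Bean;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory.builder()
            .sqsAsyncClient(sqsAsyncClient)
            .configure(options -> options
                    .maxConcurrentMessages(1)   // process a single message at a time per container
                    .maxMessagesPerPoll(1)      // request a single message per poll
                    .acknowledgementMode(AcknowledgementMode.ON_SUCCESS))
            .build();
}
```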
Hi @Sarkhad, a few questions:
- > I use two same consumers (potentially more)

  Can you elaborate on what you mean by this? Is it two separate server instances, or consumers in the same instance?
- In the screenshot you shared there are no messages available in the queue. When this happens, can you check whether the messages are left in the queue or whether they become in-flight?
- Can you set the logs to TRACE in these classes and share the corresponding logs from when the issue happens?
  - SemaphoreBackPressureHandler
  - AbstractPollingMessageSource

  Mind that the logs will be very verbose at this level, so it's probably a good idea to run that in a lower environment.
- When this happens, does the app recover by itself (without restarting)?
Thanks.
1. I run two copies of the message-consuming code. Each copy runs on a separate server, one instance per server.
2. I always see 0 available messages. Even manual polling from the AWS console returns no messages.
3. I have set the logging level with:
   `SPRING_APPLICATION_JSON='{"logging.level.io.awspring.cloud.sqs.listener.source.AbstractPollingMessageSource": "TRACE","logging.level.io.awspring.cloud.sqs.listener.SemaphoreBackPressureHandler": "TRACE"}'`
For the first try, I pushed 4 copies of the message to SQS, but only 2 of them were completed.
The consumers were repeating the following logs:
```
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.121Z DEBUG 20307 --- [ecycle-thread-1] i.a.c.s.l.SemaphoreBackPressureHandler : SemaphoreBackPressureHandler created with configuration AUTO and 1 total permits
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.172Z DEBUG 20307 --- [ecycle-thread-2] i.a.c.s.l.s.AbstractPollingMessageSource : Starting StandardSqsMessageSource for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.203Z INFO 20307 --- [ecycle-thread-1] a.c.s.l.AbstractMessageListenerContainer : Container io.awspring.cloud.sqs.sqsListenerEndpointContainer#0 started
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.210Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.211Z DEBUG 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Trying to acquire full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 1
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.216Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquiring 1 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.216Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : 1 permits acquired for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW. Permits left: 0
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.216Z DEBUG 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquired full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.217Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : 1 permits acquired for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.288Z INFO 20307 --- [ main] r.o.b.BrowserClientApplication : Started BrowserClientApplication in 27.317 seconds (process running for 30.212)
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.644Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.646Z DEBUG 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Trying to acquire full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.646Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquiring 1 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW
Feb 27 01:45:26 client.jar[20307]: 2024-02-27T01:45:26.647Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Not able to acquire 1 permits in 10000 milliseconds for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW. Permits left: 0
Feb 27 01:45:26 client.jar[20307]: 2024-02-27T01:45:26.649Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : No permits acquired for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
Feb 27 01:45:26 client.jar[20307]: 2024-02-27T01:45:26.649Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
Feb 27 01:45:26 client.jar[20307]: 2024-02-27T01:45:26.649Z DEBUG 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Trying to acquire full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
Feb 27 01:45:26 client.jar[20307]: 2024-02-27T01:45:26.650Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquiring 1 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW
Feb 27 01:45:27 client.jar[20307]: 2024-02-27T01:45:27.767Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : 1 permits acquired for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW. Permits left: 0
Feb 27 01:45:27 client.jar[20307]: 2024-02-27T01:45:27.772Z DEBUG 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquired full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
```
===
On the second try, I left only one consumer turned on and again pushed 4 copies of the message. 3 of 4 were completed. Since the messages are copies, the consumer only hits the cache, so the message loss is not related to the business logic.
==
On the third try, with one consumer, only 1 of 4 was processed. The trace keeps spamming the same messages as above. I still have no idea what's going on...
==
On the fourth and last try, I disabled all the consumers and started one locally.
It was repeating something like this, the same as the others:
```
Not able to acquire 1 permits in 10000 milliseconds for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW. Permits left: 0
2024-02-27T06:35:53.757+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : No permits acquired for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
2024-02-27T06:35:53.757+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
2024-02-27T06:35:53.757+04:00 DEBUG 26756 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Trying to acquire full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
2024-02-27T06:35:53.757+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquiring 1 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW
2024-02-27T06:35:53.830+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : 1 permits acquired for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW. Permits left: 0
2024-02-27T06:35:53.830+04:00 TRACE 26756 --- [c-response-9-29] i.a.c.s.l.SemaphoreBackPressureHandler : Released 1 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 1
2024-02-27T06:35:53.830+04:00 DEBUG 26756 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquired full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
2024-02-27T06:35:53.830+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : 1 permits acquired for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
2024-02-27T06:35:53.830+04:00 TRACE 26756 --- [c-response-9-29] i.a.c.s.l.s.AbstractPollingMessageSource : Released batch of unused permits for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
2024-02-27T06:35:53.831+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
2024-02-27T06:35:53.831+04:00 DEBUG 26756 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Trying to acquire full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
2024-02-27T06:35:53.831+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquiring 1 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW
```
I again pushed 4 copies in a row and only 1 was completed. The other messages just disappeared: no mentions in the logs, no exceptions, no appearance in manual polling, and 0 messages available.
"When this happens, does the app recover by itself (without restarting)?" The regular loss of messages has been happening since I developed the part. Just didn't count it seriously at the beginning.
The pseudo code for `service.process(request)`:

```java
public synchronized void download(Request request) {
    log.info("blabla1");
    log.info("blabla2");
    log.info("blabla3");

    var job = jobsRepo.get(request.id());
    if (job.isEmpty()) {
        log.error("bla-bla");
        return;
    }

    jobsRepo.updateStatus(request.id(), State.IN_PROGRESS);

    if (hasCached(request.payload())) {
        jobsRepo.updateStatus(request.id(), State.COMPLETED);
        return;
    }

    // continuing as the first time. Never called in the context of this discussion.
}
```
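For illustration, an `updateStatus` like the one above could look roughly like this with the AWS SDK v2 (a simplified sketch; the table and attribute names are made up, not the real schema, and `State` is the enum from the pseudo code):

```java
import java.util.Map;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

public class JobsRepo {

    private final DynamoDbClient dynamoDb;

    public JobsRepo(DynamoDbClient dynamoDb) {
        this.dynamoDb = dynamoDb;
    }

    // Overwrites the "status" attribute of the job item identified by its id.
    public void updateStatus(String jobId, State state) {
        dynamoDb.updateItem(UpdateItemRequest.builder()
                .tableName("jobs")
                .key(Map.of("id", AttributeValue.fromS(jobId)))
                .updateExpression("SET #s = :s")
                .expressionAttributeNames(Map.of("#s", "status"))
                .expressionAttributeValues(Map.of(":s", AttributeValue.fromS(state.name())))
                .build());
    }
}
```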
I used synchronized to make sure the critical resource (a Chrome instance driven through Selenium) is not used in parallel; otherwise it fails. And, to be precise, I don't see any "blabla1", "blabla2", "blabla3" in the logs, which means service.process() is not even being started.
P.S. An extra try confirmed that the listener behaves chaotically. For a while, with no restart, the locally running consumer started to process the messages, but after some time it started ignoring them again...
P.P.S. I also tried without the synchronized keyword; no change.
The issue was sort of resolved when I created and switched to a new SQS queue with the same settings. It needs more time for observation.
Hi @Sarkhad.
The logs seem normal - it's acquiring and releasing permits as expected.
Would you mind removing the synchronized block and removing the critical resource from the logic to see if you still have the issue?
Please also set AbstractMessageProcessingPipelineSink logs to TRACE.
Thanks.
@Sarkhad, I've created an integration test to assert that consuming one message at a time works as expected.
From the code and logs you provided, it seems very likely you're having an issue due to synchronization.
I'll close this issue since it's been 2 weeks without feedback.
If you can create a sample project that reproduces the issue or provide logs indicating the framework is misbehaving, we can reopen it.
Thanks.