
SQS message consumption freezes from time to time, resulting in messages getting lost.

Sarkhad opened this issue 1 year ago • 3 comments

Type: Bug

Component: "SQS"

Describe the bug
I use two identical consumers (potentially more) to distribute requests among unique third-party license profiles. During message processing, the consumer updates a job's status (pending, in progress, finished, etc.) in a DynamoDB table by the job's id, so I can see what is actually happening. The critical issue I face is that from time to time all the jobs are stuck in the Pending state, which means they are not even received by the SQS consumers (a consumer immediately sets the state to in progress upon receipt). For some periods all of the requests are ignored, for others just a part of them. There are no errors in the logs.

The current message traffic is about 5 messages per minute. I run the consumer apps on on-premises servers. Each server/consumer has to process one message at a time, no more.

<aws.java.sdk.version>2.20.43</aws.java.sdk.version>

<artifactId>spring-cloud-aws-dependencies</artifactId>
<version>3.1.0</version>

    @Bean
    public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient){
        return SqsTemplate.builder()
                .sqsAsyncClient(sqsAsyncClient)
                .configureDefaultConverter(converter -> {
                    converter.setPayloadTypeHeaderValueFunction((m) -> null);
                })
                .build();
    }
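
For completeness, a send through this template looks roughly like the snippet below (a sketch: the queue name is taken from the queue URL in the logs further down, and jobRequest is a hypothetical payload variable). With the payload type header value function returning null, no type header is written to outgoing messages, so the listener resolves the payload type from its method signature.

    // Hypothetical producer-side call (sketch only):
    sqsTemplate.send(to -> to
            .queue("requests-prod")
            .payload(jobRequest));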

(screenshot: SQS console showing 0 messages available in the queue)

Sample

The consumer's definition:

    @SqsListener(value = "${requests-queue.endpoint}",
            acknowledgementMode = "ON_SUCCESS",
            maxConcurrentMessages = "1",
            maxMessagesPerPoll = "1")
    public void receive(@Valid JobRequest request) throws IOException, InterruptedException {
        log.info("Received message: " + request);

        service.process(request);
    }

Sarkhad avatar Feb 26 '24 18:02 Sarkhad

Hi @Sarkhad, a few questions:

  1. "I use two same consumers (potentially more)"

     Can you elaborate on what you mean by this? Is it two separate server instances? Or consumers in the same instance?

  2. In the screenshot you shared there are no messages available in the queue. When this happens, can you check if the messages are left in the queue, or if they become in-flight?

  3. Can you set the logs to TRACE in these classes, and share the corresponding logs from when the issue happens?

     • SemaphoreBackPressureHandler
     • AbstractPollingMessageSource

     Mind that the logs will be very verbose at this level, so it's probably a good idea to run that in a lower environment.

  4. When this happens, does the app recover by itself (without restarting)?

Thanks.

tomazfernandes avatar Feb 26 '24 23:02 tomazfernandes

1. I run two copies of the message-consuming code. Each copy runs on a separate server, one instance per server.

2. I always see 0 messages available. Even manual polling from the AWS console returns no messages.

3. I have set the logging level with `SPRING_APPLICATION_JSON='{"logging.level.io.awspring.cloud.sqs.listener.source.AbstractPollingMessageSource": "TRACE","logging.level.io.awspring.cloud.sqs.listener.SemaphoreBackPressureHandler": "TRACE"}'`.

For the first try, I pushed 4 copies of the message to SQS, but only 2 of them were completed.

The consumer was repeating the following logs:

    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.121Z DEBUG 20307 --- [ecycle-thread-1] i.a.c.s.l.SemaphoreBackPressureHandler : SemaphoreBackPressureHandler created with configuration AUTO and 1 total permits
    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.172Z DEBUG 20307 --- [ecycle-thread-2] i.a.c.s.l.s.AbstractPollingMessageSource : Starting StandardSqsMessageSource for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.203Z INFO 20307 --- [ecycle-thread-1] a.c.s.l.AbstractMessageListenerContainer : Container io.awspring.cloud.sqs.sqsListenerEndpointContainer#0 started
    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.210Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.211Z DEBUG 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Trying to acquire full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 1
    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.216Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquiring 1 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW
    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.216Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : 1 permits acquired for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW. Permits left: 0
    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.216Z DEBUG 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquired full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.217Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : 1 permits acquired for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.288Z INFO 20307 --- [           main] r.o.b.BrowserClientApplication : Started BrowserClientApplication in 27.317 seconds (process running for 30.212)
    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.644Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.646Z DEBUG 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Trying to acquire full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
    Feb 27 01:45:16 client.jar[20307]: 2024-02-27T01:45:16.646Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquiring 1 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW
    Feb 27 01:45:26 client.jar[20307]: 2024-02-27T01:45:26.647Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Not able to acquire 1 permits in 10000 milliseconds for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW. Permits left: 0
    Feb 27 01:45:26 client.jar[20307]: 2024-02-27T01:45:26.649Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : No permits acquired for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
    Feb 27 01:45:26 client.jar[20307]: 2024-02-27T01:45:26.649Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
    Feb 27 01:45:26 client.jar[20307]: 2024-02-27T01:45:26.649Z DEBUG 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Trying to acquire full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
    Feb 27 01:45:26 client.jar[20307]: 2024-02-27T01:45:26.650Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquiring 1 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW
    Feb 27 01:45:27 client.jar[20307]: 2024-02-27T01:45:27.767Z TRACE 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : 1 permits acquired for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW. Permits left: 0
    Feb 27 01:45:27 client.jar[20307]: 2024-02-27T01:45:27.772Z DEBUG 20307 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquired full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0

===

On the second try, I left only one consumer turned on and again pushed 4 copies of the message. 3 of 4 were completed. The messages hit the consumer's cache, so the message loss is not related to the business logic.

===

On the third try, with one consumer, only 1 of 4 was processed. The tracing keeps spamming the same messages as above. I still have no idea what's going on.

===

On the fourth and last try, I disabled all the consumers and started one locally.

It was repeating something like this, the same as the others:

    Not able to acquire 1 permits in 10000 milliseconds for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW. Permits left: 0
    2024-02-27T06:35:53.757+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : No permits acquired for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
    2024-02-27T06:35:53.757+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
    2024-02-27T06:35:53.757+04:00 DEBUG 26756 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Trying to acquire full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
    2024-02-27T06:35:53.757+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquiring 1 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW
    2024-02-27T06:35:53.830+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : 1 permits acquired for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW. Permits left: 0
    2024-02-27T06:35:53.830+04:00 TRACE 26756 --- [c-response-9-29] i.a.c.s.l.SemaphoreBackPressureHandler : Released 1 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 1
    2024-02-27T06:35:53.830+04:00 DEBUG 26756 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquired full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
    2024-02-27T06:35:53.830+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : 1 permits acquired for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
    2024-02-27T06:35:53.830+04:00 TRACE 26756 --- [c-response-9-29] i.a.c.s.l.s.AbstractPollingMessageSource : Released batch of unused permits for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
    2024-02-27T06:35:53.831+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue https://sqs.eu-central-1.amazonaws.com/123456/requests-prod
    2024-02-27T06:35:53.831+04:00 DEBUG 26756 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Trying to acquire full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
    2024-02-27T06:35:53.831+04:00 TRACE 26756 --- [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquiring 1 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM LOW

Again I pushed 4 copies in a row and only 1 got completed. The other messages just disappeared: no mentions in the logs, no exceptions, no appearance in manual polling, 0 messages available.

"When this happens, does the app recover by itself (without restarting)?" The regular loss of messages has been happening since I developed the part. Just didn't count it seriously at the beginning.

The pseudo code for service.process(request):

    public synchronized void download(Request request) {
        log.info("blabla1");
        log.info("blabla2");
        log.info("blabla3");
        var job = jobsRepo.get(request.id());
        if (job.isEmpty()) {
            log.error("bla-bla");
            return;
        }
        jobsRepo.updateStatus(request.id, State.IN_PROGRESS);
        if (hasCached(request.payload)) {
            jobsRepo.updateStatus(request.id, State.COMPLETED);
            return;
        }
        // continuing as first time. Never called in the context of the discussion.
    }

I used synchronized to make sure the critical resource (a Chrome instance driven through Selenium) was not used in parallel; otherwise it failed. And, to be precise, I don't see any "blabla1, blabla2, blabla3" in the logs, which means service.process() is not even starting.

P.S. An extra try confirmed that the listener works chaotically. For a while, with no restart, the locally running consumer started to process the messages, but after some time it started ignoring them again. P.P.S. I tried even without the synchronized keyword; no changes.

The issue was sort of resolved when I created and switched to another SQS queue with the same settings. It needs more time for observation.

Sarkhad avatar Feb 27 '24 02:02 Sarkhad

Hi @Sarkhad.

The logs seem normal - it's acquiring and releasing permits as expected.

Would you mind removing the synchronized block and removing the critical resource from the logic to see if you still have the issue?
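
For example, a temporary stand-in along these lines (just a sketch reusing the annotation values from your listener; the body only logs) would show whether the container itself delivers messages reliably:

    @SqsListener(value = "${requests-queue.endpoint}",
            acknowledgementMode = "ON_SUCCESS",
            maxConcurrentMessages = "1",
            maxMessagesPerPoll = "1")
    public void receive(@Valid JobRequest request) {
        // Temporary stand-in: no synchronized block, no Selenium/Chrome access, just a log.
        log.info("Received message: " + request);
    }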

Please also set AbstractMessageProcessingPipelineSink logs to TRACE.
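
Following your earlier approach, the environment variable could be extended to something like this (a sketch; the first two entries are copied from your config, and the last one assumes the class lives in the listener.sink package):

    SPRING_APPLICATION_JSON='{"logging.level.io.awspring.cloud.sqs.listener.source.AbstractPollingMessageSource":"TRACE","logging.level.io.awspring.cloud.sqs.listener.SemaphoreBackPressureHandler":"TRACE","logging.level.io.awspring.cloud.sqs.listener.sink.AbstractMessageProcessingPipelineSink":"TRACE"}'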

Thanks.

tomazfernandes avatar Feb 28 '24 00:02 tomazfernandes

@Sarkhad, I've created an integration test to assert that consuming one message at a time works as expected.

From the code and logs you provided, it seems very likely you're having an issue due to synchronization.

I'll close this issue since it's been 2 weeks without feedback.

If you can create a sample project that reproduces the issue, or provide logs indicating the framework is misbehaving, we can reopen it.

Thanks.

tomazfernandes avatar Mar 10 '24 19:03 tomazfernandes