spring-cloud-aws
Release Polling Permits on Conversion Errors
Type: Bug
Component: SQS
Describe the bug
Spring Cloud AWS 3.1.0.
The message poller stops polling after some messages from a queue fail deserialization with a "No class found" error. As a result, those messages are never sent to the DLQ after "maximum receives".
Scenario: The class that represents the message was moved to a different package, and there are still old messages to be processed by the new service version.
Related logs:
i.a.c.s.l.s.AbstractPollingMessageSource : Error processing message
java.lang.IllegalArgumentException: No class found with name com.sample.old.TestObject
Sample
- Create a record TestObject:
package com.sample.new;
public record TestObject(
String name
) {
}
- Create a simple SQS listener:
@SqsListener(value = {"test-type-queue"}, acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL)
public void processMessage(@Valid TestObject message, Acknowledgement acknowledgment) {
// process message
acknowledgment.acknowledge();
}
- Post 2 messages to the queue: {"name":"custom name"}, with JavaType header: com.sample.old.TestObject.
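The deserialization failure itself can be illustrated without SQS. The following is a minimal sketch, assuming the type header value is resolved to a class by name; `TypeHeaderSketch`, `resolve`, and `resolveOrDefault` are hypothetical names for illustration and are not the library's code. Only the error message text is taken from the logs above.

```java
class TypeHeaderSketch {

    /**
     * Resolves the class named in a type header (hypothetical helper).
     * Fails with the same message as in the logs when the class no longer exists.
     */
    static Class<?> resolve(String typeHeader) {
        try {
            return Class.forName(typeHeader);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("No class found with name " + typeHeader, e);
        }
    }

    /**
     * Hypothetical tolerant variant: falls back to a known type when the
     * header points at a class that was moved or removed.
     */
    static Class<?> resolveOrDefault(String typeHeader, Class<?> fallback) {
        try {
            return Class.forName(typeHeader);
        } catch (ClassNotFoundException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        // The old package no longer exists, so the strict lookup throws.
        try {
            resolve("com.sample.old.TestObject");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        // The tolerant lookup returns the fallback type instead.
        System.out.println(resolveOrDefault("com.sample.old.TestObject", String.class));
    }
}
```

Any message that still carries the old class name in its header hits the strict path on every receive, which is why the same error repeats for both messages.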
Hi @rerodrig.
Not sure what you mean. Are you saying that after these two errors, no new messages are polled from the queue?
Can you share the full stack trace with the error?
Thanks.
Hi @tomazfernandes,
After some errors for both messages, no new messages are polled from the queue.
Here is the stack trace:
2024-02-15T17:09:46.180Z ERROR 19269 --- [test-sqs] [c-response-5-12] i.a.c.s.l.s.AbstractPollingMessageSource : Error processing message
java.lang.IllegalArgumentException: No class found with name com.sample.old.TestObject
at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.defaultHeaderTypeMapping(AbstractMessagingMessageConverter.java:197)
at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.getTargetType(AbstractMessagingMessageConverter.java:179)
at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.convertPayload(AbstractMessagingMessageConverter.java:170)
at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.toMessagingMessage(AbstractMessagingMessageConverter.java:153)
at io.awspring.cloud.sqs.listener.source.AbstractMessageConvertingMessageSource.convertMessage(AbstractMessageConvertingMessageSource.java:91)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at io.awspring.cloud.sqs.listener.source.AbstractMessageConvertingMessageSource.convertMessages(AbstractMessageConvertingMessageSource.java:85)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:58)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:177)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$execute$0(MakeAsyncHttpRequestStage.java:110)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:253)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:167)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
After 10 attempts to process the messages (5 for each message), they stay in the queue and the following logs are displayed:
2024-02-15T17:09:46.970Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:47.975Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:48.976Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:49.980Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:50.985Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:51.985Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:52.989Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:53.994Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:54.995Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:55.592Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Not able to acquire 10 permits in 10000 milliseconds for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM HIGH. Permits left: 0
2024-02-15T17:09:55.592Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : No permits acquired for queue test-type-queue
2024-02-15T17:09:55.592Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue test-type-queue
2024-02-15T17:09:55.592Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquiring 10 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM HIGH
2024-02-15T17:09:55.999Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:56.999Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:58.000Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:59.003Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:00.007Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:01.009Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:02.014Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:03.015Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:04.019Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:05.024Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:05.597Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Not able to acquire 10 permits in 10000 milliseconds for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM HIGH. Permits left: 0
2024-02-15T17:10:05.597Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : No permits acquired for queue test-type-queue
2024-02-15T17:10:05.597Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue test-type-queue
2024-02-15T17:10:05.597Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquiring 10 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM HIGH
Thanks.
Yup, that's a bug @rerodrig, thanks for reporting and for the detailed logs!
When message conversion throws an error, the framework does not release the permits, so it eventually runs out of them.
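To make the mechanism concrete, here is a simplified sketch of the leak using a plain `java.util.concurrent.Semaphore`. It assumes one permit per received message and a pool of 10 permits, matching the "Acquiring 10 permits" and "Permits left: 0" lines in the logs. `PermitLeakSketch`, `convert`, and `permitsLeftAfterPolls` are hypothetical names, not the framework's actual classes or methods.

```java
import java.util.concurrent.Semaphore;

class PermitLeakSketch {

    static final int TOTAL_PERMITS = 10;

    /** Hypothetical converter that always fails, like the unknown type header in the report. */
    static String convert(String typeHeader) {
        throw new IllegalArgumentException("No class found with name " + typeHeader);
    }

    /**
     * Simulates receiving {@code receives} messages, one permit each, and returns
     * the permits still available afterwards.
     *
     * @param releaseOnError whether the permit is returned when conversion fails
     */
    static int permitsLeftAfterPolls(int receives, boolean releaseOnError) {
        Semaphore permits = new Semaphore(TOTAL_PERMITS);
        for (int i = 0; i < receives; i++) {
            if (!permits.tryAcquire()) {
                break; // no permits left: the poller can no longer fetch messages
            }
            try {
                convert("com.sample.old.TestObject");
                permits.release(); // normal path: permit returned after processing
            } catch (IllegalArgumentException e) {
                if (releaseOnError) {
                    permits.release(); // error path: return the permit as well
                }
                // otherwise the permit is lost for good
            }
        }
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println("without release on error: " + permitsLeftAfterPolls(10, false));
        System.out.println("with release on error: " + permitsLeftAfterPolls(10, true));
    }
}
```

With 10 failed receives (5 for each of the 2 messages) and no release on the error path, the pool is empty, which is consistent with the poller being stuck waiting for permits. Releasing on the error path keeps the pool full in this sketch.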
I have an acknowledgement issue in the pipeline that seems more urgent to fix, but I should be able to look into this soon enough.
Thanks.
Hey @tomazfernandes, curious if there are any updates on this? I think this bug is something my team has observed as well. Thanks!
Hi @liwamdaman, no updates, I haven't had the time to work on this yet.
If anyone would like to submit a PR with a fix I'd be happy to review and merge it.
Opened this PR: https://github.com/awspring/spring-cloud-aws/pull/1090/files
Reviews are welcome.