amazon-sqs-java-messaging-lib
Spring Infinite Loop on QueueDoesNotExistException
java.util.concurrent.CompletionException: software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException: The specified queue does not exist for this wsdl version. (Service: Sqs, Status Code: 400, Request ID: abf44cc8-bd4a-5abf-85a0-118a756161e8)
at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:65)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:103)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:184)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:170)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
The log entry above was repeated nearly 1 million times over a 25-minute period after the related queue was accidentally deleted. This maxed out CPU usage on the servers (in Elastic Beanstalk), and they stopped handling incoming requests.
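For scale, a rough estimate from the figures above, assuming one log entry per failed poll:

$$
\frac{10^6\ \text{entries}}{25 \times 60\ \text{s}} = \frac{10^6\ \text{entries}}{1500\ \text{s}} \approx 667\ \text{entries/s}
$$

That is roughly one failed poll every 1.5 ms.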
Our message-consuming class is:
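import io.awspring.cloud.sqs.annotation.SqsListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(value = "transaction.queue.consumer.enabled")
public class TransactionMessageConsumer {
    private final TransactionService transactionService;
    private final Tracer tracer;
    @SqsListener(value = "${cloud.aws.sqs.transaction-queue}")
    public void receive(final TransactionMessageVo transactionMessageVo) {
        // ... (method body not included in the original post)
    }
}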
and our configuration class is:
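import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

@Configuration
@ConditionalOnProperty("transaction.queue.publisher.enabled")
public class SQSClientConfiguration {
    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.create();
    }

    @Bean
    public SqsTemplate sqsTemplate(final SqsAsyncClient sqsAsyncClient) {
        return SqsTemplate.newTemplate(sqsAsyncClient);
    }

    @Bean
    public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(final SqsAsyncClient sqsAsyncClient) {
        return SqsMessageListenerContainerFactory.builder()
                .sqsAsyncClient(sqsAsyncClient)
                .build();
    }
}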
As you can see, our configuration is simple. We tried to find a way to slow down the polling rate on failure, but we could not.
I think the issue is in the following code, which does not account for situations like this and loops indefinitely:
AbstractPollingMessageSource.java::191
private void pollAndEmitMessages() {
    while (isRunning()) {
        try {
            if (!isRunning()) {
                continue;
            }
            logger.trace("Requesting permits for queue {}", this.pollingEndpointName);
            final int acquiredPermits = this.backPressureHandler.requestBatch();
            if (acquiredPermits == 0) {
                logger.trace("No permits acquired for queue {}", this.pollingEndpointName);
                continue;
            }
            logger.trace("{} permits acquired for queue {}", acquiredPermits, this.pollingEndpointName);
            if (!isRunning()) {
                logger.debug("MessageSource was stopped after permits where acquired. Returning {} permits",
                        acquiredPermits);
                this.backPressureHandler.release(acquiredPermits);
                continue;
            }
            // @formatter:off
            managePollingFuture(doPollForMessages(acquiredPermits))
                .exceptionally(this::handlePollingException)
                .thenApply(msgs -> releaseUnusedPermits(acquiredPermits, msgs))
                .thenApply(this::convertMessages)
                .thenCompose(this::emitMessagesToPipeline)
                .exceptionally(this::handleSinkException);
            // @formatter:on
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(
                    "MessageSource thread interrupted for endpoint " + this.pollingEndpointName, e);
        }
        catch (Exception e) {
            logger.error("Error in MessageSource for queue {}. Resuming", this.pollingEndpointName, e);
        }
    }
    logger.debug("Execution thread stopped for queue {}", this.pollingEndpointName);
}
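As far as this snippet shows, nothing in the loop waits after a failed poll. Judging by the "Error polling for messages in queue" lines quoted further down, handlePollingException logs the error and the chain carries on, so releaseUnusedPermits presumably hands the permits back and the next requestBatch() call can succeed straight away; the loop then spins as fast as SQS returns the error. Below is a minimal, hypothetical sketch in plain Java (not Spring Cloud AWS code; BackoffPollingLoop, Sleeper and all parameters are made-up names) of the missing piece: wait for each poll to finish, then sleep with capped exponential backoff after consecutive failures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

// Hypothetical sleep abstraction so a test can record delays instead of sleeping.
interface Sleeper {
    void sleep(long millis) throws InterruptedException;
}

// Hypothetical polling loop (NOT Spring Cloud AWS code): waits for each poll to
// complete and backs off exponentially, up to a cap, after consecutive failures.
final class BackoffPollingLoop {
    private final Supplier<CompletableFuture<List<String>>> poll;
    private final Sleeper sleeper;
    private final long baseDelayMillis;
    private final long maxDelayMillis;

    BackoffPollingLoop(Supplier<CompletableFuture<List<String>>> poll, Sleeper sleeper,
                       long baseDelayMillis, long maxDelayMillis) {
        this.poll = poll;
        this.sleeper = sleeper;
        this.baseDelayMillis = baseDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    // Delay after `failures` consecutive failures: base * 2^(failures - 1), capped at max.
    long delayFor(int failures) {
        if (failures <= 0) {
            return 0;
        }
        // Math.min also absorbs Infinity from Math.pow for very large failure counts.
        double delay = baseDelayMillis * Math.pow(2, failures - 1);
        return (long) Math.min(delay, (double) maxDelayMillis);
    }

    // Runs a fixed number of iterations; a real loop would check isRunning() instead.
    List<String> run(int iterations) {
        List<String> received = new ArrayList<>();
        int consecutiveFailures = 0;
        for (int i = 0; i < iterations; i++) {
            try {
                // join() blocks until this poll completes, so a fast failure cannot
                // immediately trigger the next poll.
                received.addAll(poll.get().join());
                consecutiveFailures = 0;
            } catch (CompletionException e) {
                consecutiveFailures++;
                try {
                    sleeper.sleep(delayFor(consecutiveFailures));
                } catch (InterruptedException ie) {
                    // Treat interruption as a shutdown signal: restore the flag and stop.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return received;
    }
}
```

With a base of 100 ms and a cap of 300 ms, four failed polls in a row would wait 100, 200, 300 and 300 ms, instead of retrying every few milliseconds. Blocking on join() keeps the sketch simple but gives up the concurrent in-flight polls that the real message source allows through its permits; an actual fix would more likely schedule the next poll after a delay rather than block a thread.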
I have the very same problem. Did you find a workaround?
Using .autoStartup(enabled) worked for me.
Same infinite polling loop here, with authorization errors:
io.awspring.cloud.sqs.listener.source.AbstractPollingMessageSource - Error polling for messages in queue https://sqs.eu-central-1.amazonaws.com/.../queue.fifo
java.util.concurrent.CompletionException: software.amazon.awssdk.services.sqs.model.SqsException: The security token included in the request is invalid. (Service: Sqs, Status Code: 403, Request ID: 6f564cea-...)
at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:65)
or
io.awspring.cloud.sqs.listener.source.AbstractPollingMessageSource - Error polling for messages in queue https://sqs.eu-central-1.amazonaws.com/.../queue.fifo
java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to load credentials from any of the providers in the chain AwsCredentialsProviderChain(credentialsProviders=[SystemPropertyCredentialsProvider(), EnvironmentVariableCredentialsProvider(), WebIdentityTokenCredentialsProvider(), ProfileCredentialsProvider(profileName=default, profileFile=ProfileFile(sections=[])), ContainerCredentialsProvider
We also found no way to configure a backoff.
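In every trace in this thread, the actual SDK error (QueueDoesNotExistException, SqsException, SdkClientException) arrives wrapped in a java.util.concurrent.CompletionException. Any custom reaction to these errors, such as a longer backoff or stopping the listener on failures that retrying will not fix, would first need to unwrap it. A minimal plain-Java sketch; the Causes helper is hypothetical and not part of any library:

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical helper (not part of any library): peels off async wrapper exceptions.
final class Causes {
    private Causes() {
    }

    // Strips CompletionException/ExecutionException wrappers and returns the first
    // throwable that is not one of them (or the last wrapper if it has no cause).
    static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // True if the unwrapped cause is an instance of the given type (or a subtype).
    static boolean causedBy(Throwable t, Class<? extends Throwable> type) {
        return type.isInstance(unwrap(t));
    }
}
```

With the AWS SDK on the classpath, a check such as Causes.causedBy(e, QueueDoesNotExistException.class) could then single out the deleted-queue case from the original report.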