amazon-sqs-java-messaging-lib
Does AWS JMS library work well with Spring JMS Library?
I am using the AWS JMS library with Spring; the versions are shown below.
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-sqs-java-messaging-lib</artifactId>
    <version>1.0.0</version>
    <type>jar</type>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.1.4.RELEASE</version>
</dependency>
In my spring application context, the definition of my listener is as shown below (simplified).
<bean id="credentialsProviderBean" class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain"/>
<bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
<!-- Connect via a Proxy -->
<property name="proxyHost" value="${png.http.proxy.host}"/>
<property name="proxyPort" value="${png.http.proxy.port}" />
<property name="maxConnections" value="${png.sqs.connections.consumer.pool.size}"/>
</bean>
<bean id="connectionFactoryBuilder" class="com.amazon.sqs.javamessaging.SQSConnectionFactory$Builder">
<property name="regionName" value="${png.aws.region}"/>
<property name="awsCredentialsProvider" ref="credentialsProviderBean"/>
<property name="clientConfiguration" ref="clientConfiguration"/>
</bean>
<bean id="ConnectionFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"
factory-bean="connectionFactoryBuilder"
factory-method="build" />
<bean id="processorListener" class="com.intuit.mobile.png.notification.processor.ProcessorListener" init-method="init"/>
<jms:listener-container container-type="default" connection-factory="ConnectionFactory" acknowledge="auto"
concurrency="${png.sqs.connections.consumer.pool.size}">
<jms:listener destination="${png.processor.queue.normal}" ref="processorListener" method="onMessage" />
</jms:listener-container>
We are noticing multiple deliveries of the same message to consumers running on the same instance, only milliseconds apart. I understand that the contract is at-least-once and there are cases where duplicate delivery can occur, but can that happen at such short intervals?
Basically what we are seeing is message duplication at really short intervals, where one message sometimes gets multiplied into 2 or 3.
Some questions that come to mind:
- Do you support once-and-only-once delivery guarantees of JMS?
- Can the issue described earlier (message duplication) happen so frequently with the AWS JMS library?
- Do you recommend using the Spring JMS libraries with the AWS JMS library? Is this a tested pattern?
- Do you account for visibility timeouts, making sure that a message gets returned to the pool if it waits too long in the cache? What is the cutoff time for calling onMessage? I.e. if the message lives in the cache for 25 seconds and my visibility timeout is 30 seconds, do you still invoke onMessage() when only 5 seconds are left before the visibility timeout expires? Can you share some details around this implementation so that we can handle the border conditions?
Just as an FYI, you may be waiting on an answer for this until who knows when. There has not been a lot of activity in this project since it was initially released, and I don't know if anyone is really paying attention.
I only mention this because I filed #8 and have heard nothing but crickets.
The answer is yes, you can get duplicate messages. It's hidden away in the FAQs: https://aws.amazon.com/sqs/faqs/
Q: How many times will I receive each message? Amazon SQS is engineered to provide “at least once” delivery of all messages in its queues. Although most of the time each message will be delivered to your application exactly once, you should design your system so that processing a message more than once does not create any errors or inconsistencies.
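To illustrate the FAQ's advice (this is not from the thread), here is a minimal sketch of an idempotent listener that keys off the JMS message ID, which stays the same across redeliveries of the same SQS message. In a real system the de-duplication store would need to be shared and persistent.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

// Hypothetical idempotent listener: redeliveries of the same SQS message carry
// the same JMS message ID, so re-processing can be skipped.
public class IdempotentListener implements MessageListener {

    // In-memory set for illustration only; use a shared store (database, cache)
    // when multiple consumer instances are involved.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    @Override
    public void onMessage(Message message) {
        try {
            if (!processedIds.add(message.getJMSMessageID())) {
                return; // already processed this message, ignore the duplicate
            }
            if (message instanceof TextMessage) {
                handle(((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    private void handle(String body) {
        // application-specific processing goes here (placeholder)
    }
}
```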
lol at getting an immediate answer - not on the issue I track, though. But what's the status of this project? Is it seeing some love?
After analyzing the thread dump, it does not seem like there is any locking issue. All of the threads are simply waiting on a message from the SQS queue, which seems to imply there were no more messages available in the queue.
Spring JMS is supported. In fact, any framework relying on JMS is theoretically supported, as this library implements a standard interface.
Spring JMS uses the synchronous JMS API of this library. Prefetching has no mechanism for extending the visibility timeout, so prefetching a lot of messages while consumers are clogged up might mean you will see duplicates as the timeout expires.
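For reference (not part of the original comment), a rough sketch of keeping the client-side buffer small by configuring the builder programmatically. The prefetch property is assumed to be named numberOfMessagesToPrefetch; names may differ across library versions.

```java
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

// Sketch: a small prefetch count reduces the time a prefetched message sits in
// the client-side buffer consuming its visibility timeout before delivery.
public class ConnectionFactoryConfig {

    public static SQSConnectionFactory create(String region) {
        SQSConnectionFactory.Builder builder = new SQSConnectionFactory.Builder();
        builder.setRegionName(region);
        builder.setAwsCredentialsProvider(new DefaultAWSCredentialsProviderChain());
        builder.setClientConfiguration(new ClientConfiguration());
        // Assumed property name for the prefetch count on the 1.0.x Builder.
        builder.setNumberOfMessagesToPrefetch(1);
        return builder.build();
    }
}
```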
Opened a case with AWS and received a response on some of the questions. Attaching it here for everyone's reference.
- Once-and-only-once delivery is not supported. The at-least-once delivery behavior within the library is inherited from SQS.
- Yes, SQS behavior of at-least-once is not altered, you still might see duplicate messages being delivered. The library itself does not generate any extra duplicates, nor does it remove them -> the behavior is the same as if using plain vanilla SQS APIs.
- Since we support the JMS specification, you can safely use spring jms framework.
- In the case of the synchronous receiver, the prefetched messages (the count of which you can configure through the SQSConnectionFactory$Builder bean) are queued up for the receive calls, and their visibility timeout is not adjusted while they wait. In your case, since you do not specify a custom prefetch count, at most a single message is waiting for the receive call on the SQSMessageConsumer.
Since we support the JMS specification, you can safely use spring jms framework.
Um... right. Unfortunately, you will run into performance and functionality issues if you try to do this. Having tested Spring JmsTemplate using MessageListener and producer code that works flawlessly with other JMS providers, I removed Spring JMS from the stack when using SQS. Under the hood, this library is not a full JMS provider, and the code in Spring JMS can't handle the custom client acknowledge mode that this library supports (although you can provide wrapper classes to work around that). I also found that JmsTemplate was about 2-3x slower at posting messages to an SQS queue than using a plain MessageProducer.
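For context (a sketch using the standard JMS API, not the commenter's actual code), posting via a plain MessageProducer with this library looks roughly like this; the queue name is a placeholder.

```java
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import com.amazon.sqs.javamessaging.SQSConnectionFactory;

// Sketch: sending through the library's JMS API directly, bypassing JmsTemplate.
public class PlainProducer {

    public static void send(SQSConnectionFactory connectionFactory, String text) throws Exception {
        Connection connection = connectionFactory.createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("MyQueue"); // placeholder queue name
            MessageProducer producer = session.createProducer(queue);
            producer.send(session.createTextMessage(text));
        } finally {
            connection.close();
        }
    }
}
```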
Marking as feature request, to test more closely with Spring JMS.
Quick update on this thread:
Our team worked closely with the SQS team and determined that there is a bug in the SQS JMS library where the close() methods on the message listener take a long time to release resources within the SQS JMS client. When everything is running fine, Spring's DefaultMessageListenerContainer works flawlessly with the SQS JMS libs... but when there are issues such as SQS throttling (which is hard to believe, but it did happen to us when we had a noisy neighbor in our SQS clusters) or some other network issue, the SQS JMS library doesn't recover fast enough. Due to this, we ran into hung consumers with Spring's JMS libraries, which internally use the SQS JMS libs.
Since the JMS libraries are not aggressively maintained, and since we don't mind the duplicate delivery, we decided to go native by removing all the JMS wrappers (Spring JMS and the SQS JMS library) and are using async message consumption wrappers that use the AWS SQS SDK directly. This is working very well for us so far and gives a lot of control over the consumer code. We have also seen from our tests that we can scale much better and the code is more resilient to failures at the SQS layer (throttling, network issues, timeouts).
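For reference, a minimal sketch (not the poster's actual code) of what "going native" can look like with the AWS SDK for Java v1: long-poll, process, and delete only on success. The queue URL and processing logic are placeholders.

```java
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

// Sketch: a plain receive/process/delete loop against SQS, without any JMS layer.
public class NativeSqsConsumer {

    public static void poll(String queueUrl) {
        AmazonSQS sqs = AmazonSQSClientBuilder.standard().build();
        ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
                .withMaxNumberOfMessages(10)
                .withWaitTimeSeconds(20); // long polling
        while (!Thread.currentThread().isInterrupted()) {
            for (Message message : sqs.receiveMessage(request).getMessages()) {
                try {
                    process(message.getBody());
                    // Delete only after successful processing; otherwise the message
                    // becomes visible again after its visibility timeout (at-least-once).
                    sqs.deleteMessage(queueUrl, message.getReceiptHandle());
                } catch (RuntimeException e) {
                    // leave the message in the queue to be retried
                }
            }
        }
    }

    private static void process(String body) {
        // application-specific processing (placeholder)
    }
}
```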
@kuba-aws any update on fixing the issue with the close() methods in the SQS JMS lib? We are facing a similar issue where SQS returns an HTTP 500, which results in blocked consumers. The only solution is to restart the consumer process.
Stacktrace of the exception that causes the issue.
com.amazonaws.services.sqs.model.AmazonSQSException: We encountered an internal error. Please try again. (Service: AmazonSQS; Status Code: 500; Error Code: InternalError; Request ID: 0436cf79-13c2-5432-9b57-9eaffd70a25b)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1579)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1249)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:1689)
at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:1665)
at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessage(AmazonSQSClient.java:762)
at com.amazon.sqs.javamessaging.AmazonSQSExtendedClientBase.deleteMessage(AmazonSQSExtendedClientBase.java:266)
at com.amazon.sqs.javamessaging.AmazonSQSExtendedClient.deleteMessage(AmazonSQSExtendedClient.java:558)
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessage(AmazonSQSMessagingClientWrapper.java:141)
at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.acknowledge(AutoAcknowledger.java:47)
at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.notifyMessageReceived(AutoAcknowledger.java:56)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.messageHandler(SQSMessageConsumerPrefetch.java:491)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.receive(SQSMessageConsumerPrefetch.java:424)
at com.amazon.sqs.javamessaging.SQSMessageConsumer.receive(SQSMessageConsumer.java:171)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:412)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:298)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:251)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1164)
We too have started to encounter this at heavy workloads with high concurrency settings and prefetching, whenever SQS returns 500s (which can just happen from time to time). @sinusekhar it seems like one simple fix would be to update SQSMessageConsumerPrefetch.cannotDeliver() to throw a JmsException instead of returning true. Returning true is what leads to the "greedy infinite" looping that hogs all resources and doesn't let the consumer stop processing so that cleanup can finish.
EDIT: After digging further, we have found that if an SQSConnection is closed before receiveMessage is invoked (but not during), this generally reproduces the hang.
We hit this same issue (blocked busy threads) on two different servers in the last couple of days. Stacktrace of the error that appeared to trigger the condition -
01:06:08.768 [gatewayMessageListenerContainer-7] ERROR com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper - AmazonServiceException: deleteMessage. RequestId: 2c564b4f-7c59-587e-a70b-1959ab19a3dc
HTTPStatusCode: 500 AmazonErrorCode: InternalError
com.amazonaws.AmazonServiceException: We encountered an internal error. Please try again. (Service: AmazonSQS; Status Code: 500; Error Code: InternalError; Request ID: 2c564b4f-7c59-587e-a70b-1959ab19a3dc)
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1378) ~[aws-java-sdk-core-1.11.18.jar!/:?]
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:924) ~[aws-java-sdk-core-1.11.18.jar!/:?]
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:702) ~[aws-java-sdk-core-1.11.18.jar!/:?]
at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:454) ~[aws-java-sdk-core-1.11.18.jar!/:?]
at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:416) ~[aws-java-sdk-core-1.11.18.jar!/:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:365) ~[aws-java-sdk-core-1.11.18.jar!/:?]
at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:1741) ~[aws-java-sdk-sqs-1.11.18.jar!/:?]
at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:1711) ~[aws-java-sdk-sqs-1.11.18.jar!/:?]
at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessage(AmazonSQSClient.java:792) ~[aws-java-sdk-sqs-1.11.18.jar!/:?]
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessage(AmazonSQSMessagingClientWrapper.java:141) [amazon-sqs-java-messaging-lib-1.0.1.jar!/:?]
at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.acknowledge(AutoAcknowledger.java:47) [amazon-sqs-java-messaging-lib-1.0.1.jar!/:?]
at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.notifyMessageReceived(AutoAcknowledger.java:56) [amazon-sqs-java-messaging-lib-1.0.1.jar!/:?]
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.messageHandler(SQSMessageConsumerPrefetch.java:491) [amazon-sqs-java-messaging-lib-1.0.1.jar!/:?]
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.receive(SQSMessageConsumerPrefetch.java:424) [amazon-sqs-java-messaging-lib-1.0.1.jar!/:?]
at com.amazon.sqs.javamessaging.SQSMessageConsumer.receive(SQSMessageConsumer.java:171) [amazon-sqs-java-messaging-lib-1.0.1.jar!/:?]
Have since upgraded to 1.0.4 of the library in the hope that it may have been fixed, but looking at the change logs and the above posts, it appears unlikely.
@daveyoungbcat I can confirm it has not been fixed in 1.0.4. Those are precisely the same errors we get. With our SQS message volume and configuration, we generally encounter this no less than every 8-12 hours.
Upon closer inspection, here's what is happening.
Preconditions to reproduce:
- Spring JMS Concurrency level > 1
- Spring JMS Cache level >= CACHE_CONNECTION
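As an illustration (not from the report itself), these preconditions correspond to a DefaultMessageListenerContainer configured roughly like the following; the destination name, concurrency value, and listener are placeholders.

```java
import javax.jms.ConnectionFactory;
import javax.jms.MessageListener;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

// Sketch: a container whose settings match the preconditions above.
public class ListenerContainerConfig {

    public static DefaultMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                            MessageListener listener) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestinationName("my-queue"); // placeholder
        container.setMessageListener(listener);
        container.setConcurrency("5");            // concurrency level > 1
        // CACHE_CONNECTION (or higher) means all consumers share a single connection.
        container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION);
        return container;
    }
}
```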
Steps to reproduce:
- Multiple consumers are spawned that reuse a shared connection
- In one consumer the AmazonSQSClient throws an exception (typically during a delete request, but it can be anything really).
- The Spring JMS AsyncMessageListenerInvoker handles this first failure when using shared connections (based on cache level) by refreshing the shared connection (i.e. closing it and opening a new one).
- Closing the shared connection causes all open sessions against that connection to be closed. This causes all of the SQSMessageConsumers executing against those sessions to be closed, closing their wrapped SQSMessageConsumerPrefetch instances.
- The original consumer thread handling the failed request dies and the AsyncMessageListenerInvoker thread completes.
- Based on concurrency settings, a new consumer will be spawned. The new consumer will use the new shared connection and a new session appropriately.
- All previously running consumers that reused the initial connection are exposed to a race condition:
  - If the consumer is processing a message against the closed session and invokes any method against the client (such as acknowledge or delete) for a different message, an exception is thrown and the consumer thread dies (closing the already closed session), but most importantly it does not attempt to recover the shared connection. In this situation the recovery marker has not changed, as no new connection or session was created. This consumer thread dies, and a new consumer is spawned correctly to take its place.
  - If the consumer was idle, this causes the problem. When this consumer executes its next receive() call it will enter cannotDeliver(), check the state of the consumer, see that it is closed, and return null, causing executeOngoingLoop() to continue to loop. This results in the busy/hung thread. This consumer has no way to recover or be expelled from the consumer pool.
I may be mistaken, but I believe the simplest and clearest fix is to throw an exception from cannotDeliver() whenever the consumer has been closed.
I have opened PR https://github.com/awslabs/amazon-sqs-java-messaging-lib/pull/41 accordingly.
I've merged PR #41 and it has been released in 1.0.8. There are a lot of testimonials on the PR about the change fixing things for various people, but I'm hoping for confirmation here before I close the issue itself.
@robin-aws At least the problem we have in production still occurs with v1.0.8 (Spring Boot 1.5, spring-cloud-aws-messaging:2.1).
java.lang.IllegalStateException: Connection pool shut down
at org.apache.http.util.Asserts.check(Asserts.java:34)
at org.apache.http.pool.AbstractConnPool.lease(AbstractConnPool.java:191)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:268)
at sun.reflect.GeneratedMethodAccessor96.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
at com.amazonaws.http.conn.$Proxy103.requestConnection(Unknown Source)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1297)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:2214)
at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2181)
at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2170)
at com.amazonaws.services.sqs.AmazonSQSClient.executeReceiveMessage(AmazonSQSClient.java:1607)
at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1578)
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:351)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:259)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessagesWithBackoff(SQSMessageConsumerPrefetch.java:303)
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:221)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
@SchulteMarkus Any workaround you found?
@dhirenpratap No. In the end, we decided to suppress this specific error.