solace-spring-cloud icon indicating copy to clipboard operation
solace-spring-cloud copied to clipboard

How to handle the InterruptedException properly

Open IagoSchlisting opened this issue 1 year ago • 1 comments

Hi colleagues,

I would like to know if there is a way to reprocess consumed messages that fails because of the InterruptedException error.

More details about our scenario: Our application scales automatically. Frequently, when it descales, the InterruptedException is thrown since the instance is shut down during the processing of one or more messages. The big problem is that these messages are not reprocessed automatically as it should and we lose it. Do you have any idea on how we can reprocess messages in this scenario?

Log error:

Caused by: com.solacesystems.jcsmp.JCSMPInterruptedException: Message enqueue interrupted
	at com.solacesystems.jcsmp.impl.PubADManager.enqueueMsgWithIdUpdateWithThrows(PubADManager.java:939)
	at com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.updateAdMsgIDandEnqueueToWindow(JCSMPXMLMessageProducer.java:1475)
	at com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.sendMsgOnce(JCSMPXMLMessageProducer.java:1247)
	at com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.sendImpl(JCSMPXMLMessageProducer.java:1139)
	at com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.sendToDestination(JCSMPXMLMessageProducer.java:932)
	at com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.sendToDestination(JCSMPXMLMessageProducer.java:869)
	at com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.send(JCSMPXMLMessageProducer.java:687)
	at com.solace.spring.cloud.stream.binder.outbound.JCSMPOutboundMessageHandler.handleMessage(JCSMPOutboundMessageHandler.java:102)
	... 61 more
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
	at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
	at com.solacesystems.jcsmp.impl.queues.ConditionalBoundedMessageQueue.queueMsgWithIdUpdate(ConditionalBoundedMessageQueue.java:254)
	at com.solacesystems.jcsmp.impl.PubADManager.enqueueMsgWithIdUpdateWithThrows(PubADManager.java:934)
	... 68 more

I'm looking forward to your return. Regards,

IagoSchlisting avatar May 30 '23 20:05 IagoSchlisting

If it can help you, this is our current consumer queue configuration:

spring.cloud.stream.bindings.inputChannel.binder=solace
spring.cloud.stream.bindings.inputChannel.destination=<destination_name>
spring.cloud.stream.bindings.inputChannel.group=<group_name>
spring.cloud.stream.bindings.inputChannel.consumer.max-attempts=1
spring.cloud.stream.solace.bindings.inputChannel.consumer.queueNameExpression=destination.trim()
spring.cloud.stream.solace.bindings.inputChannel.consumer.errorQueueNameExpression=destination.trim() + '.error-queue'
spring.cloud.stream.bindings.inputChannel.consumer.concurrency=50
spring.cloud.stream.solace.bindings.inputChannel.consumer.autoBindErrorQueue=true
spring.cloud.stream.solace.bindings.inputChannel.consumer.provisionDurableQueue=false
spring.cloud.stream.solace.bindings.inputChannel.consumer.addDestinationAsSubscriptionToQueue=false
spring.cloud.stream.solace.bindings.inputChannel.consumer.errorMsgDmqEligible=true
spring.cloud.stream.solace.bindings.inputChannel.consumer.provisionErrorQueue=false

IagoSchlisting avatar May 30 '23 20:05 IagoSchlisting