smallrye-reactive-messaging
smallrye-reactive-messaging copied to clipboard
The JMS connector does not reconnect after disconnections
In my current Quarkus project i'm using Emitter
together with smallrye-reactive-messaging-jms
to send messages to a JMS queue.
It works fine, until the first connectivity issue with the message server (IBM MQ in my case) - after the first attempt to send the message to an unavailable host:port i keep getting java.lang.IllegalStateException: SRMSG00028: The subscription to <queue-name> has been cancelled
.
Is there a way to restart the emitter (get it back into RUNNING
state)? If not, what should be the correct (Quarkus/smallrye) way of handling this kind of issues to build a more resilient system?
I guess the problem is in the JMS connector. The connector is not integrated into Quarkus as it's more a best-effort thing at the moment. It probably does not recover after the loss of connection.
Ok, @cescoffier, thanks for the quick reply!
Note that while the connector does not support reconnecting, the underlying JMS implementation could. Unfortunately I'm not familiar with IBM MQ. The issue I came across recently was with connecting to ActiveMQ and failing for the same reason as described here. Fixed the issue by configuring the ActiveMQ client https://activemq.apache.org/how-can-i-support-auto-reconnection.
It's an interesting feature of ActiveMQ, I couldn't find anything similar for IBM MQ. In our project we ended up creating Proxy
ies capable of detecting the above mentioned connectivity issue (and a couple more) and recreating the connection.
hi @coiouhkc one year later I had the same issue with IBM MQ.
Looking to IBM docs I realised that MQ client actually has an automatic reconnect option:
MQConnectionFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT)
So, putting things together I defined a connection factory (as described in smallrye reactive messaging doc) with MQ client properties :
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.jms.ConnectionFactory;
import com.ibm.msg.client.jakarta.jms.JmsConnectionFactory;
import com.ibm.msg.client.jakarta.jms.JmsFactoryFactory;
import com.ibm.msg.client.jakarta.wmq.WMQConstants;
@ApplicationScoped
public class ConnectionFactoryBean {
@Produces
ConnectionFactory factory() {
JmsFactoryFactory ff = JmsFactoryFactory.getInstance(JmsConstants.JAKARTA_WMQ_PROVIDER);
JmsConnectionFactory factory = ff.createConnectionFactory(); // actually com.ibm.mq.jakarta.jms.MQConnectionFactory
MQConnectionFactory mqFactory = (com.ibm.mq.jakarta.jms.MQConnectionFactory)factory;
mqFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
mqFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT); // <-- enable automatic reconnection
mqFactory.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost");
mqFactory.setIntProperty(WMQConstants.WMQ_PORT, 1414);
mqFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, "QM1");
mqFactory.setStringProperty(WMQConstants.WMQ_CHANNEL, "DEV.APP.SVRCONN");
mqFactory.setStringProperty(WMQConstants.WMQ_APPLICATIONNAME, "MySample");
return mqFactory;
}
}
This way client can automatically recover from a temporary network/server failure.
I hope this can be useful to someone else.
@fsantagostinobietti - i left the project and don't have access to the codebase anymore, so I cannot verify the initial case, but thanks for providing a documented solution!
Is there any chance to integrate this into the connector so it behaves like the smallrye MQTT one? The approach provided by @fsantagostinobietti is suboptimal for the IBM MQ client, since i could not find a way for it to provide logging on connection loss nor when it starts reconnecting / attempts the reconnect nor for providing time intervals (only fixed time) (etc.)