rabbitmq-cdi
rabbitmq-cdi copied to clipboard
Lost connection not detected by ConnectListener
Our ConnectListener implementation does not act as expeted. We're using rabbitmq-cdi version 2.0.0.
Our EventBinder:
import com.rabbitmq.client.BuiltinExchangeType;
import jakarta.inject.Inject;
import net.reini.rabbitmq.cdi.EventBinder;
import net.reini.rabbitmq.cdi.ExchangeDeclaration;
import nl.stater.commons.interestservicestore.InterestserviceStoreItem;
import nl.stater.interestservicestore.config.encoding.JsonEncoder;
public class RabbitBinder extends EventBinder {
@Inject
RabbitConnectionListener listener;
@Override
protected void bindEvents() {
ExchangeDeclaration interestservicStoreExchange = declarerFactory()
.createExchangeDeclaration("exchangename")
.withType(BuiltinExchangeType.DIRECT)
.withAutoDelete(false)
.withDurable(true);
bind(InterestserviceStoreItem.class)
.toExchange("exchangename")
.withDeclaration(interestservicStoreExchange)
.withRoutingKey("routingkey")
.withEncoder(new JsonEncoder<>());
registerConnectionListener(listener);
}
}
Our Binder Initializer:
import com.rabbitmq.client.Address;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.servlet.ServletContextEvent;
import jakarta.servlet.ServletContextListener;
import jakarta.servlet.annotation.WebListener;
import nl.stater.commons.blueriq.foundation.exception.ApplicationException;
@WebListener
public class ContentstoreConfig implements ServletContextListener {
@Inject
private RabbitBinder rabbitBinder;
@PostConstruct
public void initialize() {
try {
rabbitBinder.configuration()
.addHost(new Address("hostname", 5671))
.setUsername("user")
.setPassword("pwd")
.setVirtualHost("virtualhost")
.setSecure(true)
.setConnectTimeout(10000)
.setConnectRetryWaitTime(10000)
.setRequestedConnectionHeartbeatTimeout(3);
rabbitBinder.initialize();
} catch (Exception e) {
throw new ApplicationException("Inializiation of RabbitMQ Binder failed", e);
}
}
@Override
public void contextInitialized(ServletContextEvent sce) {
}
}
Our ConnectionListener implementation:
import com.rabbitmq.client.Connection;
import jakarta.enterprise.context.ApplicationScoped;
import net.reini.rabbitmq.cdi.ConnectionListener;
@ApplicationScoped
public class RabbitConnectionListener implements ConnectionListener {
private boolean connected = false;
@Override
public void onConnectionEstablished(Connection connection) {
connected = true;
}
@Override
public void onConnectionLost(Connection connection) {
connected = false;
}
@Override
public void onConnectionClosed(Connection connection) {
connected = false;
}
public boolean isConnected() {
return connected;
}
}
Connecting to RabbitMQ, firing events to Rabbit etc works fine.
At startup of our application, onConnectionEstablished (in our ConnectionListener implementation) is being called. And when shutting down, the onConnectionClosed method.
But when I disrupt the connection between our application and RabbitMQ, I expect the onConnectionLost being called. I tested the disruption by adding a outbound firewall rule with a port block on port 5671. After enabling this rule, logging says:
2024-09-10 10:05:47,457 ERROR [com.rabbitmq.client.impl.ForgivingExceptionHandler] (AMQP Connection 172.22.0.195:5671) Caught an exception during connection recovery!: java.net.SocketException: Permission denied: getsockopt
at java.base/sun.nio.ch.Net.pollConnect(Native Method)
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:682)
at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:542)
at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:592)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
at java.base/java.net.Socket.connect(Socket.java:751)
at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:304)
at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:61)
at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:69)
at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConnection(AutorecoveringConnection.java:628)
at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:589)
at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.recovery.AutorecoveringConnection.lambda$addAutomaticRecoveryListener$3(AutorecoveringConnection.java:524)
at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:839)
at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:816)
at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:700)
at java.base/java.lang.Thread.run(Thread.java:1583)
So I ask myself : why does our ConnectionListener implementation not detect that disruption?
Hope you can help us. Many thanks in advance!