
Lost connection not detected by ConnectionListener

Open arjenjobse opened this issue 5 months ago • 1 comments

Our ConnectionListener implementation does not behave as expected. We're using rabbitmq-cdi version 2.0.0.

Our EventBinder:

import com.rabbitmq.client.BuiltinExchangeType;

import jakarta.inject.Inject;
import net.reini.rabbitmq.cdi.EventBinder;
import net.reini.rabbitmq.cdi.ExchangeDeclaration;
import nl.stater.commons.interestservicestore.InterestserviceStoreItem;
import nl.stater.interestservicestore.config.encoding.JsonEncoder;

public class RabbitBinder extends EventBinder {
    
	@Inject
	RabbitConnectionListener listener;

	@Override
	protected void bindEvents() {
		ExchangeDeclaration interestserviceStoreExchange = declarerFactory()
				.createExchangeDeclaration("exchangename")
				.withType(BuiltinExchangeType.DIRECT)
				.withAutoDelete(false)
				.withDurable(true);

		bind(InterestserviceStoreItem.class)
				.toExchange("exchangename")
				.withDeclaration(interestserviceStoreExchange)
				.withRoutingKey("routingkey")
				.withEncoder(new JsonEncoder<>());

		registerConnectionListener(listener);
	}
}

Our Binder Initializer:

import com.rabbitmq.client.Address;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.servlet.ServletContextEvent;
import jakarta.servlet.ServletContextListener;
import jakarta.servlet.annotation.WebListener;
import nl.stater.commons.blueriq.foundation.exception.ApplicationException;

@WebListener
public class ContentstoreConfig implements ServletContextListener {
	
	@Inject
	private RabbitBinder rabbitBinder;

	
	@PostConstruct
	public void initialize() {
		try {
			rabbitBinder.configuration()
			.addHost(new Address("hostname", 5671))
			.setUsername("user")
			.setPassword("pwd")
			.setVirtualHost("virtualhost")
			.setSecure(true)
			.setConnectTimeout(10000)
			.setConnectRetryWaitTime(10000)
			.setRequestedConnectionHeartbeatTimeout(3);
			rabbitBinder.initialize();
		} catch (Exception e) {
			throw new ApplicationException("Initialization of RabbitMQ Binder failed", e);
		}
	}

	
	@Override
	public void contextInitialized(ServletContextEvent sce) {
	}
	
}

Our ConnectionListener implementation:

import com.rabbitmq.client.Connection;

import jakarta.enterprise.context.ApplicationScoped;
import net.reini.rabbitmq.cdi.ConnectionListener;

@ApplicationScoped
public class RabbitConnectionListener implements ConnectionListener {

	private boolean connected = false;

	@Override
	public void onConnectionEstablished(Connection connection) {
		connected = true;
	}

	@Override
	public void onConnectionLost(Connection connection) {
		connected = false;
	}

	@Override
	public void onConnectionClosed(Connection connection) {
		connected = false;
	}

	public boolean isConnected() {
		return connected;
	}
}
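
A side note on the listener above: the callbacks may be invoked from the client's connection threads while isConnected() is read from application threads, so a plain boolean field is not guaranteed to be visible across threads. It also cannot tell us which callback fired last. A minimal sketch of a thread-safe state holder the listener could delegate to, with no rabbitmq-cdi dependency; the ConnectionState class and its Status enum are hypothetical names for illustration, not part of the library:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper (not part of rabbitmq-cdi): remembers which listener
// callback fired last, in a way that is safe to read from other threads.
final class ConnectionState {

    enum Status { NOT_CONNECTED, ESTABLISHED, LOST, CLOSED }

    private final AtomicReference<Status> status =
            new AtomicReference<>(Status.NOT_CONNECTED);

    // Intended to be called from onConnectionEstablished / onConnectionLost /
    // onConnectionClosed with the matching status.
    void update(Status newStatus) {
        status.set(newStatus);
    }

    Status status() {
        return status.get();
    }

    boolean isConnected() {
        return status.get() == Status.ESTABLISHED;
    }
}
```

With this, onConnectionLost would call update(ConnectionState.Status.LOST), and logging status() during the firewall test would show whether any callback fired at all.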

Connecting to RabbitMQ, firing events to it, and so on all work fine.

At startup of our application, onConnectionEstablished (in our ConnectionListener implementation) is called, and on shutdown onConnectionClosed is called.

But when I disrupt the connection between our application and RabbitMQ, I expect onConnectionLost to be called, and it is not. I tested the disruption by adding an outbound firewall rule that blocks port 5671. After enabling this rule, the log says:

2024-09-10 10:05:47,457 ERROR [com.rabbitmq.client.impl.ForgivingExceptionHandler] (AMQP Connection 172.22.0.195:5671) Caught an exception during connection recovery!: java.net.SocketException: Permission denied: getsockopt
	at java.base/sun.nio.ch.Net.pollConnect(Native Method)
	at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:682)
	at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:542)
	at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:592)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
	at java.base/java.net.Socket.connect(Socket.java:751)
	at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:304)
	at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:61)
	at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:69)
	at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConnection(AutorecoveringConnection.java:628)
	at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:589)
	at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.recovery.AutorecoveringConnection.lambda$addAutomaticRecoveryListener$3(AutorecoveringConnection.java:524)
	at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:839)
	at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:816)
	at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:700)
	at java.base/java.lang.Thread.run(Thread.java:1583)

So my question is: why does our ConnectionListener implementation not detect that disruption?
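
For anyone reproducing this, the firewall block can be checked independently of the RabbitMQ client with a plain TCP connect. This is a minimal sketch only: PortProbe is a hypothetical helper, it uses just java.net.Socket, and it tests TCP reachability, not the TLS handshake or the AMQP protocol.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: reports whether a TCP connection to host:port can be
// opened within the given timeout. It does not speak TLS or AMQP.
final class PortProbe {

    private PortProbe() {
    }

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and firewall rejections.
            return false;
        }
    }
}
```

Calling PortProbe.isReachable("hostname", 5671, 3000) before and after enabling the firewall rule should flip from true to false if the rule works as intended.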

Hope you can help us. Many thanks in advance!

arjenjobse, Sep 10 '24 08:09