paho.mqtt.java icon indicating copy to clipboard operation
paho.mqtt.java copied to clipboard

MQTT Client Stops Receiving the Messages From Broker.

Open sanket-kulkarni-vmware opened this issue 8 years ago • 17 comments

MQTT Paho Client stops getting the messages from the broker at some point.

The client is receiving the messages from the broker at the rate of 5000 msg/sec and the message size of each message is around 1 KB.

At broker side, I have seen that client is connected with the broker but somehow broker unable to send messages to the client.

Following is the code I am using to connect broker.

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttTopic;

import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

public class Test_client implements MqttCallbackExtended{
   public static AtomicLong count = new AtomicLong();
   
   String topic        = "mqtt/test/#";
   String broker       = "tcp://broker-ip:1883";
   String clientId     = "client-id-1";
   MqttClient client;
   MqttConnectOptions connOpt;

   
   
   public void connect(){
	   	 try {
	   		 
	   		 System.out.println("Connecting to the broker..");
	   		 this.client = new MqttClient(broker, clientId);
		     this.connOpt = new MqttConnectOptions();
		     
		     // MQTT Configurations
		     //this.connOpt.setMqttVersion(4);
		     this.connOpt.setCleanSession(true);
		     this.connOpt.setKeepAliveInterval(30);
		     this.connOpt.setAutomaticReconnect(true);
		     this.connOpt.setUserName("xyz");
		     this.connOpt.setPassword("abcd".toCharArray());
		     
		     // Specifying the callback
		     this.client.setCallback(this);
		     
		     // Subscriber
		     this.client.connect(connOpt);
		     this.client.subscribe(topic, 1);
		     System.out.println("Connected to the broker.. ");
	   	 }
	   	 catch(Exception error){
	   		System.out.println("Exception occured while connecting to broker");
	   	 }
   }

   
   /**
	 * 
	 * connectionLost
	 * This callback is invoked upon losing the MQTT connection.
	 * 
	 */
	@Override
	public void connectionLost(Throwable t) {
		System.out.println("Connection lost!");
	}

	/**
	 * 
	 * deliveryComplete
	 * This callback is invoked when a message published by this client
	 * is successfully received by the broker.
	 * 
	 */
	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		//System.out.println("Pub complete" + new String(token.getMessage().getPayload()));
		
	}
	
	//@Override
	public void connectComplete(boolean reconnect, java.lang.String serverURI){
		System.out.println("Re-Connection Attempt " + reconnect);
		if(reconnect) {
			try {
				this.client.subscribe(topic, 1);
			} catch (MqttException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

	/**
	 * 
	 * messageArrived
	 * This callback is invoked when a message is received on a subscribed topic.
	 * 
	 */
	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		/*
		System.out.println("-------------------------------------------------");
		System.out.println("| Topic:" + topic);
		System.out.println("| Message: " + new String(message.getPayload()));
		System.out.println("-------------------------------------------------");
		*/
		
		count.incrementAndGet();
	}
	
	
	public static void main(String args[]) throws InterruptedException {
	      Test_client client_mqtt = new Test_client();
	      client_mqtt.connect();
	      
	      while(true){        	 
         	 Date date = new Date();
         	 System.out.println(date.toString() + ": Total Messages Count: " + Test_client.count.get());
         	 Thread.sleep(10000);
          }
	      
	}

}

sanket-kulkarni-vmware avatar Jan 25 '17 09:01 sanket-kulkarni-vmware

By default, the number of allowed maximum inflight messages is 10, you could change this by using MqttConnectOptions.setMaxInflight to change this to a higher value. https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttConnectOptions.html#setMaxInflight-int-

If that doesn't help, some more debug information might be useful e.g. any error logs, the numbers / timings at which point messages are no longer received etc.. You could also turn on Logging to get some more useful information out of the client: https://wiki.eclipse.org/Paho/Log_and_Debug_in_the_Java_client

jpwsutton avatar Jan 25 '17 10:01 jpwsutton

@jpwsutton But as per the documentation of getMaxInflight() method, it will affect only on messages we send to the broker and not on the messages we are receiving from the broker. Or it applies to both?

sanket-kulkarni-vmware avatar Jan 25 '17 10:01 sanket-kulkarni-vmware

@sanket-kulkarni-vmware Sorry, I wasn't thinking when I sent that, yes max in flight is only for outgoing messages.

However I tried recreating this using your code (with a tweak to count the number of messages received per second.

Connecting to the broker..
Re-Connection Attempt false
Connected to the broker.. 
Wed Jan 25 10:55:35 GMT 2017: Total Messages Count: 0 Per Second: 0
Wed Jan 25 10:55:36 GMT 2017: Total Messages Count: 7310 Per Second: 7310
Wed Jan 25 10:55:37 GMT 2017: Total Messages Count: 16752 Per Second: 9442
Wed Jan 25 10:55:38 GMT 2017: Total Messages Count: 26906 Per Second: 10154
Wed Jan 25 10:55:40 GMT 2017: Total Messages Count: 37557 Per Second: 10651
Wed Jan 25 10:55:41 GMT 2017: Total Messages Count: 59803 Per Second: 22246
Wed Jan 25 10:55:42 GMT 2017: Total Messages Count: 74110 Per Second: 14307
Wed Jan 25 10:55:43 GMT 2017: Total Messages Count: 87575 Per Second: 13465
Wed Jan 25 10:55:44 GMT 2017: Total Messages Count: 101366 Per Second: 13791
Wed Jan 25 10:55:45 GMT 2017: Total Messages Count: 115313 Per Second: 13947
Wed Jan 25 10:55:46 GMT 2017: Total Messages Count: 129380 Per Second: 14067
Wed Jan 25 10:55:47 GMT 2017: Total Messages Count: 143177 Per Second: 13797
Wed Jan 25 10:55:48 GMT 2017: Total Messages Count: 157223 Per Second: 14046
Wed Jan 25 10:55:49 GMT 2017: Total Messages Count: 170881 Per Second: 13658
Wed Jan 25 10:55:50 GMT 2017: Total Messages Count: 185866 Per Second: 14985
Wed Jan 25 10:55:51 GMT 2017: Total Messages Count: 199796 Per Second: 13930
Wed Jan 25 10:55:52 GMT 2017: Total Messages Count: 213874 Per Second: 14078
Wed Jan 25 10:55:53 GMT 2017: Total Messages Count: 228665 Per Second: 14791
Wed Jan 25 10:55:54 GMT 2017: Total Messages Count: 244013 Per Second: 15348
Wed Jan 25 10:55:55 GMT 2017: Total Messages Count: 258673 Per Second: 14660

On the same machine I had another java app running that was sending 1KB messages as fast as it could (way above 5000 per second).

As you can see, the subscriber is able to receive over 10,000 messages per second. How long does it take for your code in MessageArrvied to process the incoming message? MessageArrvied is called synchronously by the client and so an acknowledgment is not sent back to the server until this method returns cleanly. This could be the cause of the delay...

jpwsutton avatar Jan 25 '17 11:01 jpwsutton

@jpwsutton: I am using same above code for the test, I haven't put any addition code in messageArrived().

sanket-kulkarni-vmware avatar Jan 25 '17 11:01 sanket-kulkarni-vmware

@jpwsutton I am getting following log from Paho Library

Jan 26, 2017 3:34:56 PM org.eclipse.paho.client.mqttv3.internal.ClientState checkForActivity
SEVERE: client-5: Timed out as no activity, keepAlive=30,000 lastOutboundActivity=1,485,425,066,330 lastInboundActivity=1,485,425,043,580 time=1,485,425,096,330 lastPing=1,485,425,066,330

Not sure broker dropping the connection or paho client.

sanket-kulkarni-vmware avatar Jan 26 '17 10:01 sanket-kulkarni-vmware

Any feedback on this? Having the connection to the broker lock up is not great.

gpampara avatar Jul 25 '17 06:07 gpampara

Does anyone have success with this issue?

AlexMAS avatar Jul 24 '18 05:07 AlexMAS

I'm still unable to recreate this issue, is anyone still experiencing it? If so, it would be really handy to get some more information about the broker / version that you are using.

jpwsutton avatar Aug 28 '18 19:08 jpwsutton

To me, this thing happens when the client loss the network connection. To be precise, I have an embedded pc running on a local network having an acces to the company wan. The mqtt client works fine but, if i disconnect even for some seconds the router from the wan, I get the output "Jan 26, 2017 3:34:56 PM org.eclipse.paho.client.mqttv3.internal.ClientState checkForActivity SEVERE: client-5: Timed out as no activity..." after the keepAlive timeout expires, but the connectionLost callback (which I use to try reconnection) is not called. The system is in a sort of suspended state, in which it doesn't receive any message from broker, but still doesn't understand it's disconected.

lucapaciello avatar Sep 24 '18 15:09 lucapaciello

I believe I am experiencing this issue. If I speed up my on_message callback function, there is no problem and everything works as intended.

serenegrace avatar Oct 04 '18 20:10 serenegrace

I have the same problem of @lucapaciello : the client "freez" in a state where doesn't recieve messages from the broker but the connection lost event is never fired, so the client doesn't try to recnnect :/

micnori avatar Jan 11 '19 13:01 micnori

I experienced this problem recently and as metioned above a loss of network is a way to reproduce it,may not be the only way though.Client will stay unavailable but wont trigger any connection-lost callbacks or any exception once the problem occurs.Problem remains unsovled. GID_newbest@@@local_nbcloud570: Timed out as no activity, keepAlive=10,000,000,000 lastOutboundActivity=5,536,654,054,884 lastInboundActivity=5,526,652,696,463 time=5,546,658,193,643 lastPing=5,536,654,061,117

18616505009 avatar Sep 05 '19 02:09 18616505009

It can happens when your internet connection is lost and keepAlive has been timed out if this is your problem i fix it in this PR: https://github.com/eclipse/paho.mqtt.java/pull/702

siavashsoleymani avatar Sep 20 '19 08:09 siavashsoleymani

We are experiencing this issue on production. Our workaround is to have a custom health check on the connectivity so that we can restart the pod when the problem is detected.

arroyoh avatar Nov 04 '20 13:11 arroyoh

We are experiencing this issue on production. Our workaround is to have a custom health check on the connectivity so that we can restart the pod when the problem is detected.

can you post a piece of code?

acaliaro avatar Nov 04 '20 13:11 acaliaro

We are experiencing this issue on production. Our workaround is to have a custom health check on the connectivity so that we can restart the pod when the problem is detected.

can you post a piece of code?

our conn opts

============== Connection options ==============
SocketFactory               :  null
CleanSession                :  false
MqttVersion                 :  4
KeepAliveInterval           :  30
ConTimeout                  :  30
UserName                    :  mqtt-client
SSLProperties               :  null
WillDestination             :  null
==========================================

The workaround is pretty basic, but suits our use case. We count the number of DCs using connectionLost callback. When the issue happens our client gets DC'ed every 60 seconds due to inactivity.

override fun connectionLost(cause: Throwable?) {
    log.info("MQTT client connection lost: ${cause?.message}")
    updateConnectivityStatus()
}

arroyoh avatar Nov 04 '20 13:11 arroyoh

I have the same problem of @sanket-kulkarni-vmware

  • our connect codes
try {
            String[] serverURIs = parseServerUri(mqttClientConfig.getBrokerAddress());
            MqttDefaultFilePersistence mqttDefaultFilePersistence = null;
            if (EmptyUtils.isNotEmpty(mqttClientConfig.getMqttFilePersisDir())) {
                mqttDefaultFilePersistence = new MqttDefaultFilePersistence(mqttClientConfig.getMqttFilePersisDir());
            } else {
                mqttDefaultFilePersistence = new MqttDefaultFilePersistence();
            }
            mqttclient = new MqttClient(serverURIs[0], genMqttClientId(mqttClientConfig.getClientIdPrefix()), mqttDefaultFilePersistence);
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            if (!StringUtils.isEmpty(mqttClientConfig.getUsername()) && !StringUtils.isEmpty(mqttClientConfig.getPassword())) {
                mqttConnectOptions.setUserName(mqttClientConfig.getUsername());
                mqttConnectOptions.setPassword(mqttClientConfig.getPassword().toCharArray());
            }
            if (serverURIs.length > 1) {
                mqttConnectOptions.setServerURIs(serverURIs);
            }
            mqttConnectOptions.setCleanSession(true);
            mqttConnectOptions.setConnectionTimeout(mqttClientConfig.getTimeOut());
            mqttConnectOptions.setKeepAliveInterval(mqttClientConfig.getKeepAliveInterval());
            mqttConnectOptions.setAutomaticReconnect(mqttClientConfig.isAutomaticReconnection());
            if (!StringUtils.isEmpty(mqttClientConfig.getWillTopic()) && !StringUtils.isEmpty(mqttClientConfig.getWillPayload())) {
                mqttConnectOptions.setWill(mqttClientConfig.getWillTopic(), mqttClientConfig.getWillPayload().getBytes(), QosType.QOS_EXACTLY_ONCE.type(), false);
            }
            mqttclient.setCallback(this);
            if (!isConnected()) {
                mqttclient.connect(mqttConnectOptions);
            }
        } catch (MqttException e) {
            logger.error("初始化mqttClient配置失败", e);
        }
  • our connet options mqtt.automaticReconnection=false mqtt.cleanSession=true mqtt.connectionTimeout=30 mqtt.keepAliveInterval=60 mqtt.filePersistDir=E:/CYSOFT/iot/mqtt mqtt.reconnInterval=15 mqtt.willTopic= mqtt.willPayload=
  • our logs

2021-02-25 09:50:35.192 [MQTT Ping: iot_9fe06456-413d-4f2e-be29-d8bbcd7f8ff8] ERROR org.eclipse.paho.client.mqttv3.internal.ClientState - iot_9fe06456-413d-4f2e-be29-d8bbcd7f8ff8: Timed out as no activity, keepAlive=60,000 lastOutboundActivity=1,614,217,775,191 lastInboundActivity=1,614,217,723,185 time=1,614,217,835,192 lastPing=1,614,217,775,191 2021-02-25 09:50:50.794 [MQTT Ping: iot_9fe06456-413d-4f2e-be29-d8bbcd7f8ff8] ERROR com.cysoft.iot.core.config.IotMqttClientService - connectionLost org.eclipse.paho.client.mqttv3.MqttException: 等待来自服务器的响应时超时 at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:31) ~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at org.eclipse.paho.client.mqttv3.internal.ClientState.checkForActivity(ClientState.java:694) ~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at org.eclipse.paho.client.mqttv3.internal.ClientComms.checkForActivity(ClientComms.java:784) [org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at org.eclipse.paho.client.mqttv3.internal.ClientComms.checkForActivity(ClientComms.java:770) [org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at org.eclipse.paho.client.mqttv3.TimerPingSender$PingTask.run(TimerPingSender.java:77) [org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at java.util.TimerThread.mainLoop(Unknown Source) [?:1.8.0_172] at java.util.TimerThread.run(Unknown Source) [?:1.8.0_172] 2021-02-25 09:51:20.807 [iot_reconnect_-thread-1] ERROR com.cysoft.iot.core.config.IotMqttClientService - connectionLost retry connect org.eclipse.paho.client.mqttv3.MqttException: 已连接客户机 at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:31) ~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at org.eclipse.paho.client.mqttv3.MqttAsyncClient.reconnect(MqttAsyncClient.java:1376) ~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at org.eclipse.paho.client.mqttv3.MqttClient.reconnect(MqttClient.java:724) ~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at com.cysoft.core.mqtt.bean.MqttClientService.reconnect(MqttClientService.java:126) ~[CY-Tools-1.0.0.jar!/:?] at com.cysoft.iot.core.config.IotMqttClientService.lambda$connectionLost$1(IotMqttClientService.java:128) ~[classes!/:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_172] at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_172] at java.lang.Thread.run(Unknown Source) [?:1.8.0_172]

SpringStudent avatar Feb 25 '21 07:02 SpringStudent