paho.mqtt.python icon indicating copy to clipboard operation
paho.mqtt.python copied to clipboard

keepalive does NOT work when on_message is much frequently called(on pressure)

Open liusong1111 opened this issue 6 years ago • 6 comments

Summary:

set keepalive=8. in on_message, sleep 2 seconds. publish message frequently, then RECONNECTION occurs. that's NOT expected. It seems that if always has message to work on, the PING(heartbeat) will NEVER be sent. the MQTT broker server is EMQ(emqx-3.0-beta.1).

subscribe with QoS0 indicate on_message will not send any ACK, right?

code:

import paho.mqtt.client as mqtt
import time
import datetime

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

    client.subscribe("hello")

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
    print(datetime.datetime.now())
    time.sleep(2)

client = mqtt.Client()
client.will_set("die", "die")
client.on_connect = on_connect
client.on_message = on_message

client.connect("localhost", 1883, 8)

client.loop_forever()

and run it.

open another terminal, type:

mosquitto_pub -t 'hello' -m '111' -r

# run them quickly
mosquitto_pub -t 'hello' -m '222'
mosquitto_pub -t 'hello' -m '222'
mosquitto_pub -t 'hello' -m '222'
mosquitto_pub -t 'hello' -m '222'
mosquitto_pub -t 'hello' -m '222'
mosquitto_pub -t 'hello' -m '222'
mosquitto_pub -t 'hello' -m '222'
...

the python console output:

$python3 main.py
Connected with result code 0
hello b'111'
2018-09-04 13:36:07.738074
hello b'222'
2018-09-04 13:36:13.008156
hello b'222'
2018-09-04 13:36:15.011262
hello b'222'
2018-09-04 13:36:17.014952
hello b'222'
2018-09-04 13:36:19.018574
hello b'222'
2018-09-04 13:36:21.021358
hello b'222'
2018-09-04 13:36:23.024580
#----> note this line, it's reconnected <-----
Connected with result code 0
hello b'111'
2018-09-04 13:36:26.031771

and when RECONNECTION occurs, we can see the last will die in the same time.

liusong1111 avatar Sep 04 '18 05:09 liusong1111

First of all, this library can do nothing if your on_message callback took more than half of keepalive. In such case you may always miss the sent of PING hearbeat (but using a 2 seconds sleep with 8 seconds keepalive should not reach this point).

I'll look if the "priority" is not handled well enough in the library, since in your example, the library should have a moment to send the ping every 2 seconds.

In meantime, workaround could be to increase keepalive: do you need that very short keepalive or is it to more easily reproduce an issue that happen in real situation ?

PierreF avatar Sep 13 '18 08:09 PierreF

Thanks.

PING will NEVER happen while messages are accumulated, in that case, if accumulating time is larger than keepalive time, the same error will occur.

It looks like "priority" issue.

liusong1111 avatar Sep 17 '18 03:09 liusong1111

Another way to reproduce this problem: Call client.publish() 100,000 times or so, so that the broker has to send you a constant stream of pubacks. Then call client.loop(timeout=N), where N is some small number. You have a good chance of client.loop() running for much longer than N seconds, without sending keepalives.

j3pic avatar May 13 '19 21:05 j3pic

This seems to be an issue where the first loop call in loop_forever() triggers _check_keepalive() in client.py and sets the ping_t. On the second iteration of loop_forever() calling loop (which may be be seconds after the previous call) if the ping response has not been returned then the socket is closed. Would it make more sense that, instead of closing the socket if ping_t > 0, we allow for some tolerance so the broker can return the pingresp? I would suggest that since the keep alive time is triggered after 1xkeep_alive and the broker waits 1.5xkeep_alive before closing the connection that we allow ping_t a tolerance of 0.5*keep_alive before closing the socket.

If this is a reasonable solution could I pick this up?

_check_keepalive()

if self._state == mqtt_cs_connected and self._ping_t == 0:
                try:
                    self._send_pingreq()
                except Exception:
                    self._sock_close()
                    self._do_on_disconnect(MQTT_ERR_CONN_LOST)
                else:
                    with self._msgtime_mutex:
                        self._last_msg_out = now
                        self._last_msg_in = now
            else:
                self._sock_close()

                if self._state == mqtt_cs_disconnecting:
                    rc = MQTT_ERR_SUCCESS
                else:
                    rc = MQTT_ERR_KEEPALIVE

                self._do_on_disconnect(rc)

The ping_t == 0 is what I think needs to be changed and then remove the duplicate check in that takes place after the _check_keepalive call in loop_misc:

        if self._ping_t > 0 and now - self._ping_t >= self._keepalive:
            # client->ping_t != 0 means we are waiting for a pingresp.
            # This hasn't happened in the keepalive time so we should disconnect.
            self._sock_close()

            if self._state == mqtt_cs_disconnecting:
                rc = MQTT_ERR_SUCCESS
            else:
                rc = MQTT_ERR_KEEPALIVE

            self._do_on_disconnect(rc)

            return MQTT_ERR_CONN_LOST

atmask avatar Mar 17 '22 22:03 atmask

I notice this issue is still prominently visible also on the version 2.x. The only way I figured out is to ignore the keep alive checks and don't disconnect.

skinkie avatar Feb 17 '24 18:02 skinkie