paho.mqtt.python
paho.mqtt.python copied to clipboard
keepalive does NOT work when on_message is much frequently called(on pressure)
Summary:
set keepalive=8. in on_message, sleep 2 seconds. publish message frequently, then RECONNECTION occurs. that's NOT expected. It seems that if always has message to work on, the PING(heartbeat) will NEVER be sent. the MQTT broker server is EMQ(emqx-3.0-beta.1).
subscribe with QoS0 indicate on_message will not send any ACK, right?
code:
import paho.mqtt.client as mqtt
import time
import datetime
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("hello")
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
print(datetime.datetime.now())
time.sleep(2)
client = mqtt.Client()
client.will_set("die", "die")
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 8)
client.loop_forever()
and run it.
open another terminal, type:
mosquitto_pub -t 'hello' -m '111' -r
# run them quickly
mosquitto_pub -t 'hello' -m '222'
mosquitto_pub -t 'hello' -m '222'
mosquitto_pub -t 'hello' -m '222'
mosquitto_pub -t 'hello' -m '222'
mosquitto_pub -t 'hello' -m '222'
mosquitto_pub -t 'hello' -m '222'
mosquitto_pub -t 'hello' -m '222'
...
the python console output:
$python3 main.py
Connected with result code 0
hello b'111'
2018-09-04 13:36:07.738074
hello b'222'
2018-09-04 13:36:13.008156
hello b'222'
2018-09-04 13:36:15.011262
hello b'222'
2018-09-04 13:36:17.014952
hello b'222'
2018-09-04 13:36:19.018574
hello b'222'
2018-09-04 13:36:21.021358
hello b'222'
2018-09-04 13:36:23.024580
#----> note this line, it's reconnected <-----
Connected with result code 0
hello b'111'
2018-09-04 13:36:26.031771
and when RECONNECTION occurs, we can see the last will die
in the same time.
First of all, this library can do nothing if your on_message callback took more than half of keepalive. In such case you may always miss the sent of PING hearbeat (but using a 2 seconds sleep with 8 seconds keepalive should not reach this point).
I'll look if the "priority" is not handled well enough in the library, since in your example, the library should have a moment to send the ping every 2 seconds.
In meantime, workaround could be to increase keepalive: do you need that very short keepalive or is it to more easily reproduce an issue that happen in real situation ?
Thanks.
PING will NEVER happen while messages are accumulated, in that case, if accumulating time is larger than keepalive time, the same error will occur.
It looks like "priority" issue.
Another way to reproduce this problem: Call client.publish()
100,000 times or so, so that the broker has to send you a constant stream of pubacks. Then call client.loop(timeout=N)
, where N
is some small number. You have a good chance of client.loop()
running for much longer than N
seconds, without sending keepalives.
This seems to be an issue where the first loop call in loop_forever() triggers _check_keepalive() in client.py and sets the ping_t. On the second iteration of loop_forever() calling loop (which may be be seconds after the previous call) if the ping response has not been returned then the socket is closed. Would it make more sense that, instead of closing the socket if ping_t > 0, we allow for some tolerance so the broker can return the pingresp? I would suggest that since the keep alive time is triggered after 1xkeep_alive and the broker waits 1.5xkeep_alive before closing the connection that we allow ping_t a tolerance of 0.5*keep_alive before closing the socket.
If this is a reasonable solution could I pick this up?
_check_keepalive()
if self._state == mqtt_cs_connected and self._ping_t == 0:
try:
self._send_pingreq()
except Exception:
self._sock_close()
self._do_on_disconnect(MQTT_ERR_CONN_LOST)
else:
with self._msgtime_mutex:
self._last_msg_out = now
self._last_msg_in = now
else:
self._sock_close()
if self._state == mqtt_cs_disconnecting:
rc = MQTT_ERR_SUCCESS
else:
rc = MQTT_ERR_KEEPALIVE
self._do_on_disconnect(rc)
The ping_t == 0 is what I think needs to be changed and then remove the duplicate check in that takes place after the _check_keepalive call in loop_misc:
if self._ping_t > 0 and now - self._ping_t >= self._keepalive:
# client->ping_t != 0 means we are waiting for a pingresp.
# This hasn't happened in the keepalive time so we should disconnect.
self._sock_close()
if self._state == mqtt_cs_disconnecting:
rc = MQTT_ERR_SUCCESS
else:
rc = MQTT_ERR_KEEPALIVE
self._do_on_disconnect(rc)
return MQTT_ERR_CONN_LOST
I notice this issue is still prominently visible also on the version 2.x. The only way I figured out is to ignore the keep alive checks and don't disconnect.