paho.mqtt.python
Losing QoS 2 messages on reconnect after process kill
I'm experiencing an issue with delivery reliability when the client reconnects. Messages are published and subscribed using v1.4.0 with QoS 2, clean_session=False and loop_start(). If a long-running operation in the on_message callback blocks further processing, and the Python process running the client is killed during that time, messages are skipped when the client is restarted. Since I use clean_session=False together with QoS 2, I would expect messages to be delivered even when the client fails hard.
I suspect the recent v1.4 release introduced a behaviour that skips messages rather than letting processing hang.
When I start a test client that subscribes to a topic with enough messages queued for delivery, about 20 messages seem to be received before the handler is called for the first time. After killing and restarting the client, those messages get PUBCOMPed, I guess because of the 1.4 change: the client can do nothing but skip them, since they are no longer in its buffer.
I tried to work around this by setting the maximum number of in-flight messages to 1, to get a bit more control and avoid receiving messages that might not be handled (processing only one message at a time), but this had no effect at all.
This is the log output after setting client.max_inflight_messages_set(1) (first run, killed, then restarted):
```
('MQTT-TEST-SUB on_log ', 'Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=test')
MQTT-TEST-SUB Waiting for messages...
('MQTT-TEST-SUB on_log ', 'Received CONNACK (1, 0)')
MQTT-TEST-SUB Connected with result code 0
('MQTT-TEST-SUB on_log ', "Sending SUBSCRIBE (d0, m1) [('TestTopic', 2)]")
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 71)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 71)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d1, q2, r0, m72), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 72)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d1, q2, r0, m73), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 73)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d1, q2, r0, m74), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 74)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m75), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 75)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m76), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 76)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m77), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 77)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m78), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 78)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m79), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 79)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m80), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 80)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m81), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 81)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m82), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 82)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m83), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 83)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m84), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 84)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m85), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 85)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m86), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 86)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m87), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 87)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m88), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 88)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m89), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 89)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m90), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 90)')
('MQTT-TEST-SUB on_log ', 'Received SUBACK')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m91), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 91)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 72)')
MQTT-TEST-SUB TestTopic: {"timestamp": "2019-05-07T20:39:33+0200"}
^CMQTT-TEST-SUB Signal 2 received, terminating application...
('MQTT-TEST-SUB on_log ', 'Sending DISCONNECT')
Killed

MQTT-TEST-SUB Application Starting
('MQTT-TEST-SUB on_log ', 'Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=test')
MQTT-TEST-SUB Waiting for messages...
('MQTT-TEST-SUB on_log ', 'Received CONNACK (1, 0)')
MQTT-TEST-SUB Connected with result code 0
('MQTT-TEST-SUB on_log ', "Sending SUBSCRIBE (d0, m1) [('TestTopic', 2)]")
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 72)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 72)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 73)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 73)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 74)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 74)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 75)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 75)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 76)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 76)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 77)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 77)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 78)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 78)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 79)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 79)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 80)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 80)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 81)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 81)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 82)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 82)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 83)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 83)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 84)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 84)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 85)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 85)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 86)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 86)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 87)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 87)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 88)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 88)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 89)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 89)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 90)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 90)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d1, q2, r0, m91), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 91)')
('MQTT-TEST-SUB on_log ', 'Received SUBACK')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m92), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 92)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m93), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 93)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m94), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 94)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m95), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 95)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m96), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 96)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m97), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 97)')
('MQTT-TEST-SUB on_log ', u"Received PUBLISH (d0, q2, r0, m98), 'TestTopic', ... (87 bytes)")
('MQTT-TEST-SUB on_log ', 'Sending PUBREC (Mid: 98)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 91)')
MQTT-TEST-SUB TestTopic: {"timestamp": "2019-05-07T20:56:13+0200"}
```
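A simplified model of what the first run seems to show: after the SUBSCRIBE, m72 to m91 (20 messages) are all answered with PUBREC before the first PUBREL arrives and on_message runs. The sketch assumes the broker keeps at most 20 QoS 2 messages in flight per client (I believe mosquitto's `max_inflight_messages` defaults to 20, but the broker used here is not stated). `simulate_window` is a hypothetical helper, not paho or broker code.

```python
from collections import deque


def simulate_window(queued_mids, window):
    """Hypothetical, simplified model of a broker pushing QoS 2 messages.

    The broker keeps at most `window` messages in flight to one subscriber.
    The client answers every PUBLISH with PUBREC straight away, but (as the
    log suggests) on_message only runs once the matching PUBREL arrives.
    """
    queued = deque(queued_mids)  # messages still waiting at the broker
    acked_by_client = []         # mids the client has already PUBRECed
    events = []
    # The broker fills its window before the first PUBREL reaches the client.
    while queued and len(acked_by_client) < window:
        mid = queued.popleft()
        events.append(("PUBLISH", mid))
        events.append(("PUBREC", mid))
        acked_by_client.append(mid)
    # Only now is the first message handed to the (slow) on_message callback.
    first = acked_by_client[0]
    events.append(("PUBREL", first))
    events.append(("on_message", first))
    return events, acked_by_client


events, acked = simulate_window(range(72, 100), window=20)
print(acked[0], acked[-1], len(acked))  # 72 91 20
```

If this reading is right, `max_inflight_messages_set(1)` would not change anything here: as far as I can tell from the client code, it limits the client's own outgoing publishes, not the messages the broker pushes to it.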
Have you set a client ID? By default one is generated automatically, so the client gets a new ID on every restart (even with clean_session set to False). The broker then cannot identify the client and will not deliver the queued messages. Also make sure that every client has a unique ID!
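To illustrate why the client ID matters, here is a toy broker that keys persistent sessions by client ID only. `ToyBroker` and its methods are hypothetical; disconnects and subscriptions are not modelled (every stored session is assumed to be subscribed). For what it's worth, the CONNECT line in the log above shows `client_id=test`, and if paho logs the CONNACK flags first (I believe it does), `Received CONNACK (1, 0)` means session-present was 1, i.e. the session was resumed in this case.

```python
import uuid


class ToyBroker:
    """Hypothetical broker model: persistent sessions are keyed by client ID."""

    def __init__(self):
        self.sessions = {}  # client_id -> queued payloads for that session

    def connect(self, client_id, clean_session):
        """Return the CONNACK session-present flag (1 if a session was resumed)."""
        if clean_session:
            self.sessions[client_id] = []  # any old session is discarded
            return 0
        present = client_id in self.sessions
        self.sessions.setdefault(client_id, [])
        return int(present)

    def publish(self, payload):
        # Assumption: every stored session is subscribed to the topic.
        for queue in self.sessions.values():
            queue.append(payload)

    def take_queued(self, client_id):
        queued, self.sessions[client_id] = self.sessions[client_id], []
        return queued


broker = ToyBroker()

# Fixed client ID: the restarted client resumes its session.
broker.connect("test", clean_session=False)
broker.publish("while offline")
print(broker.connect("test", clean_session=False), broker.take_queued("test"))
# 1 ['while offline']

# Random ID on every start (like an auto-generated one): new, empty session.
old_id, new_id = str(uuid.uuid4()), str(uuid.uuid4())
broker.connect(old_id, clean_session=False)
broker.publish("stuck in the old session")
print(broker.connect(new_id, clean_session=False), broker.take_queued(new_id))
# 0 []
```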
The "Known Limitations" section of the docs says this:
> When clean_session is False, the session is only stored in memory not persisted. This means that when client is restarted (not just reconnected, the object is recreated usually because the program was restarted) the session is lost. This result in possible message lost.
Since your Python process died, so did all of the pending QoS 2 messages not yet acknowledged by the broker.
You can see pretty clearly what is happening in the logs:
```
MQTT-TEST-SUB Connected with result code 0
('MQTT-TEST-SUB on_log ', "Sending SUBSCRIBE (d0, m1) [('TestTopic', 2)]")
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 72)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 72)')
('MQTT-TEST-SUB on_log ', 'Received PUBREL (Mid: 73)')
('MQTT-TEST-SUB on_log ', 'Sending PUBCOMP (Mid: 73)')
```
So, upon reconnecting after the failure, the client receives `PUBREL` messages from the server (relating to messages it had previously acknowledged with a `PUBREC`). Because the store (which is held in memory) was lost, the client has no knowledge of these IDs. The client responds as required by the spec:
> In the QoS 2 delivery protocol, the sender MUST send a PUBREL packet when it receives a PUBREC packet from the receiver with a Reason Code value less than 0x80. This PUBREL packet MUST contain the same Packet Identifier as the original PUBLISH packet.
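A minimal model of the receiving side makes the loss concrete. It assumes, as the log suggests (the on_message output appears right after `Received PUBREL`), that the payload is kept in memory between PUBREC and PUBREL and only delivered on PUBREL. `InMemoryQoS2Receiver` is hypothetical, not paho's actual class.

```python
class InMemoryQoS2Receiver:
    """Hypothetical QoS 2 receiver whose in-flight state lives only in RAM."""

    def __init__(self, on_message):
        self.on_message = on_message
        self.incoming = {}  # mid -> payload, held until PUBREL arrives

    def handle_publish(self, mid, payload):
        self.incoming[mid] = payload  # not persisted anywhere
        return ("PUBREC", mid)

    def handle_pubrel(self, mid):
        payload = self.incoming.pop(mid, None)
        if payload is not None:
            self.on_message(mid, payload)
        # The receiver must answer every PUBREL with PUBCOMP, even when it
        # no longer knows the mid, so the flow completes without delivery.
        return ("PUBCOMP", mid)


delivered = []


def record(mid, payload):
    delivered.append((mid, payload))


rx = InMemoryQoS2Receiver(record)
rx.handle_publish(72, b"first")   # client replies PUBREC
rx.handle_publish(73, b"second")  # client replies PUBREC

# The process is killed: the object and its dict disappear.
rx = InMemoryQoS2Receiver(record)

# On reconnect the broker only re-sends PUBREL, never the payloads.
replies = [rx.handle_pubrel(72), rx.handle_pubrel(73)]
print(replies, delivered)  # [('PUBCOMP', 72), ('PUBCOMP', 73)] []
```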
One potential option might be to use `manual_ack` and only acknowledge the message once you have finished processing it (however, based on a quick look at the code, I think `manual_ack` functionality may be broken with QoS 2).
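To see why the timing of the acknowledgement matters, here is a protocol-level sketch of the broker side. It is hypothetical: `ToyBrokerQoS2Session` is neither mosquitto's nor paho's code, and it does not model how paho's `manual_ack` actually handles QoS 2. It only relies on the MQTT 3.1.1 rule that a resumed session (clean_session=0) re-sends unacknowledged PUBLISH and PUBREL packets.

```python
class ToyBrokerQoS2Session:
    """Hypothetical broker-side QoS 2 state for one persistent session."""

    def __init__(self):
        self.awaiting_pubrec = {}      # mid -> payload; broker still holds it
        self.awaiting_pubcomp = set()  # mids where only the PUBREL step is left

    def send_publish(self, mid, payload):
        self.awaiting_pubrec[mid] = payload

    def on_pubrec(self, mid):
        # Once PUBREC arrives, the broker will never re-send this PUBLISH.
        del self.awaiting_pubrec[mid]
        self.awaiting_pubcomp.add(mid)

    def resend_on_reconnect(self):
        # Re-send unacknowledged PUBLISH packets (with the dup flag set on
        # the wire) and PUBREL packets, using the original packet IDs.
        packets = [("PUBLISH", mid, payload)
                   for mid, payload in sorted(self.awaiting_pubrec.items())]
        packets += [("PUBREL", mid, None) for mid in sorted(self.awaiting_pubcomp)]
        return packets


# Client sent PUBREC on arrival, then was killed while processing.
early = ToyBrokerQoS2Session()
early.send_publish(72, b"job")
early.on_pubrec(72)
print(early.resend_on_reconnect())     # [('PUBREL', 72, None)]: payload gone

# Client withheld PUBREC until processing finished, but was killed first.
deferred = ToyBrokerQoS2Session()
deferred.send_publish(72, b"job")
print(deferred.resend_on_reconnect())  # [('PUBLISH', 72, b'job')]: redelivered
```

The trade-off of deferring the acknowledgement: if the process dies after processing but before the PUBREC reaches the broker, the message is delivered again, so the handler has to tolerate duplicates (effectively at-least-once at the application level).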
As this is due to a known limitation and the issue is quite old, I'm going to close it. Please feel free to reopen it (or open a new issue) if you have further thoughts.