paho.mqtt.python
Failing `on_message` retries message indefinitely
Hi!
I was trying to understand how message processing behaves with respect to packet acknowledgement when on_message fails. If on_message fails, the message is not acknowledged, and I was wondering how this affects the client. What I found surprised me.
If on_message(...) fails, the message is kept in the client and retried (by the client) as soon as another message is received. If it fails again, the message is retried indefinitely and the client fails to process any new messages. Eventually, this causes a connection timeout. Even after the timeout, the message is retried indefinitely.
I was expecting that if on_message(...) fails, the message is dropped.
Furthermore, I was expecting that if on_message(...) fails for QoS 1 and 2 messages, the message would still be acknowledged, since it was already handed from the client to the application. The client might still fail to send an acknowledgement for external reasons, e.g. network issues.
So, what is the reason behind this behaviour?
Note that I was expecting behaviour similar to suppress_exceptions = True, except that the exceptions are passed through.
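For context, the behaviour I expected can be approximated today by catching errors inside the callback yourself, so that the client still sends the acknowledgement and the message is effectively dropped. A minimal sketch (process is a hypothetical placeholder for the actual handler, not part of paho):

import logging
import paho.mqtt.client as mqtt

log = logging.getLogger(__name__)

def on_message(client: mqtt.Client, userdata, msg: mqtt.MQTTMessage) -> None:
    try:
        process(msg)  # hypothetical processing function
    except Exception:
        # The error stays visible in the logs, but it no longer prevents
        # the client from acknowledging the message.
        log.exception("Processing failed for topic %s", msg.topic)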
How to reproduce
Start your mqtt broker. You can use the docker compose and mosquitto configuration below.
Start the script below.
Send a message to t/qos_0 and t/qos_1, respectively, e.g.: mosquitto_pub -t t/qos_0 -m "msg_1" -q 0 -V mqttv5 (a Python alternative is sketched after the script below).
The client below reuses the MQTT session. So if a QoS 1 message fails to process and the client restarts, the message is retried and the client is immediately stuck in a loop again.
import paho.mqtt.client as mqtt

client = mqtt.Client(
    protocol=mqtt.MQTTv5,
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    client_id="my-client-id",
)

# Keep the session on the broker so unacknowledged messages survive reconnects.
properties = mqtt.Properties(mqtt.PacketTypes.CONNECT)
properties.SessionExpiryInterval = 0xFFFFFFFF
client.connect(host="localhost", clean_start=False, properties=properties)

@client.message_callback()
def on_message(client, userdata, msg: mqtt.MQTTMessage):
    raise Exception("Error: Processing failed")

@client.connect_callback()
def on_connect(client, userdata, flags, reason_code, properties):
    client.subscribe("t/qos_0", qos=0)
    client.subscribe("t/qos_1", qos=1)

while True:
    try:
        client.loop_forever()
    except Exception as e:
        print(e)
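If mosquitto_pub is not available, the same test messages can be published from Python. A minimal sketch using paho.mqtt.publish, assuming the broker is reachable on localhost:1883:

from paho.mqtt.publish import single

# Publish one test message to each topic the script above subscribes to.
single(topic="t/qos_0", payload="msg_1", qos=0, hostname="localhost")
single(topic="t/qos_1", payload="msg_1", qos=1, hostname="localhost")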
docker-compose.yml
version: "3.7"
services:
mosquitto:
image: eclipse-mosquitto
hostname: mosquitto
container_name: mosquitto
restart: unless-stopped
ports:
- "1883:1883"
- "9001:9001"
volumes:
- ./mosquitto:/etc/mosquitto
- ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf
mosquitto.conf
persistence false
allow_anonymous true
connection_messages true
log_type all
listener 1883
Environment
- Python version: 3.11.9
- Library version: 2.1.0
- Operating system (including version): Windows 11, Ubuntu 24.10
- MQTT server (name, version, configuration, hosting details): mosquitto 2.0.20, eclipse-docker image
Hi,
the retry being done by the client is the part that surprises me. The message being retried seems to be the wanted behaviour.
Use-case (which I've already implemented): you are stopping a consumer that must not lose messages. During shutdown, an output connector is already stopped (possibly the shutdown is caused by this output connector being stopped, e.g. you lost database access, became unhealthy and killed yourself). You need to tell the broker that you do NOT ack the message (by raising an exception in on_message), so the broker will retry this message on a future client.
Agreed, it means (and IMO this is the design of the protocol) that if a client always fails on a specific message (and therefore never acks it), then the broker will re-send this message forever.
Note: for QoS 2, this behaviour is tricky (and probably needs a user decision): does re-sending the message violate deliver-exactly-once or not? It probably depends on why the user's on_message callback raised an error (e.g. if it's a connection refused by my DB I want a retry; if it's a timeout on my DB request it might depend).
But clearly:
- I was surprised that it's the client library that does the retry and not the broker. It might be due to the missing "unack" feature in MQTT (if the MQTT client doesn't unack a message that failed to be processed, the retry won't occur before... I think client disconnection. And since this message is added to the pending messages, you might reach the maximum number of inflight messages and stop receiving new messages)
suppress_exceptions = True should fix your use-case, since it mostly catches all exceptions and just logs them.
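For completeness, a minimal sketch of how that flag is set; suppress_exceptions is a plain attribute on Client, and the rest of the setup follows the example above:

import paho.mqtt.client as mqtt

client = mqtt.Client(
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    client_id="my-client-id",
)
# Exceptions raised in callbacks are caught and logged instead of
# propagating out of the network loop, so the message is still acked.
client.suppress_exceptions = True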
Maybe a better option than suppress_exceptions (which mostly makes errors close to invisible, which is why #365 changed the behaviour) would be an option (enabled by default?) that causes the message to always be acked even in case of error. If you care a lot about not losing a message, you could disable this option / use manual ack. But for other usages, this option would still ack the message even in case of error, which would guarantee that you continue to process future messages.
Hi Pierre
Thank you for your answer - really appreciated :)
I think I now have a better understanding of this: if an error occurs, then I should reconnect to recover. This is more in the spirit of the spec.
I attached an improved example below, and you can see that a reconnect() improves the situation.
What happens with a reconnect
QoS = 0 and QoS = 1 work as expected; for the latter, messages are retried until the call succeeds.
For QoS = 2, it behaves the same as QoS = 0 and we get the "issue" #883 I posted earlier. As you mentioned above, this raises the question of what exactly-once means.
As far as I can tell, there is no user decision: we cannot use the broker's re-transmit for QoS = 2 if on_message fails. Even if we want to use manual ack: a manual ack on a QoS = 2 message issues a PUBCOMP and not a PUBREC.
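For reference, this is roughly how manual acks can be exercised (a sketch; it assumes the 2.x client exposes the manual_ack constructor flag and an ack(mid, qos) method, and process is a hypothetical handler):

import paho.mqtt.client as mqtt

client = mqtt.Client(
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    protocol=mqtt.MQTTv5,
    client_id="manual-ack-test",
    manual_ack=True,  # the client no longer acknowledges automatically after on_message
)

@client.message_callback()
def on_message(client, userdata, msg: mqtt.MQTTMessage):
    process(msg)  # hypothetical processing step
    # Acknowledge only after processing succeeded; for QoS 2 this sends the
    # final acknowledgement (PUBCOMP), not a PUBREC, which is the limitation noted above.
    client.ack(msg.mid, msg.qos)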
Idea / Suggestion
After giving it some thought, I suggest that the client should issue a disconnect on a failure in on_message. Then the user can resolve the issue, and as soon as the client has recovered, the user can reconnect and continue where it left off before the error occurred. I claim that failing in on_message is not taking ownership.
Taking your example above: if a DB connection fails, then I also want to fail to take ownership of a QoS = 2 message. If a DB connection times out and I do want to take ownership, then I can catch the timeout error and carry on.
I strongly believe that this approach is more consistent, aligns with the specification, and gives the user a clearer way to handle problems.
Long story short:
- Do not take ownership of QoS = 2 messages if on_message fails, so that the logic is the same as for QoS = 1.
- Issue a DISCONNECT if on_message fails and let the user deal with the issue.
Code
from itertools import count
from paho.mqtt.publish import single
from threading import Thread
import paho.mqtt.client as mqtt
import sys
import time
import uuid

properties = mqtt.Properties(mqtt.PacketTypes.CONNECT)
properties.SessionExpiryInterval = 120

topic = str(uuid.uuid4())
client_id = str(uuid.uuid4())
host = "localhost"

# QoS can be passed as the first command line argument; defaults to 0.
try:
    qos = int(sys.argv[1])
except (IndexError, ValueError):
    qos = 0

client = mqtt.Client(
    protocol=mqtt.MQTTv5,
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    client_id=client_id,
    userdata=0,
)
client.connect(host=host, properties=properties)
client.enable_logger()

@client.message_callback()
def on_message(client, userdata, msg: mqtt.MQTTMessage):
    # Fail the first five deliveries, then start processing normally.
    if userdata < 5:
        client.user_data_set(userdata + 1)
        raise Exception(f"Error: Processing failed, topic = {msg.topic}, qos = {msg.qos}, payload = {msg.payload}")
    print(f"Processing topic = {msg.topic}, qos = {msg.qos}, payload = {msg.payload}")

@client.connect_callback()
def on_connect(client, userdata, flags, reason_code, properties):
    if not flags.session_present:
        client.subscribe(topic, qos=qos)

def publish():
    for n in count():
        time.sleep(1)
        single(topic=topic, payload=f"{n}", qos=qos, hostname=host)

Thread(target=publish).start()

while True:
    try:
        client.loop_forever()
    except Exception as e:
        time.sleep(1)
        # disconnect so that we can clear the situation...
        # This might be done by the client.
        client.disconnect()
        client.loop_misc()
        # clear situation
        # reconnect
        client.reconnect()
Example QoS = 0
- Publish is not retried.
Caught exception in on_message: Error: Processing failed, topic = 43fa4722-17d9-45fd-9a9b-65db60f7441c, qos = 0, payload = b'0'
Caught exception in on_message: Error: Processing failed, topic = 43fa4722-17d9-45fd-9a9b-65db60f7441c, qos = 0, payload = b'1'
Caught exception in on_message: Error: Processing failed, topic = 43fa4722-17d9-45fd-9a9b-65db60f7441c, qos = 0, payload = b'2'
Caught exception in on_message: Error: Processing failed, topic = 43fa4722-17d9-45fd-9a9b-65db60f7441c, qos = 0, payload = b'3'
Caught exception in on_message: Error: Processing failed, topic = 43fa4722-17d9-45fd-9a9b-65db60f7441c, qos = 0, payload = b'4'
Processing topic = 43fa4722-17d9-45fd-9a9b-65db60f7441c, qos = 0, payload = b'5'
Processing topic = 43fa4722-17d9-45fd-9a9b-65db60f7441c, qos = 0, payload = b'6'
Processing topic = 43fa4722-17d9-45fd-9a9b-65db60f7441c, qos = 0, payload = b'7'
Processing topic = 43fa4722-17d9-45fd-9a9b-65db60f7441c, qos = 0, payload = b'8'
Processing topic = 43fa4722-17d9-45fd-9a9b-65db60f7441c, qos = 0, payload = b'9'
Example QoS = 1
- Publish is retried.
Caught exception in on_message: Error: Processing failed, topic = 0f31c724-6d43-4b81-b625-b17b699ad646, qos = 1, payload = b'0'
Caught exception in on_message: Error: Processing failed, topic = 0f31c724-6d43-4b81-b625-b17b699ad646, qos = 1, payload = b'0'
Caught exception in on_message: Error: Processing failed, topic = 0f31c724-6d43-4b81-b625-b17b699ad646, qos = 1, payload = b'0'
Caught exception in on_message: Error: Processing failed, topic = 0f31c724-6d43-4b81-b625-b17b699ad646, qos = 1, payload = b'0'
Processing topic = 0f31c724-6d43-4b81-b625-b17b699ad646, qos = 1, payload = b'0'
Processing topic = 0f31c724-6d43-4b81-b625-b17b699ad646, qos = 1, payload = b'1'
Processing topic = 0f31c724-6d43-4b81-b625-b17b699ad646, qos = 1, payload = b'2'
Processing topic = 0f31c724-6d43-4b81-b625-b17b699ad646, qos = 1, payload = b'3'
Processing topic = 0f31c724-6d43-4b81-b625-b17b699ad646, qos = 1, payload = b'4'
Example QoS = 2
- Publish is not retried.
Caught exception in on_message: Error: Processing failed, topic = 4184d143-74a0-469e-8a4c-49901f70e6c8, qos = 2, payload = b'1'
Caught exception in on_message: Error: Processing failed, topic = 4184d143-74a0-469e-8a4c-49901f70e6c8, qos = 2, payload = b'2'
Caught exception in on_message: Error: Processing failed, topic = 4184d143-74a0-469e-8a4c-49901f70e6c8, qos = 2, payload = b'3'
Caught exception in on_message: Error: Processing failed, topic = 4184d143-74a0-469e-8a4c-49901f70e6c8, qos = 2, payload = b'4'
Processing topic = 4184d143-74a0-469e-8a4c-49901f70e6c8, qos = 2, payload = b'5'
Processing topic = 4184d143-74a0-469e-8a4c-49901f70e6c8, qos = 2, payload = b'6'
Processing topic = 4184d143-74a0-469e-8a4c-49901f70e6c8, qos = 2, payload = b'7'
Processing topic = 4184d143-74a0-469e-8a4c-49901f70e6c8, qos = 2, payload = b'8'
Processing topic = 4184d143-74a0-469e-8a4c-49901f70e6c8, qos = 2, payload = b'9'