Question: Will on_message always get all messages?

Open ErlendFax opened this issue 2 years ago • 1 comments

Our MQTT client receives about 1000 messages per second.

I wrote an on_message callback like this:

def on_message(client, userdata, msg):
    message_buffer.append(msg.payload.decode('utf-8'))
    if(len(message_buffer) > 100):
        handleBuffer(message_buffer)
        message_buffer.clear()

The function handleBuffer(...) might take a few seconds, which made me wonder: what happens in the meantime? Will the client keep receiving messages in the background before they are passed to on_message? What's the bottleneck here, the CPU?

ErlendFax avatar Apr 20 '22 14:04 ErlendFax

At the rate you are receiving messages, you should not be doing this.

In the callback, you should only add the message to message_buffer. Don't even bother accessing msg.payload or decoding it there.

The issue is that you are blocking the return of the on_message function, which halts the heartbeat and handshaking with the MQTT broker, so there is a very good chance that you will end up seeing strange issues.

Move the handling/processing of messages into another thread, and run the MQTT library in its own thread.

Use queue.Queue (a thread-safe class from the Python standard library): add the incoming messages to that queue in the on_message function, and have another thread (or even several threads) process them.
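A minimal sketch of that queue-plus-worker pattern might look like the following. The names message_queue, batches, handle_batch and worker are hypothetical, handle_batch stands in for the original handleBuffer, and the batch size of 100 simply mirrors the question. The paho wiring is shown only in comments, so the block runs without a broker.

```python
import queue
import threading

# Thread-safe hand-off between the MQTT network thread and the worker.
message_queue = queue.Queue()

# Hypothetical sink so the sketch has something observable to do.
batches = []


def on_message(client, userdata, msg):
    # Keep the callback trivial: no decoding, no processing, just enqueue.
    message_queue.put(msg)


def handle_batch(batch):
    # Stand-in for the slow handleBuffer(...) from the question.
    batches.append(list(batch))


def worker(batch_size=100):
    # Runs in its own thread, so slow batch handling never blocks on_message.
    batch = []
    while True:
        msg = message_queue.get()
        if msg is None:  # sentinel (an assumption of this sketch): stop
            break
        batch.append(msg.payload.decode("utf-8"))
        if len(batch) >= batch_size:
            handle_batch(batch)
            batch = []
    if batch:  # flush whatever is left on shutdown
        handle_batch(batch)


# Typical wiring with paho (not executed here):
#   client.on_message = on_message
#   threading.Thread(target=worker, daemon=True).start()
#   client.loop_start()  # runs the network loop in a background thread
```

Putting None on the queue tells the worker to flush the last partial batch and exit. Note that the queue here is unbounded, so if the worker cannot keep up, memory use grows rather than messages being discarded.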

FYI: Increasing the CPU frequency may reduce the problems you encounter, but the CPU is not the bottleneck here.

Please close this issue if you are satisfied with the answer.

sfphh4 avatar Apr 28 '22 08:04 sfphh4

We use on_message to shovel measurements from MQTT into a database. I believe we are seeing messages being lost (i.e. unprocessed) at higher publishing rates. In particular, we observed rates north of 5000 messages per second, which I would not consider highly unusual or particularly high.

@sfphh4 Your pointers are much appreciated. Thank you.

I am somewhat surprised by the lack of guarantees made by the on_message callback mechanism. From where I am standing, it is okay if performance somehow suffers (for example, if memory or CPU usage increases dramatically), but if messages are being lost without feedback (e.g. an Exception), this is a fundamental correctness problem of the software.

Even if message rates of 5000 messages per second can be handled by passing messages to a queue in on_message, it is unclear if this approach works correctly for 25000 messages per second or 100000 messages per second.
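One way to at least make overload visible instead of silent, sketched under assumptions: use a bounded queue and count the messages that do not fit. The names make_on_message, pending and stats are hypothetical, and this only covers the hand-off inside the application; it says nothing about what the library or the broker does at those rates.

```python
import queue


def make_on_message(pending, stats):
    """Build an on_message callback that never blocks and counts overflow."""

    def on_message(client, userdata, msg):
        try:
            # put_nowait raises queue.Full instead of waiting for free space.
            pending.put_nowait(msg)
        except queue.Full:
            # The consumer is not keeping up: record the loss explicitly.
            stats["dropped"] += 1

    return on_message


# Example wiring (capacity is an assumed value, tune it to your workload):
#   pending = queue.Queue(maxsize=10_000)
#   stats = {"dropped": 0}
#   client.on_message = make_on_message(pending, stats)
```

A non-zero stats["dropped"] then shows directly that the consumer side fell behind, which can be logged or raised as an alert.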

bantu avatar Apr 01 '23 09:04 bantu

I'm going to close this because I believe the original question was answered (and there was no further feedback from OP).

@bantu My reading of the source is that messages should not be lost under normal operation. However, there is an issue with the handling of keepalives under high load (see #328) and, should the connection be dropped, there is a potential for message loss (this depends upon how you are connecting). If you are seeing message loss, we would appreciate details (unfortunately, this kind of issue is tricky to trace, so the more detail you can provide the better). Note that the broker config is important here too; brokers often provide the option to drop messages if the client is not keeping up (see the Mosquitto setting max_inflight_messages for an example).

Note: This is part of an exercise to clean up old issues so that the project can move forwards. Due to the number of issues being worked through mistakes will be made; please feel free to reopen this issue (or comment) if you believe it's been closed in error.

MattBrittan avatar Jan 08 '24 09:01 MattBrittan