paho.mqtt.python icon indicating copy to clipboard operation
paho.mqtt.python copied to clipboard

Publishing multiple messages simultaneously fail

Open lu-maca opened this issue 2 years ago • 2 comments

Hi,

I'm trying to run a simple publisher-subscriber example but a really weird behaviour happens:

# publisher.py
------------------------------------------
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("localhost",port=1883)

client.publish("mtq40_1", b"\x01")
client.publish("mtq40_2", b"\x01")
client.publish("mtq40_3", b"\x01")
client.publish("mtq40_4", b"\x01")
client.publish("rw250_1", b"\x01")
client.publish("rw250_2", b"\x01")
client.publish("rw250_3", b"\x01") 
client.publish("rw250_4", b"\x01")  

# subscriber.py
------------------------------------------
import paho.mqtt.client as mqtt

client = mqtt.Client()

def _initialize(connection_timeout=100):
    def on_connect(client, userdata, flags, rc):
        # check if the connection receives a CONNACK response from the server
        if rc != 0:
            raise ConnectionRefusedError("mqtt connection refused.")
        
        # subscribe to downlink
        client.subscribe("mtq40_1")
        client.subscribe("mtq40_2")
        client.subscribe("mtq40_3")
        client.subscribe("mtq40_4")
        client.subscribe("rw250_1")
        client.subscribe("rw250_2")
        client.subscribe("rw250_3")
        client.subscribe("rw250_4")
        
    def on_message(client, userdata, msg): 
        # add the sniffed message to the correct queue 
        print(msg.topic, msg.payload)

    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(host="localhost", port=1883)

_initialize()
client.loop_forever()

When I run publisher.py, the weird stuff happens:

# first run
mtq40_1 b'\x01'
mtq40_2 b'\x01'
mtq40_3 b'\x01'
mtq40_4 b'\x01'

# second run
mtq40_1 b'\x01'
mtq40_2 b'\x01'

# third run
mtq40_1 b'\x01'
mtq40_2 b'\x01'
mtq40_3 b'\x01'
mtq40_4 b'\x01'

It seems that the publisher is not sending messages correctly. When a time.sleep(0.05) is added between the publish operations, it works fine:

mtq40_1 b'\x01'
mtq40_2 b'\x01'
mtq40_3 b'\x01'
mtq40_4 b'\x01'
rw250_1 b'\x01'
rw250_2 b'\x01'
rw250_3 b'\x01'
rw250_4 b'\x01'
 
mtq40_1 b'\x01'
mtq40_2 b'\x01'
mtq40_3 b'\x01'
mtq40_4 b'\x01'
rw250_1 b'\x01'
rw250_2 b'\x01'
rw250_3 b'\x01'
rw250_4 b'\x01'

Why does this happens? Am I doing something wrong?

Thanks! Luca

lu-maca avatar Sep 18 '23 15:09 lu-maca

http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718099 The MQTT protocol allows discarding messages with a QoS equal to 0。 You can switch to a different MQTT broker or specify the QoS。

EngrealSun avatar Nov 30 '23 09:11 EngrealSun

The publisher client is not running. You never called loop/loop_start or loop_forever. It also seems you never wait for publisher to finish publishing (no disconnect, no even a arbitrary sleep). Call to publish isn't synchronous and don't wait to publish packet to be received by broken (which isn't possible in QoS=0).

Publish() will only submit packet to the TCP socket, and OS might no send them on network immediately. I'm not 100% sure, but I think that OS is allowed to discard not yet send data on socket close (which happen if you terminate the program). That why adding some sleep (a sleep at the end of publisher.py is enough) "solve" your problem. A fix should be to start the client (loop_forever) and disconnect + wait for disconnection before exiting.

A better solution is paho.mqtt.publish (https://github.com/eclipse/paho.mqtt.python#id3) which take care of all needs to send message.

PierreF avatar Jan 07 '24 21:01 PierreF

Closing due to inactivity (and it looks like an answer has been provided).

MattBrittan avatar Jul 17 '24 23:07 MattBrittan