paho.mqtt.python
Problem with sending MQTT PUBLISH when using multiprocessing
Hi! In my code I am able to send a PUBLISH from the main thread, but when I use multiprocessing, the logger shows that the message was published, yet the broker never receives it. The publish result code is zero.
Is this intended behaviour, or am I doing something wrong?
I am having the same issue. My workaround was to create a new client before every publish, which is less than ideal.
At the very least we need an error code; this was painful to debug.
I'm running into this issue today. It affects almost every app that does not open a connection before every publish: apps deployed on gunicorn, etc., and libraries such as the AWS SDKs.
Just hit the same issue here... I lost so many hours; now every thread creates its own client.
I have spent quite a few hours on this problem as well.
It seems like calling publish() is not 100% thread safe. Our problem was that we called publish() from on_message callbacks and also from other threads outside.
If these publish() calls overlap, both calls can hang while trying to acquire a lock. The outer thread is usually trying to acquire the _callback_mutex, and the one in the callback is stuck on the _out_message_mutex.
Since it was hard for me to move the publish() call out of the callback I am now creating two clients. One receives the messages in the callback and publishes messages through the other client.
I am experiencing the same issue and have found the same workaround as @schneeemensch (explained here https://github.com/eclipse/paho.mqtt.python/issues/527).
Thank you for helping me understand and solve this :)
This is definitely still an issue. My workaround has been to put whatever I want to publish into a multiprocessing.Queue() and publish it from outside the process.
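A minimal sketch of that queue workaround using only the standard library, assuming the process that owns the connected client is also the one that started the workers. The workers never touch the client: they put (topic, payload) tuples on a multiprocessing.Queue, and the owner drains the queue. `worker`, `drain_and_publish` and `run` are hypothetical names, and the `publish` argument stands in for something like `client.publish` so the sketch runs without a broker.

```python
import multiprocessing as mp
import queue


def worker(out_q, instance):
    # Child process: build the message, but leave publishing to the client's owner.
    out_q.put(("status/test", f"result from worker {instance}"))


def drain_and_publish(out_q, publish, expected, timeout=5.0):
    # Owner process: the only place the (already connected) client is used.
    sent = 0
    while sent < expected:
        try:
            topic, payload = out_q.get(timeout=timeout)
        except queue.Empty:
            break  # stop instead of blocking forever if a worker died
        publish(topic, payload)  # real code (assumption): client.publish(topic, payload)
        sent += 1
    return sent


def run(publish=print, n_workers=3, ctx=None):
    ctx = ctx or mp.get_context()
    out_q = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(out_q, i)) for i in range(n_workers)]
    for p in procs:
        p.start()
    # Drain first, then join: joining a process with items still buffered can deadlock.
    sent = drain_and_publish(out_q, publish, expected=n_workers)
    for p in procs:
        p.join()
    return sent


if __name__ == "__main__":
    run()
```

Draining before joining follows the multiprocessing programming guidelines, which warn that joining a process that still has items buffered in a queue can deadlock.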
Yup, I tripped over the same issue today! Publishing fails unless the main thread does it for me.
@dachshund-digital Could you try the current 1.6.x branch? There have been a lot of changes related to threading there.
Yes, I will try... sorry for the late reply, I did not see this request until now. I will report back in a few days when I can get the testing done.
I tried to git clone... but only got version 1.5.1? I'm not very familiar with git, so how do I pull a specific branch? I navigated to the 1.6.1 branch, got the code link, and the page says 1.6.x, but when I do the actual git clone with that 1.6.x link, it pulls down 1.5.1 again, and when I install, it tells me 1.5.1 is already installed.
Ok, I think this is the right way to clone a specific branch: git clone -b 1.6.x https://github.com/eclipse/paho.mqtt.python.git
Is the above right? It appears to have worked and pulled some files, but when I run the Python install, I still end up with 1.5.1.
So I tried: git clone -b 1.6.x --single-branch https://github.com/eclipse/paho.mqtt.python.git
This shows the 1.6.x branch as tracked, but I still get 1.5.1 when I run the setup script. So I'm not sure how to get 1.6.x correctly, or, if I did get it, why the setup script still reports 1.5.1.
My local copy believes it is 1.6.x:
git checkout 1.6.x
Already on '1.6.x'
Your branch is up to date with 'origin/1.6.x'.
But running python3 setup.py install in the resulting directory reports that 1.5.1 is installed:
Installed /usr/local/lib/python3.7/dist-packages/paho_mqtt-1.5.1-py3.7.egg
Processing dependencies for paho-mqtt==1.5.1
Finished processing dependencies for paho-mqtt==1.5.1
@dachshund-digital you were doing everything right, but the version number hadn't been updated in the in-development branch. Sorry for the confusion. I actually fixed that yesterday by chance but hadn't pushed it. If you now run git checkout 1.6.x ; git pull you should have the correct code and be certain about it.
LOL! What's the definition of insanity? Doing the same thing again and expecting a different result... I was starting to think I was crazy! OK, I should be able to get some testing done in the next couple of days. Will report back then.
As a quick test, I set up multiprocessing processes for the subscribe-queue dequeue and the publish-queue dequeue in my project: one handles all the messages the client receives from its subscriptions, and the other publishes any results via the client. Both routines work fine when invoked by the main thread. When separate processes service the same multiprocessing queues, the subscribe logic still works, but every publish the client attempts on behalf of the dedicated publish process returns result code 4. The behavior is the same on 1.6.x as it was on 1.5.x.
Since my project is quite complex, I am going to create a simple script and test again. If that also fails, I will post the relevant details here.
A bit more information: my code does not raise any exceptions or errors, no traceback, etc. The publish request is accepted, and the topic and payload are valid and identical to those in the main-thread test, but as noted I always get result code 4, and neither the broker nor any subscriber (say, Node-RED) receives anything.
Thanks, a small(ish) example would be a big help.
Yeah, working on it now... but something is still odd. I queried the version, and it seems my script is still using 1.5.0; see issue https://github.com/eclipse/paho.mqtt.python/issues/596. So I was not really testing 1.6.x as I thought in my comment above.
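When more than one copy of a package is installed, it helps to ask the interpreter which file it will actually import and which distribution version its metadata reports. A small sketch using only `importlib`; `locate` is a hypothetical helper name, and `importlib.metadata` needs Python 3.8 or newer.

```python
import importlib.util
from importlib import metadata


def locate(module_name, dist_name):
    """Return (path the module would be imported from, installed distribution version).

    Either element is None if it cannot be found.
    """
    try:
        spec = importlib.util.find_spec(module_name)
    except ModuleNotFoundError:  # raised when a parent package (e.g. "paho") is missing
        spec = None
    origin = spec.origin if spec is not None else None
    try:
        version = metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        version = None
    return origin, version


if __name__ == "__main__":
    # "paho-mqtt" is the distribution name; "paho.mqtt" is the import name.
    print(locate("paho.mqtt", "paho-mqtt"))
```

Run it with the same interpreter that runs the script (for example `python3.9` inside the virtual environment), since each interpreter and environment has its own site-packages.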
Ok, I got a virtual environment set up, with a clean copy of Pi OS as well, just because I could. I updated Python to 3.9.6 over the Pi OS default of 3.7.3 and did an explicit git clone of branch 1.6.x. When I run my test script I get the following on the attempt to publish:
python3.9 __Test.py
Python Version 3.9.6
Paho MQTT Module Version 1.6.0
OnSubscriber Alive
Client <paho.mqtt.client.Client object at 0xb64d7ca0>, Is Connect? False
Wait
OnPublisher Alive
Connect, Client <paho.mqtt.client.Client object at 0xb64d7ca0>, Is Connect? True, Data 'None', Flags '{'session present': 0}', Result '0'
Subscribe, Is Subscribe? True, Topic 'command/test', Id 1, Result 0
Subscribe, Client '<paho.mqtt.client.Client object at 0xb64d7ca0>', Data 'None', Id '1', Quality '(0,)'
Client <paho.mqtt.client.Client object at 0xb64d7ca0>, Is Connect? True
Subscriber, Message '{'topic': 'command/test', 'payload': '1629644326960'}', Client '<paho.mqtt.client.Client object at 0xb64d7ca0>', Topic 'command/test', Payload '1629644326960', Queue 0, Accumulator 1
Test, Client <paho.mqtt.client.Client object at 0xb64d7ca0>, Instance 1, Topic 'command/test', Payload '1629644326960'
Publisher, Message '{'topic': 'command/test', 'payload': {'Instance': 1, 'Data': '1629644326960'}}', Topic 'status/test', Payload '{'Instance': 1, 'Data': '1629644326960'}', Queue 1, Accumulator 1
Subscriber, Message '{'topic': 'command/test', 'payload': '1629644330582'}', Client '<paho.mqtt.client.Client object at 0xb64d7ca0>', Topic 'command/test', Payload '1629644330582', Queue 0, Accumulator 2
Test, Client <paho.mqtt.client.Client object at 0xb64d7ca0>, Instance 2, Topic 'command/test', Payload '1629644330582'
Publish, Result (4, 1)
Exception: Message publish failed: The client is not currently connected. (<class 'RuntimeError'>)
Traceback (most recent call last):
  File "/home/pi/noderedlistener/NodeRedListener/__Test.py", line 326, in OnPublisher
    thePublishOrNot=theResult.is_published()
  File "/home/pi/noderedlistener/lib/python3.9/site-packages/paho_mqtt-1.6.0-py3.9.egg/paho/mqtt/client.py", line 360, in is_published
    raise RuntimeError('Message publish failed: %s' % (error_string(self.rc)))
RuntimeError: Message publish failed: The client is not currently connected.
OnPublisher Not Alive
The 'OnPublisher' is the multiprocessing process that attempts the actual publish. OnSubscriber processes the messages the client receives: my on_message callback enqueues each message onto a subclassed subscribe queue under the client (global) object.
So the test logic is: MQTT client -> on_message -> SubscribeQueue -> OnSubscriber dequeues the message and calls OnTest, and OnTest triggers a publish request. Thus OnTest -> PublishQueue -> OnPublisher -> client publish. At that point the result code is 4, which under 1.6 comes with a more descriptive message: the client is not connected.
The actual client object is global, and the connection is made from the main thread. OnSubscriber has no issue with this as a separate process, but when OnPublisher, also a separate process, calls the client's publish method, that method apparently cannot see the active connection.
So why does the OnPublisher process, i.e. its client object, not see the actual connection? Am I crossing some kind of namespace or scope boundary? OnPublisher works when it runs as a method on the main thread, so why not as a process?
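A stdlib-only sketch of the scope question: each process has its own copy of every global, and with the fork start method that copy is taken when the process starts. A plain dict named `state` is a hypothetical stand-in for the global client's connection state (this is not paho code). The child is forked before the parent "connects", which the logs above suggest also happened here (OnSubscriber and OnPublisher report Alive before Connect), so the child's copy never sees the change. It forces the fork start method, so it is POSIX-only.

```python
import multiprocessing as mp

# Hypothetical stand-in for the global client's connection state.
state = {"connected": False}


def child(ready, result):
    ready.wait()                    # wait until the parent has "connected"
    result.put(state["connected"])  # reads the child's private copy


def run():
    state["connected"] = False
    ctx = mp.get_context("fork")    # POSIX only; the child copies the parent's memory
    ready, result = ctx.Event(), ctx.Queue()
    p = ctx.Process(target=child, args=(ready, result))
    p.start()
    state["connected"] = True       # parent "connects" after the fork
    ready.set()
    seen_by_child = result.get()
    p.join()
    return state["connected"], seen_by_child


if __name__ == "__main__":
    print(run())  # (True, False)
```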
OnSubscriber explicitly accesses the global client object, because I am subclassing the separate queues (one for subscribe and one for publish) under the client. So I expected OnPublisher to be able to do the same, and it does: it can access the subclassed queue without issue. The failure point is the actual publish call on the client.
I suspect that if I open a different, separate connection in the OnPublisher process, maybe with publish.single(), that connection will be used and the publish will not fail. Maybe that is the next test. But I would really like to know why the client's publish logic fails to use the existing connection of the global client object.
Attached is my script. __Test.zip
FYI... using publish.single() works. So my key question stands: why does the client's publish() method fail to see the existing connection that the client established and uses for subscriptions?
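That workaround can be wrapped in a small helper, assuming paho-mqtt's `paho.mqtt.publish.single()`, which opens a connection in the calling process, publishes one message and disconnects. `publish_once` is a hypothetical name, the `localhost`/1883 defaults are placeholders, and the `single` parameter exists only so the wrapper can be exercised without a broker.

```python
def publish_once(topic, payload, hostname="localhost", port=1883, qos=0, single=None):
    """Publish one message over a short-lived connection owned by the calling process.

    `single` defaults to paho.mqtt.publish.single; a fake can be passed in tests.
    """
    if single is None:
        # Imported lazily so this module loads even where paho is not installed.
        import paho.mqtt.publish as publish
        single = publish.single
    single(topic, payload=payload, qos=qos, hostname=hostname, port=port)
```

Because every call connects and disconnects, this suits occasional messages; for a steady stream, the per-message connection handshake adds up, and a client created inside the publishing process avoids that cost.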
A bit more: I tweaked the code to report more information, such as calling is_connected() just before the attempt to publish and also when I pull a subscribe message. Since the subscribe-queue handler and the publish-queue handler are both separate processes, calls to the client's is_connected() method report False from both of them, but True from the main process, where the original connection was established. What is very odd is that the subscribed messages are received and my on_message callback fires, even though per the documentation it does not run in the main thread. So I am left with the impression that the paho client can't handle cross-process scoping, at least the way I have designed my code. Because the subscribe handler never actually interacts with the client's connection, it has no issues. But since the publish method relies on the established connection, it fails.
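A stdlib-only sketch of the thread side of this: when a process is forked, only the thread that called fork() is copied into the child. The background thread below is a hypothetical stand-in for a network thread such as the one `loop_start()` runs; in the child it simply does not exist. This illustrates the general mechanism, not a trace of paho's internals, and it is POSIX-only because it forces the fork start method.

```python
import multiprocessing as mp
import threading


def report_thread_count(conn):
    # Runs in the forked child: only the forking thread survives fork().
    conn.send(threading.active_count())
    conn.close()


def run():
    stop = threading.Event()
    # Hypothetical stand-in for a background network thread.
    background = threading.Thread(target=stop.wait, daemon=True)
    background.start()
    parent_count = threading.active_count()

    ctx = mp.get_context("fork")  # POSIX only
    recv_end, send_end = ctx.Pipe(duplex=False)
    p = ctx.Process(target=report_thread_count, args=(send_end,))
    p.start()
    child_count = recv_end.recv()
    p.join()

    stop.set()
    background.join()
    return parent_count, child_count


if __name__ == "__main__":
    print(run())
```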
Python Version 3.9.6
Paho MQTT Module Version 1.6.0
OnSubscriber Alive
OnSubcribe Idle
OnPublisher Alive
OnPublish Idle
Wait
OnConnect, Client <paho.mqtt.client.Client object at 0xb650cf70>, Is Connect? True, Data 'None', Flags '{'session present': 0}', Result '0'
Subscribe, Is Subscribe? True, Topic 'command/test', Id 1, Result 0
OnSubscribe, Client '<paho.mqtt.client.Client object at 0xb650cf70>', Data 'None', Id '1', Quality '(0,)'
Client <paho.mqtt.client.Client object at 0xb650cf70>, Is Connect? True
Wait, Client <paho.mqtt.client.Client object at 0xb650cf70>, Is Connect? True
OnSubscriber, Client <paho.mqtt.client.Client object at 0xb650cf70>, Is Connect? False
OnSubscriber, Message '{'topic': 'command/test', 'payload': '1629665379959'}', Client '<paho.mqtt.client.Client object at 0xb650cf70>', Topic 'command/test', Payload '1629665379959', Queue 0, Accumulator 1
OnTest, Client <paho.mqtt.client.Client object at 0xb650cf70>, Is Connect? False, Instance 1, Topic 'command/test', Payload '1629665379959'
OnSubcribe Idle
OnPublisher, Client <paho.mqtt.client.Client object at 0xb650cf70>, Is Connect? False
OnPublisher, Message '{'topic': 'command/test', 'payload': {'Instance': 1, 'Data': '1629665379959'}}', Topic 'status/test', Payload '{'Instance': 1, 'Data': '1629665379959'}', Queue 0, Accumulator 1
OnPublish, Result (4, 1)
OnException: Message publish failed: The client is not currently connected. (<class 'RuntimeError'>)
Traceback (most recent call last):
  File "/home/pi/noderedlistener/NodeRedListener/__Test.py", line 355, in OnPublisher
    thePublishOrNot=theResult.is_published()
  File "/home/pi/noderedlistener/lib/python3.9/site-packages/paho_mqtt-1.6.0-py3.9.egg/paho/mqtt/client.py", line 360, in is_published
    raise RuntimeError('Message publish failed: %s' % (error_string(self.rc)))
RuntimeError: Message publish failed: The client is not currently connected.
I can't use a lock-and-value approach for the client, since the client can't be serialized (pickled), because the thread inside the client can't be. For the same reason, as I understand it, I can't use a multiprocessing Manager object. I did try these methods, but the client object breaks the serialization: a thread object can't be pickled.
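The serialization failure is easy to reproduce without paho: objects that wrap OS resources, such as a thread lock or a socket, refuse to be pickled, while plain data pickles fine. Since multiprocessing queues, pipes and Manager proxies move values between processes by pickling them, what can cross the process boundary is the message data (topic, payload), not the client. `picklable` is a hypothetical helper name.

```python
import pickle
import socket
import threading


def picklable(obj):
    """Return True if pickle can serialize obj, False if it raises TypeError."""
    try:
        pickle.dumps(obj)
    except TypeError:
        return False
    return True


if __name__ == "__main__":
    sock = socket.socket()
    try:
        print(picklable(threading.Lock()))                          # False
        print(picklable(sock))                                      # False
        print(picklable({"topic": "status/test", "payload": "1"}))  # True
    finally:
        sock.close()
```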
It seems that to keep the subscribe handler and the publish handler as separate processes, I have to create a connection within the scope of each process. Or I have to use real threads rather than multiprocessing processes; threads of course share the same namespace as the client.
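The thread alternative works because threads share one address space, so a change made by one thread is visible to the others. A stdlib-only sketch; the dict named `state` is a hypothetical stand-in for the global client's connection state, not paho code. Here the worker thread starts before the main thread "connects" and still sees the update, because there is only one copy of `state`.

```python
import threading

# Hypothetical stand-in for the global client's connection state.
state = {"connected": False}


def reader(ready, seen):
    ready.wait()                     # wait until the main thread has "connected"
    seen.append(state["connected"])  # same object as the main thread's, not a copy


def run():
    state["connected"] = False
    ready, seen = threading.Event(), []
    t = threading.Thread(target=reader, args=(ready, seen))
    t.start()
    state["connected"] = True        # "connect" after the thread has started
    ready.set()
    t.join()
    return seen[0]


if __name__ == "__main__":
    print(run())  # True
```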
Bump? No one?
Weird that this still isn't fixed. I'm trying to publish a message in a Flask GET request handler and it (almost) never works, because these request handlers obviously also run in separate threads. It does not seem like an unusual use case to me, but because of this bug it does not work.
Tagging as an enhancement as it's going to take some time to sort this out (if anyone is willing to help, please yell out). I can duplicate the issue with the code provided by @dachshund-digital but suspect this is a design issue (e.g. the library uses threading.Lock, not multiprocessing.Lock, etc.), but this is well above my Python knowledge!
I doubt sharing a Client() across multiple processes is something we can or want to support. The paho Client() cannot be made picklable since it contains a socket.
Picklability: Ensure that the arguments to the methods of proxies are picklable. -- https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
By design, multiprocessing uses multiple processes, which do not share objects between them (unless special multiprocessing facilities are used). That means theClient cannot be the same object across processes, so I don't see how to make this work.
You will have to build your own way to communicate across your processes and keep the paho client in a single process.
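One way to follow that advice, sketched with the standard library: a single dedicated process owns the client, creates it inside that process (so its socket and network thread belong there), and receives publish requests from everyone else over a queue. `mqtt_owner`, `start_owner`, `make_publish` and the `STOP` sentinel are hypothetical names. With paho-mqtt 1.x, `make_publish` might create a `Client`, call `connect()` and `loop_start()`, and return `client.publish`; here a fake can be injected so the sketch runs without a broker. With the spawn start method, `make_publish` must be a module-level function so it can be pickled.

```python
import multiprocessing as mp

STOP = None  # sentinel: tells the owner process to shut down


def mqtt_owner(requests, make_publish):
    # Create the client here, after the process has started, so nothing is inherited.
    publish = make_publish()
    while True:
        item = requests.get()
        if item is STOP:
            break
        topic, payload = item
        publish(topic, payload)


def start_owner(make_publish, ctx=None):
    ctx = ctx or mp.get_context()
    requests = ctx.Queue()
    owner = ctx.Process(target=mqtt_owner, args=(requests, make_publish))
    owner.start()
    return requests, owner
```

Other processes then only call `requests.put((topic, payload))`, and the owner is stopped with `requests.put(STOP)` followed by `owner.join()`.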
Comments that talk only about threads are not the same issue as this one, since this issue is about sharing a client between multiple processes. Multithreading is supported.