paho.mqtt.python icon indicating copy to clipboard operation
paho.mqtt.python copied to clipboard

Deadlock when publishing from multiple threads.

Open blockbomb opened this issue 6 years ago • 10 comments

There seems to be an issue that is causing my codebase a deadlock that seem related to #168 and #235. when using version 1.4.0 of the project.

I have a multi-threaded application that performs the client.loop_forever(retry_first_connection=True) at the end of my main thread. This performs all the event handling for the MQTT layer. On the underside of that I have another thread that reads messages that come from a USB device.

When I receive an MQTT message my handler issues commands to the USB Device and await their successful completion before exiting the handler (one mqtt message may require multiple command/response pairs on the USB comm). There are other asynchronous messages that may come up from the USB device from time to time and I publish those on the broker when they arrive.

I use not returning from the callback as a way to throttle my consumption of the next message on the broker to alleviate my need for queuing messages in the application however in version (1.4.0) if I receive one of the asynchronous messages from the USB device it attempts to publish from another thread which ends up calling

client.publish self._send_publish self._packet_queue self._call_socket_register_write

which of course waits on the self._callback_mutex that is already held by the first thread.

A few questions.

  1. in #168 it was mentioned that the changes would prohibit two callbacks being called at the same time is it undesirable to have two callbacks called at the same time? In my case I suppose I was relying on it being possible.

  2. when 1. was mentioned above the enhancements for external loop control were not yet added to the codebase so maybe we need to be more careful about when we issue the callback for socket register write.

  3. I am attempting to use the project in the wrong manner? in which case how would I stop the broker from publishing to my client while I wait for the USB device communications to come back with a response to the initial request while still being able to publish to the broker but without maintaining an incoming queue of my own in the application. I would assume that the MQ in MQTT should be handling that aspect without my need to.

3a) I suppose it is possible I could create another client that is associated with the USB device that wouldn't clash with the main client, but it was easier to reason with a unique shared client for the application.

any input would be greatly appreciated, Thank you for all of your hard work I really enjoy this project.

-BB

blockbomb avatar Jan 24 '19 22:01 blockbomb

We also have a deadlock using the mqtt client in 2 different threads with the same version (1.4.0) As a quick (and ugly) workaround we did the following wrapper.

class MQTTClientWrapper(mqtt.Client):

    def __init__(self, name):
        mqtt.Client.__init__(self, name)

    def publish(self, topic, payload=None, qos=0, retain=False):
        with self._callback_mutex:
            mqtt.Client.publish(self, topic, payload, qos, retain)

mehdilauters avatar Feb 22 '19 08:02 mehdilauters

To avoid the publish in the callback in multiple threads, it will cause the deadlock issue.

dw7086 avatar Oct 18 '19 07:10 dw7086

For me this error started to happen in v1.4.0 because of the addition of

self._call_socket_register_write()

at line 2545 in client.py in method _packet_queue. _call_socket_register_write() tries to aquire the self._callback_mutex lock, which it never did in 1.3.1 and subsequently if another thread is receiving a packet at the same time, trying to aquire that same mutex to pass data to a callback, stuff can break.

In my case pahos locks interferes with application level locks because of that the thread calling publish( and thus ends up calling _call_socket_register_write() ) waits for self._callback_mutex, that is held by another thread receiving a mqtt packet. That thread in turn waits for a lock outside of paho, that is held by the first thread that called publish.

Its entirely possible to redesign usage of paho to accomodate for this behaviour, but in my opinion it is kind of weird to add callback logic solely to integrate with external event loops, that gets called by default.

Hopefully this can help someone with a similar issue.

corroleaus avatar Oct 21 '19 10:10 corroleaus

A simple fix for above mentioned issue would be to simply not aquire self._callback_mutex_lock in _call_socket_register_write if no callback is defined.

--- a/src/paho/mqtt/client.py
+++ b/src/paho/mqtt/client.py
@@ -2137,8 +2137,8 @@ class Client(object):
         if not self._sock or self._registered_write:
             return
         self._registered_write = True
-        with self._callback_mutex:
-            if self.on_socket_register_write:
+        if self.on_socket_register_write:
+            with self._callback_mutex:
                 try:
                     self.on_socket_register_write(
                         self, self._userdata, self._sock)

corroleaus avatar Oct 22 '19 09:10 corroleaus

I have experience this problem as well. Created a separate thread with an event loop and scheduled publishing using loop.call_soon_threadsafe.

mjcumming avatar Feb 01 '20 21:02 mjcumming

I had the same issue. I have multiple threads that can call client.publish. I solved using a queue to send the messages; so I don't call client.publish, but I've created a utility method to add messages to my queue. In a separated thread I manage the queue to send the messages, in this way I'm sure I do the client.publish only from one thread.

riccardoch avatar Feb 04 '20 08:02 riccardoch

We're having this issue. The library should make it clear that it is not suitable for thread safe use. At the moment it occupies the dangerous space of pretending to be thread safe.

AndrewCarterUK avatar Jun 11 '20 02:06 AndrewCarterUK

If you don't use the on_socket_register_write callbacks, this is a simple performance-neutral workaround:

#   NetLoop Thread:                                         User code in another thread
#   loop(): receives message
#   loop_read() => _packet_read()
#   _packet_handle() => handle_publish()
#   _handle_on_message(): holds _callback_mutex
#   USER CODE: long-running message handler,                User code tries to publish:
#              gets preempted by another thread             publish(): holds _out_message_mutex
#                                                           _send_publish()
#                                                           _packet_queue()
#                                                           _call_socket_register_write(): blocks on _callback_mutex
#   USER CODE: long-running message handler resumes 
#              and attempts to publish a response
#   publish(): blocks on _out_message_mutex
#              => deadlock

# avoid deadlock by nop-ing socket control callback stubs
import paho.mqtt.client as mqtt
mqtt.Client._call_socket_register_write = lambda _self: None
mqtt.Client._call_socket_unregister_write = lambda _self, _sock=None: None

Ultimately, the global _callback_mutex should either be removed or made more granular. According to https://github.com/eclipse/paho.mqtt.python/pull/168 the reason for holding _callback_mutex everytime a callback is set or executed, is to avoid this scenario:

  1. User in main thread sets on_subscribe to be a function
  2. Client acquires _callback_mutex, and self._on_subscribe is present
  3. Before the function is actually called, user sets on_subscribe to be None
  4. Client tries to call None, raises an exception

If the point is simply to avoid the exception, just take a copy of the callback before making the is None check. What the current system actually does, is turn the operation of setting a callback into: Block until existing callback is finished, then set callback to new function I'd argue that a potential multi-second block is not what the average developer expects in a simple property setter. Even worse, it waits for any running callback, not just the one being set. If there is a need to set a callback in such a way that guarantees, that after returning from the setter, the previous callback has finished executing - I think it would be better to expose a locking mechanism, so a developer can implement this behavior only when needed.

dbeinder avatar Jul 23 '20 15:07 dbeinder

The 1.6.x branch has a lot of improvement in this area if you'd like to give it a try.

ralight avatar Jul 22 '21 01:07 ralight

I had the same issue and it went away after upgrading to 1.6.1.

jimfunk avatar Jun 13 '22 12:06 jimfunk

I'm going to close this due to it's age and the fact that the v1.6 improved things; if anyone is encountering similar issues I'd suggest raising a new issue (trying to get the issue count down to a manageable level!).

MattBrittan avatar Jan 07 '24 23:01 MattBrittan