Multi-threading Issues with NativeObject Publisher in ZeroMQ Mode

Open zhaokefei opened this issue 1 year ago • 5 comments

When using the NativeObject publisher pattern in ZeroMQ, calling publish from multiple threads leads to message format corruption when another process is listening. Upon investigation, I found that ZeroMQ sockets are not thread-safe by default and that each thread should create its own socket. Is there support for this approach to ensure thread safety when publishing messages in a multi-threaded environment? The current behavior suggests that when multiple threads use the same publisher socket to send messages, the data received by the listener is mangled and inconsistent.

Would it be possible to introduce a thread-safe mechanism or a recommended pattern to allow for safe multi-threaded publishing using ZeroMQ sockets? This would greatly enhance the reliability of systems where multi-threaded publishing is necessary.

zhaokefei avatar Mar 15 '24 05:03 zhaokefei

Thanks for discovering this issue. Could you provide a minimal example for me to reproduce the problem?

fabawi avatar Mar 15 '24 11:03 fabawi

Hello, here is a minimal reproduction from my local test run. My preliminary conclusion is that the issue is caused by the send_multipart method of ZeroMQ sockets not being thread-safe. It seems necessary to create a ZeroMQ socket in each thread. How can this be implemented within the existing framework?

run command

# run publish
python3 publish.py

# run listen
python3 listen.py

error message

The error only appears after the scripts have been running for a while, and it is more likely to be triggered when the data being transferred is large.

## listen.py

Traceback (most recent call last):
  File "listen.py", line 42, in <module>
    main()
  File "listen.py", line 35, in main
    (msg,) = forward.receiver_chain_B()
  File "/app/wrapyfi/wrapyfi/connect/wrapper.py", line 642, in wrapper
    return cls.__trigger_listen(func, instance_id, kwd, *wds, **kwds)
  File "/app/wrapyfi/wrapyfi/connect/wrapper.py", line 260, in __trigger_listen
    returns.append(wrp_exec.listen())
  File "/app/wrapyfi/wrapyfi/listeners/zeromq.py", line 222, in listen
    return json.loads(
  File "/app/miniconda3/envs/bev/lib/python3.8/json/__init__.py", line 370, in loads
    return cls(**kw).decode(s)
  File "/app/miniconda3/envs/bev/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/app/miniconda3/envs/bev/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
INFO:root:Deinitializing ZeroMQ middleware

Debug: When I inspect obj in the listen method of ZeroMQNativeObjectListener, the expected format is [<topic>, <msg>]. However, while debugging I see values like [<topic>, <topic>, <msg>].
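One plausible explanation for the extra topic frame is that a multipart send emits its frames one at a time, flagging every frame except the last as "more to come", so two threads sharing a socket can interleave their frames. The following is a minimal simulation of that idea, not Wrapyfi or pyzmq code: FrameAssembler and multipart_steps are hypothetical names, and the interleaving is hand-picked rather than produced by real threads.

```python
from typing import Iterator, List, Tuple


class FrameAssembler:
    """Toy receiver (hypothetical): groups frames into messages by a 'more' flag."""

    def __init__(self) -> None:
        self._partial: List[str] = []
        self.messages: List[List[str]] = []

    def send(self, frame: str, more: bool) -> None:
        # A frame flagged 'more' is buffered. The first frame without the
        # flag closes the message, regardless of which sender produced it.
        self._partial.append(frame)
        if not more:
            self.messages.append(self._partial)
            self._partial = []


def multipart_steps(parts: List[str]) -> Iterator[Tuple[str, bool]]:
    """Yield (frame, more) pairs the way a frame-by-frame multipart send would."""
    for part in parts[:-1]:
        yield part, True
    yield parts[-1], False


wire = FrameAssembler()
thread_a = multipart_steps(["<topic>", "<msg A>"])
thread_b = multipart_steps(["<topic>", "<msg B>"])

# Hand-picked interleaving: A sends its topic, B sends its topic,
# then A sends its payload, then B sends its payload.
for sender in (thread_a, thread_b, thread_a, thread_b):
    frame, more = next(sender)
    wire.send(frame, more)

print(wire.messages)
# [['<topic>', '<topic>', '<msg A>'], ['<msg B>']]
```

If the listener decodes the second frame as JSON, it would receive the topic string instead of the payload in the first message, which would be consistent with the JSONDecodeError above.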

code

## publish.py

import time
import numpy as np
from threading import Thread

from wrapyfi.connect.wrapper import MiddlewareCommunicator


class PublishCls(MiddlewareCommunicator):

    def mock_msg(self):
        """
        Build a mock message containing a zero-filled image array.
        """
        image_array = np.zeros((640, 640, 3))
        return ({'raw': image_array},)

    @MiddlewareCommunicator.register(
        "NativeObject",
        "zeromq",
        "PublishCls",
        "/example/native_chain_B_msg",
        carrier="tcp",
        should_wait=False,
    )
    def publish_chain_B(self, msg):
        """
        Publish the given message on chain B.
        """
        return (msg,)



def main():
    """
    Main function to initiate PublishCls class and communication.
    """
    forward = PublishCls()
    forward.activate_communication(forward.publish_chain_B, mode='publish')

    _queue = list()

    def async_sender():
        while True:
            if _queue:
                msg = _queue.pop()
                forward.publish_chain_B(msg)
            time.sleep(0.1)

    Thread(target=async_sender).start()
    Thread(target=async_sender).start()
    Thread(target=async_sender).start()


    while True:
        (msg_object,) = forward.mock_msg()
        if msg_object is not None:
            _queue.append(msg_object)
        time.sleep(0.1)


if __name__ == "__main__":
    main()


## listen.py


from wrapyfi.connect.wrapper import MiddlewareCommunicator


class ReceiverCls(MiddlewareCommunicator):

    @MiddlewareCommunicator.register(
        "NativeObject",
        "zeromq",
        "ReceiverCls",
        "/example/native_chain_B_msg",
        carrier="tcp",
        should_wait=False,
    )
    def receiver_chain_B(self, msg=""):
        """
        Receive a message from chain B.
        """
        return (msg,)



def main():
    """
    Main function to initiate ReceiverCls class and communication.
    """
    forward = ReceiverCls()
    forward.activate_communication(forward.receiver_chain_B, mode='listen')

    while True:
        (msg,) = forward.receiver_chain_B()
        if msg is None:
            continue
        print(msg['raw'].shape)


if __name__ == "__main__":
    main()

zhaokefei avatar Mar 20 '24 09:03 zhaokefei

This is a bit tricky. The problem is that connections are established per class instance, so Wrapyfi has no way of knowing that a new socket must be created when a new thread is spawned. I'm not sure what the best way to handle this would be, but I'm thinking maybe we could:

A) Check the thread ID on every call to a method and, if the ID is new, create a new instance of the class and attach it to the thread. This will certainly be computationally costly and is not ideal by any means.

B) Change the structure of Wrapyfi so that establishing connections is detached from the class instance, meaning that a method could connect to new nodes after the initial entry. This would make Wrapyfi even more dynamic and would enable switching topics on the fly, but we'd have to introduce significant modifications to Wrapyfi's connectivity mechanisms.

If more users find this feature necessary, we will dedicate time to introducing it. Otherwise, it may break other useful features for the sake of a feature that is only needed in limited use cases: Cost >>> Benefit
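Until one of these options exists, a possible stopgap on the caller's side is to serialize all publish calls behind a single lock. This is only a sketch and has not been verified against Wrapyfi: make_locked and fake_publish are hypothetical names, and fake_publish merely imitates a two-step, non-atomic send so the effect of the lock is visible.

```python
import threading
import time
from typing import Any, Callable, List, Tuple


def make_locked(publish: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Wrap a publish callable so only one thread can be inside it at a time."""
    lock = threading.Lock()

    def locked_publish(msg: Any) -> Any:
        with lock:
            return publish(msg)

    return locked_publish


frames: List[Tuple[str, Any]] = []


def fake_publish(msg: Any) -> None:
    """Stand-in for a two-frame send (hypothetical, not a Wrapyfi call)."""
    frames.append(("topic", msg))
    time.sleep(0)  # give other threads a chance to run between the two frames
    frames.append(("payload", msg))


safe_publish = make_locked(fake_publish)


def worker(worker_id: int) -> None:
    for i in range(50):
        safe_publish((worker_id, i))


threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# With the lock in place, every topic frame is directly followed by its payload.
pairs_intact = all(
    frames[i] == ("topic", frames[i + 1][1]) and frames[i + 1][0] == "payload"
    for i in range(0, len(frames), 2)
)
print(pairs_intact)
```

In the reproduction above, this would amount to wrapping forward.publish_chain_B once and calling the wrapped function from every sender thread. The lock removes the parallelism of the send itself, so it trades throughput for ordering.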

fabawi avatar Mar 20 '24 11:03 fabawi

@zhaokefei I put together a quick solution for ZeroMQ that relies on thread-local storage. Check out the dev branch (bf06ce059643d675df6193ca4abfdeda75dc8a4c). Given your example, you just need to pass the multi_threaded=True argument to the publisher:

    @MiddlewareCommunicator.register(
        "NativeObject",
        "zeromq",
        "PublishCls",
        "/example/native_chain_B_msg",
        carrier="tcp",
        should_wait=False,
        multi_threaded=True
    )
    def publish_chain_B(self, msg):
        """
        Publish the given message on chain B.
        """
        return (msg,)

If this solves your problem, it would be great if you could issue a PR adding your snippet to the examples directory so others would benefit from it as well.
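For readers unfamiliar with thread-local storage, the general idea can be sketched with Python's threading.local: each thread lazily creates its own resource on first use and reuses it afterwards. This is an illustration only, not the code in the dev branch. PerThread and FakeSocket are hypothetical names, and how the branch actually creates and connects its per-thread sockets is not shown in this thread.

```python
import threading
from typing import Callable, Generic, List, Tuple, TypeVar

T = TypeVar("T")


class PerThread(Generic[T]):
    """Lazily create one resource per thread (hypothetical helper)."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._local = threading.local()  # attributes are private to each thread

    def get(self) -> T:
        try:
            return self._local.resource
        except AttributeError:
            # First call from this thread: build and remember its own resource.
            self._local.resource = self._factory()
            return self._local.resource


class FakeSocket:
    """Stand-in for a real socket; records the thread that created it."""

    def __init__(self) -> None:
        self.owner = threading.get_ident()


sockets: PerThread[FakeSocket] = PerThread(FakeSocket)
seen: List[Tuple[FakeSocket, FakeSocket, bool]] = []


def worker() -> None:
    first = sockets.get()
    second = sockets.get()
    # Keep references so the objects stay alive for inspection afterwards.
    seen.append((first, second, first.owner == threading.get_ident()))


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Each thread got the same object on both calls, and no two threads shared one.
print(all(a is b and owned for a, b, owned in seen))
print(len({id(a) for a, _, _ in seen}))
```

The lookup costs one attribute access per call, which is cheaper than rebuilding a whole class instance per thread.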

fabawi avatar Mar 20 '24 18:03 fabawi

@fabawi That's great. Once I've tested it, I'll open a PR with the example. Thank you for your quick response.

zhaokefei avatar Mar 21 '24 02:03 zhaokefei