python-can icon indicating copy to clipboard operation
python-can copied to clipboard

Receiving queue from different threads on same channel

Open CanCanRep opened this issue 6 years ago • 6 comments

Hi guys,

I´m always facing errors working on different threads on the same hw channel.

Let me explain: Imagine you have one hw channel e.g Kvaser HW Channel 0. On which you want to work on different threads. Usually one HW channel has only one internal queue. Which means runnning once the function Bus.recv() on thread1 will always remove the received message from the HW queue. The issue now is, that thread2 is not able to receive the same message, because they share the HW queue.

The ThreadSafeBus class can´t solve this issue, because it is only blocking the access to the resource.

Had anyone else the same errors ? If yes, have you found an

Thank you ;)

CanCanRep avatar Oct 24 '19 12:10 CanCanRep

You essentially want the Bus to provide fan-out semantics, which as you have noticed, are not provided by python-can.

However, you can implement the fan-out semantics you desire by having a dedicated Message RX thread, and then either fanning-out directly to your Threads manually (this can trivially be implemented via a queue per thread and looping through and enqueuing to each queue) or via a queuing solution that provides fan-out semantics.

karlding avatar Oct 25 '19 04:10 karlding

Thanks for your fast reply. I was working on a first draft of a proxy-class which implements a basic fanout functionality. If you think this could be a nice new feature of the library, I could participate and further develop / beautify the class

The advantage is that we can generate classes which looks like normal Bus interfaces, but are acutally implementing a fanout rx queue. This means we can pass a instance to different packages and modules and do not care about implemented threading mechanisms. Because all threads can now work completely independent.

Source Code:

class FanOutRxQueuedThreadSafeBus(ThreadSafeBus):

    def __init__(self, fanout: bool = False, *args, **kwargs):
        super(FanOutRxQueuedThreadSafeBus, self).__init__(*args, **kwargs)
        self.args: tuple = args
        self.kwargs: dict = kwargs
        self.rx_listener: BufferedReader = BufferedReader()
        self.notifier: Notifier = None
        if not fanout:
            self.__bus: ThreadSafeBus = ThreadSafeBus(*args, **kwargs)
            self.__start_rx_threaded_notifier()

    def recv(self, timeout=None):
        with self._lock_recv:
            return self.rx_listener.get_message(timeout=timeout)

    def get_new_fanout_instance(self):
        new_fanout_instance = FanOutRxQueuedThreadSafeBus(fanout=True, *self.args, **self.kwargs)
        new_fanout_instance.notifier = self.notifier
        new_fanout_instance.append_listener(t.rx_listener)
        return t

    def append_listener(self, listener):
        self.notifier.add_listener(listener)

    def __start_rx_threaded_notifier(self):
        self.notifier = Notifier(self.__bus, [self.rx_listener], timeout=1.0, loop=None)

Example Usage:

    t = FanOutRxQueuedThreadSafeBus(bustype='kvaser', channel=0)
    t2 = t.get_new_fanout_instance()
    t3 = t.get_new_fanout_instance()
    t4 = t.get_new_fanout_instance()
    t5 = t.get_new_fanout_instance() 

CanCanRep avatar Oct 25 '19 08:10 CanCanRep

The other option is, as your code highlights, to make use of the Notifier instead of applying these semantics on a Bus like you had initially implied.

So my initial impressions regarding implementing this via a Bus vs just having each thread register themselves to the Notifier, I suppose the main advantages here are that each thread can call send on the Bus, and that it allows when each Thread wants to receive messages. This would be accomplished right now by registering themselves to the Notifier (for receiving messages) and switching to a ThreadSafeBus and keeping a reference to it on each thread (if they need to send). However, there is no way to control when each thread receives a message, so they would need to buffer there (which is accomplished via a BufferedReader now, allowing the application to control when messages are processed, and I believe this is what people care about).

I don't think this new Bus type is strictly necessary, but perhaps some other maintainers can chime in.

karlding avatar Oct 25 '19 15:10 karlding

Hey - I have recently forked and been working on implementing support for a new adapter and ran into this same issue in my use case. To put it simply, I want to run a logger but then also manually grab a message through bus.recv(). However, this is not reliable because when I add a Logger as a listener in the Notifier, it gets consumes but whichever call gets there first.

My question is - has there been any additional thoughts from the current maintainers on support for something like this?

I have not attempted a solution yet but my fork and recent work is here on develop.

stupidlogic avatar Aug 19 '21 14:08 stupidlogic

@stupidlogic Do you have any pointers for a solution to the issue you have mentioned above.. The above scenario seems to be a replica of what you have mentioned. I have initiated a can logger and registered it as listener for a CAN notifier. However there are few scenarios where we have to iterate over messages in bus as well to check for specific messages based on some injection event. With can logging inactive, the messages are received in the manual process. But when CAN notifier is active, the messages are not received in the other method.

akasonu avatar Feb 11 '22 19:02 akasonu

@akasonu I actually have a partially completed solution on a local branch of my fork but it needs to be rebased with the latest update here and validated. It also doesn't yet maintain original functionality (from the branch it was based on). This is on my list to finish shortly(ish).

My strategy was to keep track of the last received message for each arbitration ID. Then determine whether or not a logger or the bus.recv() function was grabbing a message. If bus.recv() was, return the last received message - I also wanted to be able to grab the last received message for a specific arbitration ID and so I added an optional arbitration ID keyword to the bus.recv() function. The logger (notifier) is intended to function exactly as it did before. This has only been sparsely validated as this point.

Any input welcome.

My current changes on this branch: https://github.com/stupidlogic/python-can/tree/bus_recv_logger_updates

Edit: I just rebased with 4.0.0 and force pushed the above branch. Untested, but the rebase was straightforward.

stupidlogic avatar Feb 22 '22 18:02 stupidlogic