python-can icon indicating copy to clipboard operation
python-can copied to clipboard

Asyncio implementation - listeners and notifiers

Open Reowald opened this issue 3 years ago • 6 comments

This is really more of a general question than a bug or feature request apologies if it is annoying to post this general enquiry here I am happy to delete.

I am using a cm4 to bring in CAN messages and then make them available to a PLC using umodbus. I became interested in the asyncio capabilities. I did some general asyncio research and also looked in to the python-can implementation but due to my own lack of understanding have struggled to implement the python-can tools using the can.AsyncBufferedReader() - listeners and notifiers in the way that works.

Would someone be able to interpret my implementation using the basic asyncio syntax into the python-can asyncio functionality with listeners and notifiers.

I have simplified my code into hopefully something that demonstrates what I trying to achieve. I want to request a load of data and catch it and deal with it as it becomes available.


import asyncio
import can
import os
import pandas as pd
import collections

os.system(f'sudo ip link set can0 type can bitrate 500000')
os.system(f'sudo ifconfig can0 up')
bus = can.Bus("can0", bustype="socketcan", receive_own_messages=True)

#some pids (adresses) that I would like to request and read in the 100's
# list_of_ids = [b'\x00\x10', b'\x01\x10', b'\x10\x10', b'\x11\x10']
# as per original pandas decode n.b. endianess flipped
list_of_ids_cmd = [b'\x00\x10', b'\x01\x10', b'\x10\x10',
                   b'\x11\x10', b'\x12\x10', b'\x13\x10']

list_of_ids_inst = [b'\x80@', b'\x81@', b'\x82@', b'\x83@',
                    b'\x84@', b'\x85@', b'\x86@', b'\x87@']

list_of_ids = list_of_ids_cmd + list_of_ids_inst

#The active ids on my network
request_arbid = [286326784, 287440896] #'0x11110000', '0x11220000'
respond_arbid = [268439810, 268444162] #'0x10001102', '0x10002202'

# this is a placeholder of another config that decides who is gathering the data
master_id = request_arbid[0]

# declaring defaultdict sets default to catch undefined pids
request_dict = collections.defaultdict(lambda: 'This PID has not been configured')
respond_dict = collections.defaultdict(lambda: 'This PID has not been configured')

# a dict for requested data ordered by arbid and pid
request_dict = {request_arbid[i]: list_of_ids_cmd for i in range(len(request_arbid))}

#create somewhere to store data
data_dict = {list_of_ids[i]: None for i in range(len(list_of_ids))}

#this has been made a global so another module can access it
global respond_dict
# data storage by arb id
respond_dict = {respond_arbid[i]: data_dict for i in range(len(respond_arbid))}

# finds arb id and pid and logs the data from the message
async def store_data(arb_id, msg_data, msg_dlc):
    data_subset = respond_dict[arb_id]
    pid = msg_data[0:2]
    if msg_dlc == 5:
        data = msg_data[2:4]
    elif msg_dlc == 7:
        data = msg_data[2:6]
    data_subset[pid] = data

# reads bus
async def read_msg():
    while True:
        msg = bus.recv()
        arb_id = msg.arbitration_id
        msg_data = msg.data
        dlc = msg.dlc
        await store_data(arb_id, msg_data)

#eventually there would be multiple dictionary's
# with different delays and priorities async def req_read(type, delay):
async def req_read(delay):
    pids = request_dict[master_id]
    #get the arb ids currently online
    arb_ids = request_arbid
    while True:
        for i in range(len(arb_ids)):
            arb_id = arb_ids[i]
            for u in range(len(pids)):
                pid = pids[u]
                msg = can.Message(arbitration_id=arb_id, data=pid)
                bus.send(msg)
                await asyncio.sleep(delay)


loop = asyncio.get_event_loop()

#as described may have different sets of data at different rates
loop.create_task(req_read(0.2))
loop.create_task(req_read(0.5))
loop.create_task(req_read(5))

loop.run_forever()
loop.stop()

It would be fantastic if someone could point me in the right direction as to how I could implement this sort of idea with the python-can Asyncio tools like listeners, notifiers.

As always posing the question and laying it out helps and maybe that's why I am here :)

Reowald avatar Jun 22 '22 14:06 Reowald

Here is a simple example for the notifier:

import random
import asyncio

import can


async def print_message(msg: can.Message) -> None:
    print(msg)


async def main() -> None:
    """The main function that runs in the loop."""

    with can.Bus(
        interface="virtual", channel="my_channel_0", receive_own_messages=True
    ) as bus:
        # Create Notifier with an explicit loop to use for scheduling of callbacks
        loop = asyncio.get_running_loop()
        notifier = can.Notifier(bus, [print_message], loop=loop)

        # send messages
        try:
            while True:
                bus.send(can.Message(arbitration_id=0, data=random.randbytes(8)))
                await asyncio.sleep(0.1)
        except KeyboardInterrupt:
            notifier.stop()


if __name__ == "__main__":
    asyncio.run(main())

zariiii9003 avatar Jun 22 '22 14:06 zariiii9003

@zariiii9003 thanks for this yes that does help I will try and implement this and will feedback on how I get on.

Reowald avatar Jun 22 '22 14:06 Reowald

Just a little update - all appears to be working great I have had some issues with other parts of my code but I will repost when I have rocked on a little further.

Reowald avatar Jun 30 '22 12:06 Reowald

Just a little update - still working on this but will provide a cutdown version once I have completed it.

Reowald avatar Jul 25 '22 08:07 Reowald

@Reowald Have you resolved all of your outstanding questions about asyncio listeners and notifiers, or could their still be future questions regarding the topic?

j-c-cook avatar Aug 06 '22 14:08 j-c-cook

@j-c-cook I have gone away to read the docsand am making progress I think lot of my issues were asyncio (and my understanding of this) rather than specially python-can, however I would like to leave this open a week longer as I may come back to this with with some questions.

Reowald avatar Aug 17 '22 10:08 Reowald

Ok my follow question is this.

import asyncio

async def main() -> None:
    with can.Bus(
            interface="socketcan", channel="can0", receive_own_messages=True
    ) as bus:
        # Create Notifier with an explicit loop to use for scheduling of callbacks
        loop = asyncio.get_running_loop()
        notifier = can.Notifier(bus, [store_data], loop=loop)

        try:
            while True:

                await req_inst(bus)
                addr_keys = mbus_server.write_keys
                await write_cmd(bus, addr_keys)
                await write_sys_data(bus, addr_keys)
                mbus_server.write_keys = []

        except KeyboardInterrupt:
            notifier.stop()


if __name__ == "__main__":
    asyncio.run(main())
else:
    pass

I have not given all code but write_cmd() and 'write_sys_data()' run through a but of ids and associated pids much like as described previously. I am currently controlling these functions with await asyncio.sleep(0.1)

Will the store_data act like a integral part of the even loop picking up messages as they come in or will it only pick up all the messages once event loop has started/finished?

Reowald avatar Sep 09 '22 10:09 Reowald

Hello again,

It turns out this vagueness I had about listeners has really come back to bite me! As these things tend to.

I am struggling to pull the data out from the listener! This is a really annoying bug and have been struggling to track it down.

The listener is doing a wonderful job taking all the messages and storing them in a big dictionary

store_data essentially filters the messages into buckets and does a little bit of conversion with struct I imagine this is not too onerous a task. I allow stored_data to access the bus by using await asyncio.sleep(0.001) on all my send message functions. This method seems to work ok. It is quite a linear way to implement concurrency with the priority always falling back to the listener.

async def store_data(msg: can.Message):
    pid = int.from_bytes(msg.data[0:2], 'little')
    arb_id = msg.arbitration_id
    unit_id = (arb_id & (15 << 8)) >> 8
    if pid in faults_warnings:
        fault_tags, warning_tags = can_params.bitmask_tags(msg)
        for i in range(len(fault_tags)):
            faults_data[fault_tags[i]][(unit_id - 1)] = arb_id
        for i in range(len(warning_tags)):
            warnings_data[warning_tags[i]][(unit_id - 1)] = arb_id
    elif arb_id in mcu_respond_ids:
        if msg.dlc == 5:
            just_two_bytes = int.from_bytes(msg.data[2:4], 'little', signed=False)
            all_data[arb_id][pid][0] = just_two_bytes
            all_data[arb_id][pid][2] = struct.unpack('<H', msg.data[2:4])
        elif msg.dlc == 7:
            first_two_bytes = int.from_bytes(msg.data[2:4], 'little', signed=False)
            second_two_bytes = int.from_bytes(msg.data[4:6], 'little', signed=False)
            f = struct.unpack('<f', msg.data[2:6])
            all_data[arb_id][pid][2] = f
            all_data[arb_id][pid][1] = first_two_bytes
            all_data[arb_id][pid][0] = second_two_bytes
            print(all_data)
    else:
        pass
    return

print(all_data) is interpreted correctly. If I then print this all_data dictionary in the main event loop nearly but not 100% of the values are oddly 0 even when originally set them as None when I create the all_data ``dict. If I print all_data out within the stored data again everything looks fine.

Do I need another asyncio task to grab the data from stored_data as the listener is always accessing all_data making it unavailable to the main event loop? Feels like an access problem.

For reference my main event loop is shown below.

`async def main() -> None:
    with can.Bus(
            interface="socketcan", channel="can0", receive_own_messages=True
    ) as bus:

        loop = asyncio.get_running_loop()
        notifier = can.Notifier(bus, [store_data], loop=loop)

        try:
            while True:
                await request_inst(bus)

                addr_keys = mbus_server.write_keys
                print(f' These keys are registered in the event loop {addr_keys}')

                await write_cmd(bus, addr_keys)
                await request_warnings(bus)
                await write_sys_data(bus, addr_keys)

                print(f'This is all the data {all_data}')

                mbus_server.write_keys = []
                elapsed_time = time.time() - start_time
                await asyncio.sleep(0.1)
                task1 = asyncio.create_task(mbus_data.get_data())
                await task1
                print(f'The event loop round robin {elapsed_time}')

        except KeyboardInterrupt:

            notifier.stop()
            bus.shutdown()`

Any suggestions are greatly appreciated. I think I have given enough information and hopefully not too much.

Edit: Started looking at lock = asyncio.Lock() as I think what I am experiencing is some race conditions between the listener and other stuff going on in the event loop. Haven't been able to give the lock to store_data.

Reowald avatar Oct 16 '22 10:10 Reowald

Turns out the issue was I put too much computation in the listener. The incorrect values were real as the listener was picking up in correct data. I stripped everything back and looked at the raw logging data and hey presto. Feels la bit like an own goal but definetly learnt a bit more about asyncio.

image

Reowald avatar Oct 17 '22 17:10 Reowald

Hello,

I trimmed the listener down so that it just filters the data as one into a dict which I can "hopefully access", My feeling is maybe I am just using the listener incorrectly.

This is my improved listener function.

async def store_data(msg: can.Message):
    # global all_data
    # start = time.perf_counter()
    # msg = listen.on_message_received()
    pid = int.from_bytes(msg.data[0:2], 'little')
    arb_id = msg.arbitration_id
    #  bitmasking to find unit id
    unit_id = (arb_id & (15 << 8)) >> 8
    if pid in faults_warnings:
        # print(f'This is faults and warnings {True}')
        fault_tags, warning_tags = can_params.bitmask_tags(msg)
        for i in range(len(fault_tags)):
            faults_data[fault_tags[i]][(unit_id - 1)] = arb_id
        for i in range(len(warning_tags)):
            warnings_data[warning_tags[i]][(unit_id - 1)] = arb_id
    elif arb_id in mcu_respond_ids:
        # print(f'This is 5 bytes {True}')
        if msg.dlc == 5:
            # this is placing the data by locating arb id and the pid
            all_data[arb_id][pid][0] = int.from_bytes(msg.data[2:4], 'little', signed=False)
            # print(f'ID {hex(arb_id)} Description: {can_params.pid_2_tag(pid)} Pid: {hex(pid)} and this is the {f}')
        elif msg.dlc == 7:
            all_data[arb_id][pid][0] = int.from_bytes(msg.data[2:6], 'little', signed=False)
            # floaty = struct.unpack('<f', msg.data[2:6])
            # print(floaty)
    await asyncio.sleep(1)
    print(all_data)

However it intermittently puts int(0) into the dict, I am using to filter the data, which has been causing me all the issues! Below is an output of all_data as it goes from reporting sensible values to zeros. The filtering logic is findd and the hole thing takes around 1.6 X 10-5 to complete so there should be no computational issues which was what I have previously seen evidence of.

possible I am using the listener in totally the wrong fashion and should be yielding data from it as suggested by the asyncio example: msg = await reader.get_message()

image

For reference, this is my event loop incarnation.

async def main() -> None:
    with can.Bus(
            interface="socketcan", channel="can0", receive_own_messages=True
    ) as bus:
        # Create Notifier with an explicit loop to use for scheduling of callbacks
        loop = asyncio.get_running_loop()
        # loop = asyncio.get_event_loop()
        # notifier = can.Notifier(bus, [store_data], loop=loop)
        # lock = asyncio.Lock()
        # notifier = can.Notifier(bus, [store_data], loop=loop)

        reader = can.AsyncBufferedReader()
        logger = can.Logger("logfile.asc")

        listeners: List[MessageRecipient] = [
                                            store_data,
                                            # print_message,
                                            # msg_print_float,
                                            # reader,  # AsyncBufferedReader() listener
                                            # logger,  # Regular Listener object
                                            ]
        notifier = can.Notifier(bus, listeners, loop=loop)
        global event_loop_count
        global all_data
        try:

            while True:

                await request_inst(bus)

        except KeyboardInterrupt:

            notifier.stop()
            bus.shutdown()

And this is request_inst()

async def request_inst(bus: can.Bus):
    print('Request inst active')
    for key in inst_short:
        for val in inst_short[key]:

            pid = int(val)
            pidbytes = pid.to_bytes(2, 'little')
            msg = can.Message(arbitration_id=key, data=pidbytes)
            bus.send(msg)
            await asyncio.sleep(freq)

The journey continues any suggestions would be greatly appreciated!

Reowald avatar Oct 18 '22 12:10 Reowald

OK, something strange is afoot.

async def message_obtain(reader: can.AsyncBufferedReader):
    print('Started it the get message process')
    while True:
        # print(True)
        # print(reader.on_message_received())
        msg = await reader.get_message()
        future = await store_data(msg)
        reader.stop()
        print(future)
async def main() -> None:
    with can.Bus(
            interface="socketcan", channel="can0", receive_own_messages=True
    ) as bus:
        # Create Notifier with an explicit loop to use for scheduling of callbacks
        loop = asyncio.get_running_loop()
        reader = can.AsyncBufferedReader()
        logger = can.Logger("logfile.asc")

        listeners: List[MessageRecipient] = [
                                            msg_print_float,
                                            reader,  # AsyncBufferedReader() listener
                                            logger  # Regular Listener object
                                            ]
        notifier = can.Notifier(bus, listeners, loop=loop)

        try:

            task1 = asyncio.create_task(request_inst(bus))
            task2 = asyncio.create_task(message_obtain(reader))
            await asyncio.gather(task1, task2)


        except KeyboardInterrupt:
            notifier.stop()
            bus.shutdown()

The same thing is happening with the above architecture but just slower. Should I be periodically clearing the listeners ?

image

Reowald avatar Oct 18 '22 14:10 Reowald

Ok one more test was that I just printed out the data read by the listener directly and over an hour of data was good. I seems that the way I am accessing the listener is causing the issue. By itself it is ok reports fine, if access that data something strange happens and an unintended asynchronous event occurs!

Feels a bit like the uncertainty principle :)

async def msg_print_float(msg: can.Message):
    if msg.arbitration_id == 0x10001102:
        if msg.dlc == 7:
            if msg.data[0:2] == (16515).to_bytes(2, 'little'):
                f = struct.unpack('<f', msg.data[2:6])
                pid = int.from_bytes(msg.data[0:2], 'little')
                print(f'This is the id {hex(msg.arbitration_id)} this is the pid: {f}, this is the pid: {hex(pid)} Description: {can_params.pid_2_tag(pid)}')

Reowald avatar Oct 18 '22 15:10 Reowald

OK right tracked it down it is a concurrency issue!

I did a really simple non asyncio send module and a separate receive module and experienced exactly the same issue.

Turns out it was nothing to do with the listener.

Reowald avatar Oct 18 '22 17:10 Reowald