Errors in callbacks when running async do not call _on_error()
If a Notifier is created with an asyncio loop, any errors that occur in the receive callbacks do not trigger the Listener._on_error() callback. Without asyncio, they do.
Looking at Notifier._rx_thread(): when a loop is present, it calls Notifier._on_message_received() via self._loop.call_soon_threadsafe(). However, the except Exception as exc: on line 127 is never reached in that case, because call_soon_threadsafe() only schedules the callback and returns immediately; an exception raised later inside the callback is reported to the event loop's exception handler and is not propagated back to the caller. As a result, when asyncio is enabled, the Listener._on_error() callback is not called either, which is unexpected.
https://github.com/hardbyte/python-can/blob/654a02ae24bfc50bf1bb1fad7aab4aa88763d302/can/notifier.py#L111-L137
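The behaviour described above can be reproduced without python-can. The following is a minimal sketch using only the standard library; `demo_call_soon_threadsafe`, `failing_callback` and `worker` are made-up names for illustration, with `worker` standing in for Notifier._rx_thread().

```python
import asyncio
import threading


def demo_call_soon_threadsafe() -> dict:
    """Show where an exception raised in a scheduled callback ends up."""
    result = {"caught_by_caller": False, "seen_by_loop_handler": None}
    loop = asyncio.new_event_loop()

    def loop_exception_handler(loop, context):
        # asyncio reports callback errors here, not to the scheduling thread
        result["seen_by_loop_handler"] = context.get("exception")

    loop.set_exception_handler(loop_exception_handler)

    def failing_callback():
        raise ValueError("boom")

    def worker():
        # Hypothetical stand-in for the receive thread
        try:
            loop.call_soon_threadsafe(failing_callback)
        except Exception:
            # Not reached: call_soon_threadsafe() only schedules the call
            result["caught_by_caller"] = True
        loop.call_soon_threadsafe(loop.stop)

    thread = threading.Thread(target=worker)
    thread.start()
    loop.run_forever()
    thread.join()
    loop.close()
    return result


if __name__ == "__main__":
    print(demo_call_soon_threadsafe())
```

The except block in `worker` is never entered; the ValueError only shows up in the loop's exception handler.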
One fix would be to wrap the body of Notifier._on_message_received() in a try/except block and call the _on_error() callback from there. Another is to route messages through an extra handler when running async:
def _rx_thread(self, bus: BusABC) -> None:
    # determine message handling callable early, not inside while loop
    if self._loop:
        def rx_handler(msg: Message) -> None:
            try:
                self._on_message_received(msg)
            except Exception as exc:
                if not self._on_error(exc):
                    raise
                else:
                    # It was handled, so only log it
                    logger.debug("suppressed exception: %s", exc)
        handle_message: Callable[[Message], Any] = functools.partial(
            self._loop.call_soon_threadsafe,
            rx_handler,  # type: ignore[arg-type]
        )
    else:
        handle_message = self._on_message_received
    ...
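To try the wrapper idea in isolation, here is a rough standalone sketch. It does not use python-can: `make_guarded_handler`, `on_message` and `on_error` are hypothetical stand-ins for the rx_handler closure, Notifier._on_message_received() and Notifier._on_error(), and messages are plain strings.

```python
import asyncio
import threading
from typing import Callable, List


def make_guarded_handler(
    on_message: Callable[[str], None],
    on_error: Callable[[Exception], bool],
) -> Callable[[str], None]:
    """Wrap on_message so errors are passed to on_error inside the loop."""

    def guarded(msg: str) -> None:
        try:
            on_message(msg)
        except Exception as exc:
            if not on_error(exc):
                # Unhandled: ends up in the loop's exception handler
                raise

    return guarded


def run_demo() -> List[str]:
    seen: List[str] = []
    loop = asyncio.new_event_loop()

    def on_message(msg: str) -> None:
        raise RuntimeError(f"cannot process {msg}")

    def on_error(exc: Exception) -> bool:
        seen.append(str(exc))
        return True  # report the error as handled

    handler = make_guarded_handler(on_message, on_error)

    def rx_thread() -> None:
        # Schedule the guarded handler from a non-loop thread
        loop.call_soon_threadsafe(handler, "frame-1")
        loop.call_soon_threadsafe(loop.stop)

    thread = threading.Thread(target=rx_thread)
    thread.start()
    loop.run_forever()
    thread.join()
    loop.close()
    return seen


if __name__ == "__main__":
    print(run_demo())
```

Because the try/except lives in the function that the loop actually executes, the error callback runs in the loop thread, where the exception is raised.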
Probably related to #1865
In addition, exceptions that occur in async callbacks are currently not handled, because they are scheduled with loop.create_task(), which does not propagate the task's exception to the caller. If combined with #1938, the process of running an async function in Notifier._on_message_received() could look like this:
def _task_done(task: asyncio.Task) -> None:
    self._tasks.discard(task)
    if task.cancelled():
        # task.exception() would raise CancelledError for a cancelled task
        return
    # exception() is a method and has to be called
    if (exc := task.exception()) is not None:
        self.exception = exc
        if self._loop is not None:
            self._loop.call_soon_threadsafe(self._on_error, exc)

task = self._loop.create_task(res)
self._tasks.add(task)
task.add_done_callback(_task_done)
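The done-callback approach can also be checked on its own. This is only a sketch under assumptions: `failing_listener`, `on_error` and the local `tasks` set are illustrative stand-ins for an async listener, Notifier._on_error() and the task set from #1938, not python-can code.

```python
import asyncio
from typing import List, Set


async def run_demo() -> List[str]:
    errors: List[str] = []
    tasks: Set[asyncio.Task] = set()

    def on_error(exc: BaseException) -> None:
        # Hypothetical stand-in for the error callback
        errors.append(str(exc))

    def task_done(task: asyncio.Task) -> None:
        tasks.discard(task)
        if task.cancelled():
            # exception() raises CancelledError on a cancelled task
            return
        exc = task.exception()
        if exc is not None:
            on_error(exc)

    async def failing_listener(msg: str) -> None:
        raise ValueError(f"bad message: {msg}")

    loop = asyncio.get_running_loop()
    task = loop.create_task(failing_listener("frame-1"))
    tasks.add(task)  # keep a strong reference until the task finishes
    task.add_done_callback(task_done)

    # asyncio.wait() does not re-raise the task's exception here
    await asyncio.wait([task])
    await asyncio.sleep(0)  # give pending callbacks a chance to run
    return errors


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Retrieving the exception in the done callback also keeps asyncio from logging "Task exception was never retrieved" for the failed task.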