CAproto asyncio client going into endless loop on teardown when used with pytest
Hello,
here is a bug in the caproto.asyncio infrastructure.
Consider the following pytest unit test, based on caproto:
import asyncio, pytest
from caproto.asyncio.client import Context as ClientContext
from caproto.asyncio.server import Context as ServerContext
from caproto.server import PVGroup, pvproperty
from caproto.sync.client import read as ca_read
import multiprocessing as mp
#mp.set_start_method("spawn")

class Server(PVGroup):
    foo = pvproperty(value=3.14)

async def run_ioc():
    srv = Server(prefix="catest:")
    print(f'PVs: {srv.pvdb}')
    ctx = ServerContext(srv.pvdb)
    await ctx.run()

def run_server():
    asyncio.run(run_ioc())

@pytest.mark.asyncio
async def test_foo():
    p = mp.Process(target=run_server)
    p.start()
    await asyncio.sleep(1.0)
    if False: #True:
        ctx = ClientContext()
        foo, = await ctx.get_pvs('catest:foo')
        d = await foo.read()
    else:
        d = ca_read('catest:foo').data[0]
    print(f'Read: {d}')
    p.kill()
    p.join()
As it is (i.e. spawning an asyncio IOC with the PV "catest:foo" and reading it with the synchronous client), the test behaves as it should:
$ pytest -s ./ca-test.py
============================================================= test session starts =============================================================
platform linux -- Python 3.12.2, pytest-8.1.1, pluggy-1.4.0
rootdir: /var/home/florin/tmp/ca-test
plugins: cov-5.0.0, asyncio-0.23.6
asyncio: mode=Mode.STRICT
collected 1 item
ca-test.py PVs: OrderedDict({'catest:foo': <caproto.server.server.PvpropertyDouble object at 0x7f11995e6690>})
Read: 3.14
.
============================================================== warnings summary ===============================================================
../../projects/udkm/caproto/caproto/_constants.py:11
/var/home/florin/projects/udkm/caproto/caproto/_constants.py:11: DeprecationWarning: datetime.datetime.utcfromtimestamp() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.fromtimestamp(timestamp, datetime.UTC).
EPICS_EPOCH = datetime.datetime.utcfromtimestamp(EPICS2UNIX_EPOCH)
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
======================================================== 1 passed, 1 warning in 1.18s =========================================================
(The run emits a deprecation warning about datetime.datetime.utcfromtimestamp(), which newer Python versions deprecate, but that isn't an issue yet.)
Now change the line that says if False: to if True:, i.e. deactivate the synchronous client and build an asyncio client context instead. On teardown, this eventually happens:
[...]
Circuit command evaluation failed: ReadNotifyResponse(data=array([3.14]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0, metadata=None)
Traceback (most recent call last):
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 1215, in _command_queue_loop
command = await self.command_queue.async_get()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
return await self._queue.get()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/queues.py", line 155, in get
getter = self._get_loop().create_future()
^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/mixins.py", line 20, in _get_loop
raise RuntimeError(f'{self!r} is bound to a different event loop')
RuntimeError: <Queue at 0x7f1ca7beb5f0 maxsize=0 _getters[1] tasks=4> is bound to a different event loop
[...]
This is repeated over and over again, in an endless loop.
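A minimal, self-contained sketch of how such an endless repetition can arise, independent of caproto: an asyncio.Queue that has been bound to one event loop raises RuntimeError on every get() issued from another loop, so a consumer that catches the error and retries never makes progress. The names make_bound_queue and drain are hypothetical, and the max_attempts cap exists only so that the demonstration terminates; this is not caproto's actual loop.

```python
import asyncio


async def make_bound_queue():
    """Create a queue and bind it to the currently running event loop."""
    queue = asyncio.Queue()
    try:
        # Waiting on an empty queue ties it to this loop
        # (on Python 3.10+ the binding happens lazily, on first wait).
        await asyncio.wait_for(queue.get(), timeout=0.01)
    except asyncio.TimeoutError:
        pass
    return queue


async def drain(queue, stop_on_runtime_error, max_attempts=5):
    """Hypothetical consumer loop; returns how many get() calls were attempted."""
    attempts = 0
    while attempts < max_attempts:  # cap only so this demo terminates
        attempts += 1
        try:
            await queue.get()
        except RuntimeError:
            if stop_on_runtime_error:
                break  # retrying from a foreign loop cannot succeed
            continue  # retry-forever behaviour: fails again immediately
    return attempts


# Each asyncio.run() call creates (and afterwards closes) a fresh event loop,
# so the two drain() runs see a queue that belongs to a different loop.
queue = asyncio.run(make_bound_queue())
retrying = asyncio.run(drain(queue, stop_on_runtime_error=False))
stopping = asyncio.run(drain(queue, stop_on_runtime_error=True))
print(retrying, stopping)
```

Without the cap, the retrying variant would spin forever, because nothing in the loop body can ever rebind the queue to the new event loop.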
The retry is easy to trace to caproto/asyncio/client.py:1225. Explicitly catching RuntimeError there and breaking out of the loop more or less fixes the endless loop, but then this happens:
Exception ignored in: <coroutine object Context._process_search_results_loop at 0x7fe6ef3ff8b0>
Traceback (most recent call last):
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 812, in _process_search_results_loop
address, names = await self._search_results_queue.async_get()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
return await self._queue.get()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
getter.cancel() # Just in case getter is not done yet.
^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
self._check_closed()
File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<Context._process_search_results_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:807> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending!
task: <Task pending name='Task-3' coro=<Context._activate_subscriptions_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:789> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<SharedBroadcaster._broadcaster_retry_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:446> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_TaskHandler._remove_completed_task()]>
Exception ignored in: <coroutine object SharedBroadcaster._broadcaster_receive_loop at 0x7fe6ee72dea0>
Traceback (most recent call last):
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 217, in _broadcaster_receive_loop
_, bytes_received, address = await self.receive_queue.async_get()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
return await self._queue.get()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
getter.cancel() # Just in case getter is not done yet.
^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
self._check_closed()
File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<SharedBroadcaster._broadcaster_receive_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:213> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<SharedBroadcaster._check_for_unresponsive_servers_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:363> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_TaskHandler._remove_completed_task()]>
Exception ignored in: <coroutine object _CallbackExecutor._callback_loop at 0x7fe6ee6fb450>
Traceback (most recent call last):
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 222, in _callback_loop
callback, args, kwargs = await self.callbacks.async_get()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
return await self._queue.get()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
getter.cancel() # Just in case getter is not done yet.
^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
self._check_closed()
File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending name='Task-12' coro=<_CallbackExecutor._callback_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py:214> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending!
task: <Task pending name='Task-20' coro=<VirtualCircuitManager._transport_receive_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:977> wait_for=<Future finished result=None> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending!
task: <Task pending name='Task-21' coro=<VirtualCircuitManager._command_queue_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:1211> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
I.e. a bunch of errors come up. As far as I understand, they are caused by asyncio tasks that are still pending when the event loop is closed, because they were never properly shut down.
The unit test does what it should, though (the output is similar to the synchronous case), and it passes. It's just the giant error message that's messy.
Next I added an await ctx.disconnect(), so that the relevant part of the test reads like this:
[...]
    if True:
        ctx = ClientContext()
        foo, = await ctx.get_pvs('catest:foo')
        d = await foo.read()
        await ctx.disconnect()
[...]
Then the output of the pytest run looks saner:
[...]
Task was destroyed but it is pending!
task: <Task pending name='Task-21' coro=<VirtualCircuitManager._command_queue_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:1211> wait_for=<Future finished result=None> cb=[_TaskHandler._remove_completed_task()]>
This happens both with the patched version of CAproto (i.e. the one with my ad-hoc "...except RuntimeError" modification) and with pristine CAproto from today's checkout.
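In the snippet above, disconnect() only runs if the read succeeds. A hedged sketch of one way to guarantee the call even when the test body raises is a small async context manager. The helper name disconnecting and the FakeContext stand-in are hypothetical; the only assumption about the real client context is that it exposes an awaitable disconnect(), as used above.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def disconnecting(ctx):
    """Yield ctx and always await ctx.disconnect() on the way out."""
    try:
        yield ctx
    finally:
        await ctx.disconnect()


class FakeContext:
    """Hypothetical stand-in for a client context, used only for this demo."""

    def __init__(self):
        self.disconnected = False

    async def disconnect(self):
        self.disconnected = True


async def demo():
    ctx = FakeContext()
    try:
        async with disconnecting(ctx):
            raise ValueError("simulated failure inside the test body")
    except ValueError:
        pass
    return ctx.disconnected


print(asyncio.run(demo()))
```

In a test, the same wrapper could sit around the get_pvs() and read() calls, so that teardown does not depend on the assertions passing.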
I'm not sure what the correct behavior (from caproto or from the programmer :-) ) would be. I think I can identify at least the following issues:
- The continue statement in asyncio/client.py, which makes the loop go on after a RuntimeError: it behaves correctly within a regular application (e.g. a caproto client application, or a caproto IOC which also monitors some other PV, thus also serving as an async client); it doesn't crash. But it hangs in a pytest. I think that a program that runs "successfully" should also pass a pytest.
- Is await-ing a ctx.disconnect() on a client an official requirement now? Or not?
- The error message that remains after a ctx.disconnect() was issued (Task was destroyed but it is pending! when used with pytest and a ctx.disconnect()) feels like it shouldn't be there. There's likely a .cancel() to be issued, and an asyncio.CancelledError to be caught in there somewhere.
- What would be the side effects of the ...except RuntimeError in the main asyncio client loop? Obviously it would outright crash more often, but... is there a known case where this kind of crash is not desired? Is there a reason why every other error is accepted and continue'd upon, or is it just a temporary development measure trying to figure out what could go wrong?
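The cancel-and-catch pattern mentioned in the third point can be sketched as follows. This is a generic asyncio illustration with hypothetical names (worker, main), not caproto code: the background task is cancelled explicitly and then awaited, so it is finished before the loop closes and no "Task was destroyed but it is pending!" message can result from it.

```python
import asyncio


async def worker(queue, seen):
    """Hypothetical background loop that consumes a queue forever."""
    while True:
        item = await queue.get()
        seen.append(item)


async def main():
    queue = asyncio.Queue()
    seen = []
    task = asyncio.create_task(worker(queue, seen))
    await queue.put(3.14)
    await asyncio.sleep(0)  # give the worker a chance to run once

    # Orderly shutdown: request cancellation, then wait for it to take effect.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the worker was cancelled on purpose
    return seen, task.cancelled()


print(asyncio.run(main()))
```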
I can invest a bit of time in trying to properly fix this, but I need a bit of insight as to the inventor's intention here :-) Does anyone (@tacaswell?) have any pointers as to how to unwind this (except for "make sure your applications call ctx.disconnect()") on the CAproto side? Is just looking for the rogue task, starting in VirtualCircuitManager._command_queue_loop(), essentially all that should be done?
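One way to look for such leftover tasks while debugging is to list everything that is still pending just before the test returns. The sketch below uses only standard asyncio calls; the helper name pending_tasks and the task name rogue-loop are made up for the illustration.

```python
import asyncio


async def pending_tasks():
    """Return the names of all unfinished tasks except the caller's own."""
    current = asyncio.current_task()
    return sorted(
        task.get_name()
        for task in asyncio.all_tasks()
        if task is not current and not task.done()
    )


async def demo():
    # Hypothetical stand-in for a background loop that nobody cancels.
    rogue = asyncio.create_task(asyncio.sleep(3600), name="rogue-loop")
    await asyncio.sleep(0)  # let the task start
    leftovers = await pending_tasks()

    # Clean up so the demo itself leaves nothing pending.
    rogue.cancel()
    await asyncio.gather(rogue, return_exceptions=True)
    return leftovers


print(asyncio.run(demo()))
```

Calling such a helper at the end of the test body, after the disconnect, would show which of the client's loops are still alive at that point.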
Thanks & Cheers, F.
I've uploaded my patch here (please let me know if you'd like this handled differently).
The full error message when the pytest fails is this (previously impossible to see because of the endless loop):
pytest -s ./ca-test.py
=============================================================== test session starts ================================================================
platform linux -- Python 3.12.2, pytest-8.1.1, pluggy-1.4.0
rootdir: /var/home/florin/tmp/ca-test
plugins: cov-5.0.0, asyncio-0.23.6
asyncio: mode=Mode.STRICT
collected 1 item
ca-test.py PVs: OrderedDict({'catest:foo': <caproto.server.server.PvpropertyDouble object at 0x7f20f32cb6b0>})
Read: ReadNotifyResponse(data=array([3.14]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0, metadata=None)
.
================================================================= warnings summary =================================================================
../../projects/udkm/caproto/caproto/_constants.py:11
/var/home/florin/projects/udkm/caproto/caproto/_constants.py:11: DeprecationWarning: datetime.datetime.utcfromtimestamp() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.fromtimestamp(timestamp, datetime.UTC).
EPICS_EPOCH = datetime.datetime.utcfromtimestamp(EPICS2UNIX_EPOCH)
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
=========================================================== 1 passed, 1 warning in 1.17s ===========================================================
Exception ignored in: <coroutine object Context._process_search_results_loop at 0x7f1fee7ef8b0>
Traceback (most recent call last):
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 813, in _process_search_results_loop
address, names = await self._search_results_queue.async_get()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
return await self._queue.get()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
getter.cancel() # Just in case getter is not done yet.
^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
self._check_closed()
File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<Context._process_search_results_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:808> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending!
task: <Task pending name='Task-3' coro=<Context._activate_subscriptions_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:790> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<SharedBroadcaster._broadcaster_retry_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:447> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_TaskHandler._remove_completed_task()]>
Exception ignored in: <coroutine object SharedBroadcaster._broadcaster_receive_loop at 0x7f1fedd1dea0>
Traceback (most recent call last):
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 218, in _broadcaster_receive_loop
_, bytes_received, address = await self.receive_queue.async_get()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
return await self._queue.get()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
getter.cancel() # Just in case getter is not done yet.
^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
self._check_closed()
File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<SharedBroadcaster._broadcaster_receive_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:214> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<SharedBroadcaster._check_for_unresponsive_servers_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:364> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_TaskHandler._remove_completed_task()]>
Exception ignored in: <coroutine object _CallbackExecutor._callback_loop at 0x7f1fedaeb560>
Traceback (most recent call last):
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 222, in _callback_loop
callback, args, kwargs = await self.callbacks.async_get()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
return await self._queue.get()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
getter.cancel() # Just in case getter is not done yet.
^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
self._check_closed()
File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending name='Task-12' coro=<_CallbackExecutor._callback_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py:214> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
Task was destroyed but it is pending!
task: <Task pending name='Task-20' coro=<VirtualCircuitManager._transport_receive_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:978> wait_for=<Future finished result=None> cb=[_TaskHandler._remove_completed_task()]>
Probably an asyncio error: Event loop is closed. Traceback: Traceback (most recent call last):
File "/usr/lib64/python3.12/asyncio/queues.py", line 158, in get
await getter
GeneratorExit
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 1216, in _command_queue_loop
command = await self.command_queue.async_get()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
return await self._queue.get()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
getter.cancel() # Just in case getter is not done yet.
^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
self._check_closed()
File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Traceback (most recent call last):
File "/usr/lib64/python3.12/asyncio/queues.py", line 158, in get
await getter
GeneratorExit
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py", line 1216, in _command_queue_loop
command = await self.command_queue.async_get()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/var/home/florin/projects/udkm/caproto/caproto/asyncio/utils.py", line 28, in async_get
return await self._queue.get()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/queues.py", line 160, in get
getter.cancel() # Just in case getter is not done yet.
^^^^^^^^^^^^^^^
File "/usr/lib64/python3.12/asyncio/base_events.py", line 793, in call_soon
self._check_closed()
File "/usr/lib64/python3.12/asyncio/base_events.py", line 540, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending name='Task-21' coro=<VirtualCircuitManager._command_queue_loop() done, defined at /var/home/florin/projects/udkm/caproto/caproto/asyncio/client.py:1212> wait_for=<Future cancelled> cb=[_TaskHandler._remove_completed_task()]>
Cheers, F.