support IOLoop.close() when called after asyncio.run
I've been using the following function to run the distributed test suite:
def _run_and_close_tornado(async_fn, /, *args, **kwargs):
tornado_loop = None
async def inner_fn():
nonlocal tornado_loop
tornado_loop = IOLoop.current()
return await async_fn(*args, **kwargs)
try:
return asyncio.run(inner_fn())
finally:
tornado_loop.close(all_fds=True)
Which has worked great with non-overlapping calls to IOLoop.current(), however I recently introduced an overlapping call, which results in a KeyError:
____________________ test_inproc_specific_different_threads ____________________
@gen_test()
async def test_inproc_specific_different_threads():
> await check_inproc_specific(run_coro_in_thread)
distributed/comm/tests/test_comms.py:457:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/comm/tests/test_comms.py:431: in check_inproc_specific
await asyncio.gather(*futures)
distributed/comm/tests/test_comms.py:447: in run_coro_in_thread
return await asyncio.to_thread(run_and_close_tornado, run_with_timeout)
/usr/share/miniconda3/envs/dask-distributed/lib/python3.10/asyncio/threads.py:25: in to_thread
return await loop.run_in_executor(None, func_call)
/usr/share/miniconda3/envs/dask-distributed/lib/python3.10/concurrent/futures/thread.py:58: in run
result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:506: in run_and_close_tornado
tornado_loop.close(all_fds=True)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f7a6addb280>
all_fds = True
def close(self, all_fds: bool = False) -> None:
self.closing = True
for fd in list(self.handlers):
fileobj, handler_func = self.handlers[fd]
self.remove_handler(fd)
if all_fds:
self.close_fd(fileobj)
# Remove the mapping before closing the asyncio loop. If this
# happened in the other order, we could race against another
# initialize() call which would see the closed asyncio loop,
# assume it was closed from the asyncio side, and do this
# cleanup for us, leading to a KeyError.
> del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
E KeyError: <_UnixSelectorEventLoop running=False closed=True debug=False>
https://github.com/dask/distributed/actions/runs/5475797836/jobs/9972404903#step:19:2420
@bdarnell could you catch the KeyError so that I can run AbstractEventLoop.close() then IOLoop.close()?
My assumption has been that you wouldn't mix IOLoop and asyncio event loop control methods in the same app: you could use IOLoop.run_sync instead of asyncio.run, but in that case you'd rely on the asyncio lifecycle instead of Tornado's. Asyncio doesn't have an all_fds=True equivalent, but I'd expect that if you're relying on that you'd use the ioloop methods instead of asyncio.run. Is there some reason you can't either use the IOLoop methods here or avoid relying on all_fds=True? (If you need this functionality we should look into getting it added to asyncio - I want to eventually phase out all usage of IOLoop)