tornado icon indicating copy to clipboard operation
tornado copied to clipboard

support IOLoop.close() when called after asyncio.run

Open graingert opened this issue 2 years ago • 1 comments

I've been using the following function to run the distributed test suite:

def _run_and_close_tornado(async_fn, /, *args, **kwargs):
    tornado_loop = None

    async def inner_fn():
        nonlocal tornado_loop
        tornado_loop = IOLoop.current()
        return await async_fn(*args, **kwargs)

    try:
        return asyncio.run(inner_fn())
    finally:
        tornado_loop.close(all_fds=True)

Which has worked great with non-overlapping calls to IOLoop.current(), however I recently introduced an overlapping call, which results in a KeyError:

____________________ test_inproc_specific_different_threads ____________________

    @gen_test()
    async def test_inproc_specific_different_threads():
>       await check_inproc_specific(run_coro_in_thread)

distributed/comm/tests/test_comms.py:457: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/comm/tests/test_comms.py:431: in check_inproc_specific
    await asyncio.gather(*futures)
distributed/comm/tests/test_comms.py:447: in run_coro_in_thread
    return await asyncio.to_thread(run_and_close_tornado, run_with_timeout)
/usr/share/miniconda3/envs/dask-distributed/lib/python3.10/asyncio/threads.py:25: in to_thread
    return await loop.run_in_executor(None, func_call)
/usr/share/miniconda3/envs/dask-distributed/lib/python3.10/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:506: in run_and_close_tornado
    tornado_loop.close(all_fds=True)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f7a6addb280>
all_fds = True

    def close(self, all_fds: bool = False) -> None:
        self.closing = True
        for fd in list(self.handlers):
            fileobj, handler_func = self.handlers[fd]
            self.remove_handler(fd)
            if all_fds:
                self.close_fd(fileobj)
        # Remove the mapping before closing the asyncio loop. If this
        # happened in the other order, we could race against another
        # initialize() call which would see the closed asyncio loop,
        # assume it was closed from the asyncio side, and do this
        # cleanup for us, leading to a KeyError.
>       del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
E       KeyError: <_UnixSelectorEventLoop running=False closed=True debug=False>

https://github.com/dask/distributed/actions/runs/5475797836/jobs/9972404903#step:19:2420

@bdarnell could you catch the KeyError so that I can run AbstractEventLoop.close() then IOLoop.close()?

graingert avatar Jul 06 '23 14:07 graingert

My assumption has been that you wouldn't mix IOLoop and asyncio event loop control methods in the same app: you could use IOLoop.run_sync instead of asyncio.run, but in that case you'd rely on the asyncio lifecycle instead of Tornado's. Asyncio doesn't have an all_fds=True equivalent, but I'd expect that if you're relying on that you'd use the ioloop methods instead of asyncio.run. Is there some reason you can't either use the IOLoop methods here or avoid relying on all_fds=True? (If you need this functionality we should look into getting it added to asyncio - I want to eventually phase out all usage of IOLoop)

bdarnell avatar Jul 08 '23 01:07 bdarnell