
handle `distributed.core.Server` startup and shutdown excellently

Open graingert opened this issue 3 years ago • 1 comment

Server startup and shutdown are currently confusing and error-prone; see https://github.com/dask/distributed/pull/6615 and a8244bd (#6603).

Patterns involving concurrent and re-entrant close and cancellation are prone to deadlocks:

try:
    await self.comm.read()  # close() cancels this task and waits for this task to finish
finally:
    await self.close()  # this waits for close to finish

e.g. https://github.com/dask/distributed/blob/bc04d0e29077c675f24b867e435a6aef3f0652cd/distributed/worker.py#L1201-L1210
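To make the failure mode concrete, here is a minimal, self-contained reproduction. It is only a sketch: `Handler`, `handle`, and the two events are hypothetical names, `asyncio.sleep` stands in for `self.comm.read()`, and I'm assuming `close()` is idempotent in the sense that a second caller waits for the first call to finish.

```python
import asyncio


class Handler:
    """Hypothetical stand-in for a server-side comm handler."""

    def __init__(self):
        self.task = None
        self._close_started = False
        self._close_done = asyncio.Event()

    async def handle(self):
        try:
            await asyncio.sleep(3600)  # stands in for `await self.comm.read()`
        finally:
            # Re-entrant call: runs inside the very task that close() waits on.
            await self.close()

    async def close(self):
        if self._close_started:
            # A close is already in flight, so wait for it to finish.
            await self._close_done.wait()
            return
        self._close_started = True
        self.task.cancel()
        # Wait for the handler task, which is itself waiting on _close_done.
        await asyncio.wait([self.task])
        self._close_done.set()


async def main():
    handler = Handler()
    handler.task = asyncio.create_task(handler.handle())
    await asyncio.sleep(0)  # let the handler reach its "read"
    try:
        await asyncio.wait_for(handler.close(), timeout=0.2)
    except asyncio.TimeoutError:
        return "deadlock"
    return "closed"


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The outer `close()` waits for the handler task, and the handler's `finally` waits for the outer `close()`, so neither can make progress; only the timeout gets us out.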

I think we need a pattern where only the task that calls `async with Server(...): ...` is allowed to call `await self.finished()` or `await self.close()`.

A sketch:

import asyncio
import warnings


class Server:
    def __init__(self):
        self.__close_done = asyncio.Event()
        self.__start_event = asyncio.Event()
        self.__close_event = asyncio.Event()
        
    def request_close(self):
        self.__start_event.set()
        self.__close_event.set()

    async def __lifecycle(self):
        try:
            async with self.listen(), self.open_rpc_pool():
                try:
                    await self.start()
                    self.__start_event.set()
                    await self.__close_event.wait()
                    await self.close()
                finally:
                    v = self.abort()  # abort comms by calling socket.close()
                    assert v is None  # abort must not be an async def
        finally:
            self.__close_done.set()
            
    async def __aenter__(self):
        self.__parent_task = asyncio.current_task()
        self.__lifecycle_task = asyncio.create_task(self.__lifecycle())
        await self.__start_event.wait()
        return self
        
    def __await__(self):
        warnings.warn(
            "await Server() is deprecated, use async with Server()",
            DeprecationWarning,
        )
        # ??? some background task magic here
        
    async def __aexit__(self, exc_type, exc, tb):
        self.request_close()
        await self.finished()
        
    async def close(self):
        try:
            assert asyncio.current_task() is self.__lifecycle_task
            # close comms, wait for tasks to cancel
        finally:
            v = self.abort()  # abort comms by calling socket.close()
            assert v is None
        
    async def finished(self):
        assert asyncio.current_task() is self.__parent_task
        await self.__close_done.wait()
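
For anyone who wants to poke at the idea, here is a stripped-down, runnable variant of the sketch above. Everything in it is an assumption made for illustration: `MiniServer` and its `events` list are hypothetical, `listen()`, `open_rpc_pool()`, and `abort()` are left out, and the real start and close work is replaced by appending to a list so the ordering is visible.

```python
import asyncio


class MiniServer:
    """Hypothetical, stripped-down version of the lifecycle sketch."""

    def __init__(self):
        self.events = []  # records the order of lifecycle steps
        self._started = asyncio.Event()
        self._close_requested = asyncio.Event()
        self._close_done = asyncio.Event()

    def request_close(self):
        # Synchronous and safe to call from any task: it only sets a flag.
        self._close_requested.set()

    async def _lifecycle(self):
        try:
            self.events.append("start")  # stands in for `await self.start()`
            self._started.set()
            await self._close_requested.wait()
            self.events.append("close")  # stands in for `await self.close()`
        finally:
            self._started.set()  # unblock __aenter__ even if startup failed
            self._close_done.set()

    async def __aenter__(self):
        self._parent_task = asyncio.current_task()
        self._lifecycle_task = asyncio.create_task(self._lifecycle())
        await self._started.wait()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.request_close()
        await self.finished()
        await self._lifecycle_task  # re-raise any error from the lifecycle

    async def finished(self):
        # Only the task that entered the context may wait for shutdown.
        assert asyncio.current_task() is self._parent_task
        await self._close_done.wait()


async def main():
    async with MiniServer() as server:
        server.events.append("body")
    return server.events


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here `asyncio.run(main())` returns `["start", "body", "close"]`. The point of the design is that nothing ever awaits `close()` from outside the lifecycle task: other tasks can only call the synchronous `request_close()`, so the re-entrant wait that causes the deadlock cannot be expressed.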

graingert, Jun 23 '22 11:06

Breaking ground on enabling this refactor here: https://github.com/dask/distributed/pull/6836

I think we can require `async with Client(asynchronous=True):` and still keep `client = Client(asynchronous=False)` working. The async context manager version needs to interact with all the `await` statements between open and close, but there is no such concern for the synchronous "run in thread" version of `Client`.
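
A rough illustration of why the synchronous version is not affected. This is not how `Client` is implemented; `AsyncResource` and `SyncResource` are hypothetical names, and the sketch assumes the synchronous facade runs its own event loop in a background thread. That thread owns the entire `async with` block, so the caller's code never sits between open and close on the event loop.

```python
import asyncio
import threading


class AsyncResource:
    """Hypothetical async-only resource that must be used via `async with`."""

    def __init__(self):
        self.state = "new"

    async def __aenter__(self):
        self.state = "open"
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.state = "closed"


class SyncResource:
    """Hypothetical synchronous facade over AsyncResource."""

    def __init__(self):
        self._ready = threading.Event()
        self._thread = threading.Thread(target=lambda: asyncio.run(self._run()))
        self._thread.start()
        self._ready.wait()  # block until the resource is open

    async def _run(self):
        self._loop = asyncio.get_running_loop()
        self._stop = asyncio.Event()
        # The background thread holds the context open until close() is called.
        async with AsyncResource() as resource:
            self.resource = resource
            self._ready.set()
            await self._stop.wait()

    def close(self):
        # Signal the loop thread from the caller's thread, then wait for exit.
        self._loop.call_soon_threadsafe(self._stop.set)
        self._thread.join()


if __name__ == "__main__":
    r = SyncResource()
    print(r.resource.state)
    r.close()
    print(r.resource.state)
```

The synchronous caller only ever signals the loop thread and joins it, so requiring `async with` on the async side does not change the synchronous API.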

graingert, Aug 08 '22 09:08