handle `distributed.core.Server` startup and shutdown excellently
Server startup and shutdown are currently confusing and error-prone; see https://github.com/dask/distributed/pull/6615 and a8244bd (#6603).
Patterns involving concurrent and re-entrant close and cancellation are prone to deadlocks:

    try:
        await self.comm.read()  # close() cancels this task and waits for this task to finish
    finally:
        await self.close()  # this waits for close() to finish

e.g. https://github.com/dask/distributed/blob/bc04d0e29077c675f24b867e435a6aef3f0652cd/distributed/worker.py#L1201-L1210
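To make the failure mode concrete, here is a minimal, self-contained reproduction. It is only a sketch: `ReentrantCloseServer`, `handle_comm` and `demo` are hypothetical names, not distributed's real classes, and `asyncio.sleep` stands in for the comm read. It assumes `close()` is idempotent in the sense that a second caller waits for the first close to finish.

```python
import asyncio


class ReentrantCloseServer:
    """Hypothetical stand-in, not distributed's real Server."""

    def __init__(self):
        self.closing = False
        self.closed = asyncio.Event()
        self.handler_task = None

    async def handle_comm(self):
        try:
            await asyncio.sleep(3600)  # stands in for `await self.comm.read()`
        finally:
            # Re-entrant call: the first close() is already waiting for
            # this task, and this call waits for the first close().
            await self.close()

    async def close(self):
        if self.closing:
            # Later callers wait for the first close() to finish.
            await self.closed.wait()
            return
        self.closing = True
        self.handler_task.cancel()
        # Wait for the handler to finish; it never does, because its
        # finally block is stuck in the branch above.
        await asyncio.wait([self.handler_task])
        self.closed.set()


async def demo(timeout=0.2):
    server = ReentrantCloseServer()
    server.handler_task = asyncio.create_task(server.handle_comm())
    await asyncio.sleep(0)  # let the handler reach its first await
    try:
        await asyncio.wait_for(server.close(), timeout)
        deadlocked = False
    except asyncio.TimeoutError:
        deadlocked = True
    # Clean up: a second cancel breaks the handler out of closed.wait().
    server.handler_task.cancel()
    await asyncio.wait([server.handler_task])
    return deadlocked
```

Running `asyncio.run(demo())` reports the deadlock via the timeout instead of hanging forever.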
I think we need a pattern where only the task that calls `async with Server(...): ...` is allowed to call `await self.finished()` or `await self.close()`.
A sketch:

    import asyncio
    import warnings


    class Server:
        def __init__(self):
            self.__close_done = asyncio.Event()
            self.__start_event = asyncio.Event()
            self.__close_event = asyncio.Event()

        def request_close(self):
            self.__start_event.set()
            self.__close_event.set()

        async def __lifecycle(self):
            try:
                async with self.listen(), self.open_rpc_pool():
                    try:
                        await self.start()
                        self.__start_event.set()
                        await self.__close_event.wait()
                        await self.close()
                    finally:
                        v = self.abort()  # abort comms by calling socket.close()
                        assert v is None  # abort must not be an async def
            finally:
                self.__close_done.set()

        async def __aenter__(self):
            self.__parent_task = asyncio.current_task()
            self.__lifecycle_task = asyncio.create_task(self.__lifecycle())
            await self.__start_event.wait()
            return self

        def __await__(self):
            warnings.warn("await Server() is deprecated, use async with Server()")
            # ??? some background task magic here

        async def __aexit__(self, exc_type, exc_value, traceback):
            self.request_close()
            await self.finished()

        async def close(self):
            try:
                assert asyncio.current_task() is self.__lifecycle_task
                # close comms, wait for tasks to cancel
            finally:
                v = self.abort()  # abort comms by calling socket.close()
                assert v is None

        async def finished(self):
            assert asyncio.current_task() is self.__parent_task
            await self.__close_done.wait()

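A stripped-down, runnable variant of the same idea, to show that handlers can ask for shutdown without ever awaiting it. This is a toy under stated assumptions: `MiniServer`, `spawn`, `handle_comm` and the `events` list are hypothetical names introduced for illustration, the listener and RPC pool are omitted, and `asyncio.sleep` stands in for the comm read.

```python
import asyncio


class MiniServer:
    """Hypothetical toy: only the lifecycle task performs start and close."""

    def __init__(self):
        self.events = []
        self._tasks = []
        self._start_event = asyncio.Event()
        self._close_event = asyncio.Event()
        self._close_done = asyncio.Event()

    def request_close(self):
        # Safe to call from any task: it only sets flags and never awaits.
        self._start_event.set()
        self._close_event.set()

    def spawn(self, coro):
        task = asyncio.create_task(coro)
        self._tasks.append(task)
        return task

    async def handle_comm(self):
        try:
            await asyncio.sleep(3600)  # stands in for a comm read
        finally:
            self.events.append("handler cancelled")
            self.request_close()  # request, do not await, the shutdown

    async def _lifecycle(self):
        try:
            self.events.append("start")
            self._start_event.set()
            await self._close_event.wait()
            # Only this task cancels handlers and waits for them.
            for task in self._tasks:
                task.cancel()
            await asyncio.gather(*self._tasks, return_exceptions=True)
            self.events.append("close")
        finally:
            self._close_done.set()

    async def __aenter__(self):
        self._lifecycle_task = asyncio.create_task(self._lifecycle())
        await self._start_event.wait()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        self.request_close()
        await self._close_done.wait()


async def demo():
    async with MiniServer() as server:
        server.spawn(server.handle_comm())
        await asyncio.sleep(0)  # let the handler reach its first await
    return server.events
```

Because the handler's `finally` block only sets events, the lifecycle task can cancel it and wait for it without the two waiting on each other.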
I'm breaking ground on enabling this refactor in https://github.com/dask/distributed/pull/6836.
I think we can both require `async with Client(asynchronous=True):` and still keep `client = Client(asynchronous=False)` working, because the async context manager version needs to interact with all the `await` statements between open and close, but there's no such concern for the synchronous "run in thread" version of `Client`.
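One way this could look, as a rough sketch rather than a proposal for the actual `Client` implementation: a synchronous facade starts a background thread whose single task owns the whole `async with` block, so the blocking API never needs an `await` of its own. `AsyncResource`, `SyncFacade` and `ping` are hypothetical names made up for this example.

```python
import asyncio
import threading


class AsyncResource:
    """Hypothetical async-only resource that must be used via `async with`."""

    def __init__(self):
        self.state = "new"

    async def __aenter__(self):
        self.state = "open"
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        self.state = "closed"

    async def ping(self):
        return "pong"


class SyncFacade:
    """Hypothetical blocking wrapper: one background task owns the async with."""

    def __init__(self):
        self._ready = threading.Event()
        self._thread = threading.Thread(
            target=asyncio.run, args=(self._main(),), daemon=True
        )
        self._thread.start()
        # Blocks until the resource is open (a sketch: no error handling
        # for a failing __aenter__).
        self._ready.wait()

    async def _main(self):
        self._loop = asyncio.get_running_loop()
        self._stop = asyncio.Event()
        # Open and close happen in this one task on the background loop.
        async with AsyncResource() as resource:
            self.resource = resource
            self._ready.set()
            await self._stop.wait()

    def ping(self):
        # Submit a coroutine to the background loop and block for the result.
        future = asyncio.run_coroutine_threadsafe(self.resource.ping(), self._loop)
        return future.result()

    def close(self):
        self._loop.call_soon_threadsafe(self._stop.set)
        self._thread.join()
```

Usage is plain blocking code: `facade = SyncFacade()`, then `facade.ping()`, then `facade.close()`.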