dask-ctl
Allow users to use lifecycle methods both synchronously and asynchronously
Dask Control has a selection of lifecycle functions that can be used from Python to manage your Dask clusters. You can list clusters, get an instance of an existing cluster, create new ones, and scale or delete them.
Currently these methods are implemented as async closures within a sync function that starts the loop. This PR breaks things out so that users can either import the async methods directly from the dask_ctl.asyncio submodule or use the sync versions from dask_ctl.
Sync
from dask_ctl import list_clusters
clusters = list_clusters()
Async
from dask_ctl.asyncio import list_clusters
clusters = await list_clusters()
I really appreciate the review here @graingert.
Perhaps it is worth taking a step back and talking about goals. Much of this work is pushing my asyncio knowledge to the limit, and it's a moving target that I struggle to keep up with at the best of times.
The goal is for the Python API in dask-ctl to be implemented as async functions first, most of which can be imported from dask_ctl.asyncio. But I also want sync implementations to exist; these will be the default and will be imported directly from dask_ctl.
To avoid code duplication I'm trying to just wrap the async functions in a sync function that starts a loop and runs them. I'm also conscious that folks may use the sync functions within async environments (interactively in IPython is the simplest example), so the sync methods also need to behave nicely when a loop is already running. To handle this I made a little dask_ctl.utils.run_sync function, but it feels hacky and, as you say, isn't thread-safe.
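To illustrate, the naive version of that wrapping is roughly this (a simplified sketch rather than the actual dask_ctl.utils.run_sync), and asyncio.run is exactly the bit that breaks when a loop is already running:

import asyncio

from dask_ctl.asyncio import list_clusters as _async_list_clusters


def list_clusters():
    # Start a fresh event loop and run the async implementation on it.
    # asyncio.run raises RuntimeError if this thread already has a running
    # loop, e.g. when called interactively in IPython/Jupyter.
    return asyncio.run(_async_list_clusters())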
If you or other folks who know more about asyncio have suggestions on good ways to implement this I'd be really keen to hear and to increase my understanding.
How about something like this?
import asyncio
import concurrent.futures

# Name follows the dask_ctl.utils.run_sync helper discussed above
def run_sync(async_fn):
    async def canary():
        pass

    try:
        # Probe for a running event loop in this thread
        asyncio.run(canary())
    except RuntimeError:
        # A loop is already running (e.g. in Jupyter without nest-asyncio
        # patching asyncio.run), so fall through to the thread fallback
        pass
    else:
        # No loop was running, so run the coroutine right here
        return asyncio.run(async_fn())

    # Run the coroutine on a fresh event loop in a separate thread
    with concurrent.futures.ThreadPoolExecutor(1) as tpe:
        return tpe.submit(asyncio.run, async_fn()).result()
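The sync wrappers could then each just hand their async counterpart to the helper. A rough sketch of what I mean, assuming the helper keeps the run_sync name and accepts a no-argument async callable (not the actual dask-ctl code):

from dask_ctl.utils import run_sync
from dask_ctl.asyncio import list_clusters as _async_list_clusters


def list_clusters():
    # Delegate to the async implementation; run_sync decides whether to
    # start a loop in this thread or hand off to a worker thread
    return run_sync(_async_list_clusters)

That keeps the sync namespace thin and leaves all of the real logic in the async-first implementations.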