No longer start stopped loops in LoopRunner to allow the use of asyncio.run
The `LoopRunner` accepts an event loop that has been created externally. This is a pattern that has been deprecated by CPython and our current implementation.
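For orientation, here is a minimal sketch in plain `asyncio`/`tornado` terms (not distributed API) contrasting the externally-created-loop pattern with the `asyncio.run` style this issue aims for:

```python
import asyncio
from tornado.ioloop import IOLoop

# The pattern to move away from: the application creates a loop up front,
# installs it as "the" loop, and hands it (or a tornado wrapper around it) to
# other components, which then all depend on someone else starting, stopping,
# and closing it correctly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
io_loop = IOLoop.current()  # tornado wrapper around the externally created loop
loop.close()

# asyncio.run() instead creates, runs, and closes its own loop:
async def main():
    ...  # anything that needs the loop runs while the loop is actually running

asyncio.run(main())
```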
Requirements
- `LoopRunner` no longer accepts event loops but creates and owns them itself.
- There must still be a way to attach multiple servers (e.g. Scheduler and Client) to the same event loop (see the sketch below).
- The `LoopRunner` guarantees that all existing `tornado.IOLoop` instances are properly closed if the underlying `asyncio` event loop is stopped when the loop is started and stopped without the tornado API (e.g. `asyncio.run`).
- It is not required to implement a deprecation cycle.
More context in https://github.com/dask/distributed/issues/6049
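To make the second requirement concrete, this is roughly what attaching several servers (Scheduler, Worker, Client) to the single event loop owned by `asyncio.run` can look like with the asynchronous API; constructor arguments such as `dashboard_address=":0"` are illustrative only:

```python
import asyncio
from distributed import Client, Scheduler, Worker

async def main():
    # Scheduler, Worker, and Client all attach to the loop created by asyncio.run()
    async with Scheduler(dashboard_address=":0") as scheduler:
        async with Worker(scheduler.address):
            async with Client(scheduler.address, asynchronous=True) as client:
                future = client.submit(sum, [1, 2, 3])
                print(await future)

asyncio.run(main())
```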
Implementation details
```python
# Current interface
class LoopRunner:
    def __init__(self, loop=None, asynchronous=False):
        ...

    def start(self):
        ...

    def stop(self, timeout=10):
        ...

    def run_sync(self, func, *args, **kwargs):
        ...
```
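For reference, a hypothetical usage sketch of the current interface (assuming the `distributed.utils.LoopRunner` import path; the exact semantics of `run_sync`, e.g. whether it expects a coroutine function, may differ):

```python
from distributed.utils import LoopRunner

async def get_answer():
    return 42

runner = LoopRunner(asynchronous=False)  # creates and runs a loop in a background thread
runner.start()
try:
    result = runner.run_sync(get_answer)  # run a coroutine function on the runner's loop
    print(result, runner.loop)            # runner.loop is the tornado IOLoop it manages
finally:
    runner.stop()
```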
- [x] https://github.com/dask/distributed/pull/6443
- [x] https://github.com/dask/distributed/pull/6473
- [ ] https://github.com/dask/distributed/pull/6523
- [ ] Use the `loop_in_thread` fixture for all tests that use a local cluster, client, etc. to make sure they are using the correct event loop (a rough sketch of such a helper follows this list)
- [ ] Deprecate passing a stopped loop to the `LoopRunner`
- [ ] Remove deprecated code after one to two release cycles
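For context on the `loop_in_thread` item above, here is a generic sketch of what such a helper does: run an asyncio loop in a background thread for the duration of a test. The actual fixture lives in `distributed.utils_test` and is not necessarily implemented this way; this is only an illustration.

```python
import asyncio
import threading
from contextlib import contextmanager

@contextmanager
def running_loop_in_thread():
    """Start an asyncio loop in a daemon thread, yield it, then shut it down."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        yield loop  # tests can schedule work onto a loop that is actually running
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
```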
Keeping an eye on this as it will break coiled's implementation of `Cloud`.
> This is a pattern that has been deprecated by CPython and our current implementation.
I could be missing something, but I don't see the `loop=` keyword deprecated in our current `LoopRunner` implementation. If that is the case, I'd like to see us go through a deprecation cycle because, as @shughes-uk pointed out, there are other projects out there that use this keyword when constructing their own `LoopRunner` instances, like `dask-kubernetes`, `coiled`, and maybe others.
We are aware that this may break some users, but we discussed that a deprecation cycle would not be necessary.
cc @mrocklin
@jrbourbeau Whatever version of `coiled-runtime` first includes this change should enforce a pin on the `coiled` package to whatever version includes the changes to accommodate this.
OK, so I should clarify my thinking from before. Certainly we need downstream deployment projects (dask-kubernetes, dask-cloudprovider, etc.) to continue functioning.
What I intended to say before is that I don't care about supporting a user workflow where users say `client = Client(loop=my_loop)`. Instead I'm comfortable saying `client = Client(asynchronous=True)`.
However, if our own downstream code relies on passing `loop=` then I'll have to retract my previous statement, and say that certainly we need to provide time for downstream projects to adapt.
> This is a pattern that has been deprecated by CPython and our current implementation.
>
> I could be missing something, but I don't see the `loop=` keyword deprecated in our current `LoopRunner` implementation. If that is the case, I'd like to see us go through a deprecation cycle because, as @shughes-uk pointed out, there are other projects out there that use this keyword when constructing their own `LoopRunner` instances, like `dask-kubernetes`, `coiled`, and maybe others
The use of `LoopRunner` in `dask-kubernetes` is incorrect; this test fails because `distributed.deploy.Cluster` re-assigns `self.loop` and `self._loop_runner`:

```python
def test_loop(k8s_cluster, release, test_namespace):
    from dask_kubernetes import HelmCluster

    with Client(nthreads=[]) as client, HelmCluster(
        release_name=release, namespace=test_namespace, loop=client.loop
    ) as cluster:
        # this fails because HelmCluster's parent class replaces cluster.loop
        assert cluster.loop is client.loop
```
cc @jacobtomlinson (regarding loop handling in Dask-Kubernetes)
Thanks for the heads up (and thanks for the PR in `dask-kubernetes`, @graingert).
It is likely the asyncio stuff in downstream projects like `dask-kubernetes`, `dask-cloudprovider`, `dask-jobqueue`, `dask-ctl`, etc, etc needs some love. Especially with these upstream changes.
> However, if our own downstream code relies on passing `loop=` then I'll have to retract my previous statement, and say that certainly we need to provide time for downstream projects to adapt.
In many of these projects I've copy/pasted chunks from `distributed` and other `SpecCluster` implementations, so if there are changes necessary here it is likely those will need to be propagated elsewhere. So it's not like there are intentional reasons for things to be the way they are. However, I've already seen PRs like #6205 breaking downstream projects.
So a quick clarifier: without `loop=`, asynchronous instances can only ever be constructed within a coroutine - correct? That doesn't seem to be too burdensome.
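For illustration, constructing asynchronous instances inside a coroutine looks roughly like this; `LocalCluster` and its keyword arguments are just an example to keep the sketch lightweight:

```python
import asyncio
from distributed import Client, LocalCluster

async def main():
    # No loop= anywhere: the instances pick up the already-running loop.
    async with LocalCluster(asynchronous=True, n_workers=1, processes=False) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            print(await client.submit(lambda x: x + 1, 10))

asyncio.run(main())
```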
Do you need more feedback or anything else to move on?
> Do you need more feedback or anything else to move on?
No, I don't think so. I'm mostly fixing places in the codebase and tests where a loop that's about to be run is required.
Rather than deprecating passing loops to `LoopRunner`, `Client`, `LocalCluster`, and `SpecCluster` entirely, I now plan on only deprecating passing loops that are not running, e.g. a loop received from the deprecated `asyncio.get_event_loop()`.
So the plan is:

```python
loop = IOLoop.current()  # deprecated by asyncio and tornado
c = Client(..., loop=loop)  # deprecated by distributed
cluster = SomeCluster(..., loop=loop)  # deprecated by distributed
```
However, if you do need a global loop you would be able to use:

```python
runner = LoopRunner(asynchronous=False, loop=None)
loop = runner.start()
c = Client(..., loop=loop)
cluster = SomeCluster(..., loop=loop)
runner.stop()
```
but it's preferable to use:

```python
with Client(..., loop=None) as c:
    with SomeCluster(..., loop=c.loop) as cluster:
        ...
```
or make your own running loop:

```python
async def call(fn):
    return fn()  # call fn() on the running loop so IOLoop.current() resolves there

with loop_in_thread() as asyncio_loop:
    io_loop = asyncio.run_coroutine_threadsafe(
        coro=call(IOLoop.current), loop=asyncio_loop
    ).result()
    with Client(..., loop=io_loop) as c:
        with SomeCluster(..., loop=io_loop) as cluster:
            ...
```