No longer start stopped loops in LoopRunner to allow the use of asyncio.run

Open fjetter opened this issue 2 years ago • 11 comments

The LoopRunner accepts an event loop that has been created externally. This is a pattern that has been deprecated by CPython and by our current implementation.

Requirements

  • LoopRunner no longer accepts event loops but creates and owns them itself.
  • There must still be a way to attach multiple servers (e.g. Scheduler and Client) to the same event loop.
  • The LoopRunner guarantees that all existing tornado.IOLoop instances are properly closed when the underlying asyncio event loop is started and stopped without the tornado API (e.g. by asyncio.run).
  • It is not required to implement a deprecation cycle.

More context in https://github.com/dask/distributed/issues/6049
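The first two requirements can be illustrated with the standard library alone. This is a minimal sketch, assuming a runner that creates a private asyncio loop, runs it on a background thread, and lets several components share it by scheduling work through the same runner. OwnedLoopRunner and serve are hypothetical names, not the distributed API, and tornado is left out entirely.

```python
import asyncio
import threading


class OwnedLoopRunner:
    """Hypothetical runner that creates, runs and closes its own event loop."""

    def __init__(self):
        self.loop = None
        self._thread = None

    def start(self):
        # Create a fresh loop here instead of accepting one from the caller.
        self.loop = asyncio.new_event_loop()
        started = threading.Event()

        def run():
            asyncio.set_event_loop(self.loop)
            self.loop.call_soon(started.set)  # signal once the loop is running
            self.loop.run_forever()

        self._thread = threading.Thread(target=run, daemon=True)
        self._thread.start()
        started.wait()
        return self.loop

    def run_sync(self, func, *args, timeout=None):
        # Schedule func(*args) on the owned loop and block until it finishes.
        future = asyncio.run_coroutine_threadsafe(func(*args), self.loop)
        return future.result(timeout)

    def stop(self, timeout=10):
        # Ask the loop to stop from its own thread, wait for it, then close it.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join(timeout)
        self.loop.close()


async def serve(name):
    # Stand-in for starting a server (e.g. a Scheduler or a Client) on the loop.
    return name, asyncio.get_running_loop()


runner = OwnedLoopRunner()
loop = runner.start()
results = [runner.run_sync(serve, name) for name in ("scheduler", "client")]
runner.stop()
```

Because the runner is the only code that creates or closes the loop, callers never hand in a stopped loop, yet both "servers" end up on the same loop.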

Implementation details

# Current interface

class LoopRunner:
    def __init__(self, loop=None, asynchronous=False):
        ...

    def start(self):
        ...

    def stop(self, timeout=10):
        ...

    def run_sync(self, func, *args, **kwargs):
        ...

  • [x] https://github.com/dask/distributed/pull/6443
  • [x] https://github.com/dask/distributed/pull/6473
  • [ ] https://github.com/dask/distributed/pull/6523
  • [ ] Use loop_in_thread fixture for all tests that use local cluster, client, etc. to make sure they are using the correct event loop
  • [ ] Deprecate passing a stopped loop to the LoopRunner
  • [ ] Remove deprecated code after one to two release cycles

fjetter, Apr 20 '22

Keeping an eye on this, as it will break coiled's implementation of Cloud.

shughes-uk, Apr 21 '22

> This is a pattern that has been deprecated by CPython and our current implementation.

I could be missing something, but I don't see the loop= keyword deprecated in our current LoopRunner implementation. If that is the case, I'd like to see us go through a deprecation cycle because, as @shughes-uk mentioned, there are other projects out there that use this keyword when constructing their own LoopRunner instances, like dask-kubernetes, coiled, and maybe others.

jrbourbeau, Apr 21 '22

We are aware that this may break some users, but we discussed it and concluded that a deprecation cycle would not be necessary.

cc @mrocklin

fjetter, Apr 21 '22

@jrbourbeau Whatever version of coiled-runtime first includes this change should pin the coiled package to whatever version includes the changes to accommodate this.

dchudz, Apr 21 '22

OK, so I should clarify my thinking from before. Certainly we need downstream deployment projects (dask-kubernetes, dask-cloudprovider, etc.) to continue functioning.

What I intended to say before is that I don't care about supporting a user-workflow where users say client = Client(loop=my_loop). Instead I'm comfortable saying client = Client(asynchronous=True).

However, if our own downstream code relies on passing loop= then I'll have to retract my previous statement, and say that certainly we need to provide time for downstream projects to adapt.

mrocklin, Apr 22 '22

> This is a pattern that has been deprecated by CPython and our current implementation.
>
> I could be missing something, but I don't see the loop= keyword deprecated in our current LoopRunner implementation. If that is the case, I'd like to see us go through a deprecation cycle because, as @shughes-uk mentioned, there are other projects out there that use this keyword when constructing their own LoopRunner instances, like dask-kubernetes, coiled, and maybe others.

The use of LoopRunner in dask-kubernetes is incorrect: this test fails because distributed.deploy.Cluster reassigns self.loop and self._loop_runner:

from distributed import Client


def test_loop(k8s_cluster, release, test_namespace):
    from dask_kubernetes import HelmCluster

    with Client(nthreads=[]) as client, HelmCluster(
        release_name=release, namespace=test_namespace, loop=client.loop
    ) as cluster:
        # Fails because HelmCluster's parent class replaces cluster.loop
        assert cluster.loop is client.loop

graingert, May 17 '22

cc @jacobtomlinson (regarding loop handling in Dask-Kubernetes)

jakirkham, May 17 '22

Thanks for the heads up (and thanks for the PR in dask-kubernetes @graingert).

It is likely that the asyncio code in downstream projects like dask-kubernetes, dask-cloudprovider, dask-jobqueue, dask-ctl, etc. needs some love, especially with these upstream changes.

> However, if our own downstream code relies on passing loop= then I'll have to retract my previous statement, and say that certainly we need to provide time for downstream projects to adapt.

In many of these projects I've copy/pasted chunks from distributed and other SpecCluster implementations, so if there are changes necessary here it is likely those will need to be propagated elsewhere. So it's not like there are intentional reasons for things to be the way they are. However, I've already seen PRs like #6205 breaking downstream projects.

jacobtomlinson, May 18 '22

So a quick clarifier: without loop=, asynchronous instances can only ever be constructed within a coroutine - correct? That doesn't seem to be too burdensome.
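In plain asyncio terms, this follows from asyncio.get_running_loop(), which only succeeds while a loop is running in the current thread. A minimal sketch, assuming an asynchronous object that binds to the running loop when it is constructed; AsyncResource is a hypothetical name, not a distributed class.

```python
import asyncio


class AsyncResource:
    """Hypothetical asynchronous object that binds to the loop it was created in."""

    def __init__(self):
        # Raises RuntimeError when no event loop is running in this thread,
        # i.e. when the object is constructed outside a coroutine.
        self.loop = asyncio.get_running_loop()


async def main():
    resource = AsyncResource()
    return resource.loop is asyncio.get_running_loop()


inside_ok = asyncio.run(main())

try:
    AsyncResource()
    outside_ok = True
except RuntimeError:
    outside_ok = False
```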

martindurant, May 18 '22

Do you need more feedback or anything else to move on?

martindurant, May 25 '22

> Do you need more feedback or anything else to move on?

No, I don't think so. I'm mostly fixing places in the codebase and tests where a loop that is about to be run is required.

Rather than deprecating passing loops to LoopRunner, Client, LocalCluster and SpecCluster altogether, I now plan to deprecate only passing loops that are not running, e.g. a loop obtained from the deprecated asyncio.get_event_loop().

So the plan is:
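A minimal sketch of that narrower check, assuming the decision is based on asyncio's loop.is_running(). The helper check_loop and its warning text are hypothetical, not what distributed actually emits.

```python
import asyncio
import warnings


def check_loop(loop):
    """Hypothetical helper: warn only when the caller passes a loop that is not running."""
    if loop is not None and not loop.is_running():
        warnings.warn(
            "passing a loop that is not running is deprecated",
            DeprecationWarning,
            stacklevel=2,
        )
    return loop


# A loop that was created but never started triggers the warning.
stopped = asyncio.new_event_loop()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    check_loop(stopped)
stopped.close()


async def main():
    # The loop running this coroutine is, by definition, running.
    with warnings.catch_warnings(record=True) as recorded:
        warnings.simplefilter("always")
        check_loop(asyncio.get_running_loop())
    return recorded


running_caught = asyncio.run(main())
```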

from tornado.ioloop import IOLoop
from distributed import Client

loop = IOLoop.current()  # deprecated by asyncio and tornado
c = Client(..., loop=loop)  # deprecated by distributed
cluster = SomeCluster(..., loop=loop)  # deprecated by distributed

However, if you do need a global loop, you would be able to use:

runner = LoopRunner(asynchronous=False, loop=None)
loop = runner.start()
c = Client(..., loop=loop)
cluster = SomeCluster(..., loop=loop)
runner.stop()

but it's preferable to use:

with Client(..., loop=None) as c:
    with SomeCluster(..., loop=c.loop) as cluster:
        ...

or make your own running loop:

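import asyncio
from tornado.ioloop import IOLoop
from distributed import Client

async def call(fn):
    # Call fn inside the loop thread so IOLoop.current() resolves to that loop
    return fn()

# loop_in_thread() is assumed to yield a running asyncio loop on a background thread
with loop_in_thread() as asyncio_loop:
    io_loop = asyncio.run_coroutine_threadsafe(coro=call(IOLoop.current), loop=asyncio_loop).result()
    with Client(..., loop=io_loop) as c:
        with SomeCluster(..., loop=io_loop) as cluster:
            ...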
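For readers without the distributed test utilities, here is a stdlib-only sketch of a loop_in_thread-style context manager in which asyncio.run both creates and closes the loop, combined with the call(fn) helper pattern from this comment. This loop_in_thread is a hypothetical stand-in, not the distributed fixture, and it uses asyncio.get_running_loop in place of IOLoop.current so it runs without tornado.

```python
import asyncio
import concurrent.futures
import contextlib
import threading


@contextlib.contextmanager
def loop_in_thread():
    """Hypothetical helper: yield a loop that asyncio.run owns in a background thread."""
    started = concurrent.futures.Future()

    async def main():
        stop = asyncio.Event()
        started.set_result((asyncio.get_running_loop(), stop))
        await stop.wait()  # keep the loop running until the caller is done

    thread = threading.Thread(target=asyncio.run, args=(main(),), daemon=True)
    thread.start()
    loop, stop = started.result()
    try:
        yield loop
    finally:
        # Wake main() from the loop's own thread; asyncio.run then closes the loop.
        loop.call_soon_threadsafe(stop.set)
        thread.join()


async def call(fn):
    # Runs fn inside the loop thread, so loop lookups see the thread's running loop.
    return fn()


with loop_in_thread() as asyncio_loop:
    future = asyncio.run_coroutine_threadsafe(call(asyncio.get_running_loop), asyncio_loop)
    same_loop = future.result() is asyncio_loop

closed = asyncio_loop.is_closed()
```

Letting asyncio.run own the loop means nothing outside the thread ever creates or closes it, which is the usage this issue aims to allow.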

graingert, Jun 24 '22