
Support creating a `ProcessInterface` without a running loop in 3.8 and 3.9

Open graingert opened this issue 3 years ago • 1 comments

What happened:

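dask-jobqueue creates a ProcessInterface in a test, outside of a running event loop. This works on 3.10 thanks to asyncio.mixins._LoopBoundMixin, which defers binding to a loop until the primitive is first used. It does not work on 3.8 and 3.9, because there the constructors of asyncio.Lock and asyncio.Event call the deprecated asyncio.get_event_loop(), which raises once the thread no longer has a current event loop: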

Python 3.9.13 | packaged by conda-forge | (main, May 27 2022, 16:56:21) 
[GCC 10.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from distributed.deploy.spec import ProcessInterface
>>> ProcessInterface()
<distributed.deploy.spec.ProcessInterface: status=created>
>>> async def noop(): pass
... 
>>> import asyncio
>>> asyncio.run(noop())
>>> ProcessInterface()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/graingert/projects/distributed/distributed/deploy/spec.py", line 56, in __init__
    self.lock = asyncio.Lock()
  File "/home/graingert/anaconda3/envs/dask-distributed-39/lib/python3.9/asyncio/locks.py", line 81, in __init__
    self._loop = events.get_event_loop()
  File "/home/graingert/anaconda3/envs/dask-distributed-39/lib/python3.9/asyncio/events.py", line 642, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'MainThread'.
>>> 
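
The first ProcessInterface() call succeeds and the second fails because asyncio.run() clears the thread's current event loop when it exits. The snippet below is a minimal stdlib-only sketch of that effect, without distributed; has_current_loop is a hypothetical helper introduced only for illustration.

```python
import asyncio
import warnings


async def noop():
    pass


def has_current_loop() -> bool:
    """Return True if asyncio.get_event_loop() succeeds in this thread."""
    with warnings.catch_warnings():
        # Newer Pythons may emit a DeprecationWarning here; it does not
        # matter for this demonstration, so it is silenced.
        warnings.simplefilter("ignore", DeprecationWarning)
        try:
            asyncio.get_event_loop()
        except RuntimeError:
            return False
        return True


asyncio.run(noop())
# asyncio.run() resets the thread's current loop to None on exit, so the
# implicit lookup that asyncio.Lock() performs on 3.8/3.9 now fails.
after_run = has_current_loop()
print(after_run)  # False
```

On 3.8 and 3.9, asyncio.Lock() and asyncio.Event() perform this same lookup in __init__, which is where the traceback above originates.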

What you expected to happen:

import asyncio
from distributed.deploy.spec import ProcessInterface
async def noop(): pass
ProcessInterface()  # no deprecation warning
asyncio.run(noop())
ProcessInterface()  # no deprecation warning

Anything else we need to know?:

graingert, Aug 09 '22 11:08

One option is to backport asyncio.Lock and asyncio.Event from 3.10, similar to distributed.actor._LateLoopEvent. The other option is to use cached_property, so that the primitives are only created on first access:

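import asyncio
import functools

from distributed.core import Status


class ProcessInterface:
    def __init__(self, scheduler=None, name=None):
        # No asyncio primitives are created here, so no event loop is needed.
        self.address = getattr(self, "address", None)
        self.external_address = None
        self.status = Status.created

    @functools.cached_property
    def lock(self):
        # Created on first access, then cached on the instance.
        return asyncio.Lock()

    @functools.cached_property
    def _event_finished(self):
        # Created on first access, then cached on the instance.
        return asyncio.Event()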
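
Both options can be tried without distributed installed. The following is a rough sketch under stated assumptions: LateEvent and LazyInterface are hypothetical stand-ins written for this example, not the actual distributed.actor._LateLoopEvent or ProcessInterface code. It shows an object being constructed after asyncio.run() has cleared the current loop, with the primitives only created once a loop is running.

```python
import asyncio
import functools


class LateEvent:
    """Hypothetical late-binding event: builds asyncio.Event on first use."""

    def __init__(self):
        self._event = None  # nothing loop-bound is created here

    def _get(self):
        if self._event is None:
            self._event = asyncio.Event()
        return self._event

    def set(self):
        self._get().set()

    def is_set(self):
        return self._event is not None and self._event.is_set()

    async def wait(self):
        return await self._get().wait()


class LazyInterface:
    """Hypothetical stand-in for ProcessInterface, not the distributed class."""

    def __init__(self):
        self.finished = LateEvent()  # option 1: wrapper object

    @functools.cached_property
    def lock(self):  # option 2: primitive built on first attribute access
        return asyncio.Lock()


async def use(obj):
    first = obj.lock  # created here, while a loop is running
    async with first:
        obj.finished.set()
    await obj.finished.wait()
    return first is obj.lock  # the cached instance is reused


asyncio.run(asyncio.sleep(0))  # leaves the thread without a current loop
obj = LazyInterface()          # safe: no asyncio primitive exists yet
same_lock = asyncio.run(use(obj))
finished = obj.finished.is_set()
print(same_lock, finished)  # True True
```

One caveat with either approach on 3.8 and 3.9: the primitive stays bound to whichever loop was current when it was first created, so an instance should be used within a single event loop.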

graingert, Aug 09 '22 12:08