Support creating a `ProcessInterface` without a running loop in 3.8 and 3.9
What happened:
dask-jobqueue creates a `ProcessInterface` as a test outside of a running event loop. This is supported on 3.10 thanks to `asyncio.mixins._LoopBoundMixin`, but not on 3.8 and 3.9, because there the `asyncio.Lock` and `asyncio.Event` constructors call the deprecated `asyncio.get_event_loop()`. Once `asyncio.run()` has completed, the main thread has no current event loop, so the constructor raises:
Python 3.9.13 | packaged by conda-forge | (main, May 27 2022, 16:56:21)
[GCC 10.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from distributed.deploy.spec import ProcessInterface
>>> ProcessInterface()
<distributed.deploy.spec.ProcessInterface: status=created>
>>> async def noop(): pass
...
>>> import asyncio
>>> asyncio.run(noop())
>>> ProcessInterface()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/graingert/projects/distributed/distributed/deploy/spec.py", line 56, in __init__
    self.lock = asyncio.Lock()
  File "/home/graingert/anaconda3/envs/dask-distributed-39/lib/python3.9/asyncio/locks.py", line 81, in __init__
    self._loop = events.get_event_loop()
  File "/home/graingert/anaconda3/envs/dask-distributed-39/lib/python3.9/asyncio/events.py", line 642, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'MainThread'.
>>>
What you expected to happen:
import asyncio
from distributed.deploy.spec import ProcessInterface
async def noop(): pass
ProcessInterface()  # no deprecation warning
asyncio.run(noop())
ProcessInterface()  # no deprecation warning, no RuntimeError
Anything else we need to know?:
One option is to backport `asyncio.Lock` and `asyncio.Event` from 3.10, similar to `distributed.actor._LateLoopEvent`.
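To illustrate the late-binding idea, here is a minimal sketch of an event that only creates its underlying `asyncio.Event` inside `wait()`, where a loop is guaranteed to be running. `LazyEvent` and `demo_lazy_event` are hypothetical names for this issue; this is not the code of `_LateLoopEvent` nor a full backport of the 3.10 primitives, and it assumes the object is only used from a single loop.

```python
import asyncio


class LazyEvent:
    """Hypothetical event that can be constructed without an event loop."""

    def __init__(self):
        self._event = None  # real asyncio.Event, created lazily in wait()
        self._flag = False

    def is_set(self):
        return self._flag

    def set(self):
        # Record the state even if nobody has waited yet.
        self._flag = True
        if self._event is not None:
            self._event.set()

    async def wait(self):
        if self._flag:
            return True
        if self._event is None:
            # Runs inside a coroutine, so a loop is running here.
            self._event = asyncio.Event()
        return await self._event.wait()


def demo_lazy_event():
    async def noop():
        pass

    asyncio.run(noop())  # afterwards the main thread has no current loop
    event = LazyEvent()  # safe: nothing touches the loop yet

    async def main():
        # Schedule set() to run once wait() has suspended.
        asyncio.get_running_loop().call_soon(event.set)
        return await event.wait()

    return asyncio.run(main()), event.is_set()
```

A lock would need the same treatment, which is why a backport touches more code than the alternative.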
The other option is to create both primitives lazily with `functools.cached_property`:
import asyncio
import functools

from distributed.core import Status


class ProcessInterface:
    def __init__(self, scheduler=None, name=None):
        self.address = getattr(self, "address", None)
        self.external_address = None
        self.status = Status.created

    @functools.cached_property
    def lock(self):
        # Created on first access instead of in __init__
        return asyncio.Lock()

    @functools.cached_property
    def _event_finished(self):
        return asyncio.Event()
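The `cached_property` approach can be checked without `distributed` installed. The sketch below uses a hypothetical stand-in class, `LazyPrimitives`, and a hypothetical helper, `demo_cached_property`; it assumes the primitives are first accessed from inside a coroutine, which is what makes construction safe on 3.8 and 3.9.

```python
import asyncio
import functools


class LazyPrimitives:
    """Hypothetical stand-in for ProcessInterface, not the real class."""

    def __init__(self):
        self.status = "created"  # no asyncio objects are built here

    @functools.cached_property
    def lock(self):
        return asyncio.Lock()

    @functools.cached_property
    def _event_finished(self):
        return asyncio.Event()


async def use(obj):
    # First access happens here, inside a running loop.
    async with obj.lock:
        obj._event_finished.set()
    return obj._event_finished.is_set()


def demo_cached_property():
    async def noop():
        pass

    asyncio.run(noop())  # afterwards the main thread has no current loop
    obj = LazyPrimitives()  # does not raise: nothing is created eagerly
    created_eagerly = "lock" in vars(obj)
    finished = asyncio.run(use(obj))
    return created_eagerly, finished
```

One caveat of this design: on 3.8 and 3.9 the lock and event bind to whichever loop is current at first access, so the instance should only be used from that loop afterwards.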