distributed
distributed copied to clipboard
async task classes are not fully implemented for Python 3.11
What happened:
It appears that in Python 3.11, more parts of async task-like classes must be implemented, but they are not implemented in `distributed`. This seems to affect at least `Nanny`, `Worker`, `ProcessInterface`, `MultiWorker`, and `Future`.
Nanny failure
_______________ test_client_constructor_with_temporary_security ________________
@gen_test()
async def test_client_constructor_with_temporary_security():
xfail_ssl_issue5601()
pytest.importorskip("cryptography")
> async with Client(
security=True, silence_logs=False, dashboard_address=":0", asynchronous=True
) as c:
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/tests/test_local.py:318:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/client.py:1347: in __aenter__
await self
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/client.py:1164: in _start
self.cluster = await LocalCluster(
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:389: in _
await self._correct_state()
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:355: in _correct_state_internal
await asyncio.wait(workers)
/usr/lib64/python3.11/asyncio/tasks.py:427: in wait
return await _wait(fs, timeout, return_when, loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fs = {<Nanny: None, threads: 1>, <Nanny: None, threads: 1>, <Nanny: None, threads: 1>, <Nanny: None, threads: 1>}
timeout = None, return_when = 'ALL_COMPLETED'
loop = <_UnixSelectorEventLoop running=False closed=True debug=False>
async def _wait(fs, timeout, return_when, loop):
"""Internal helper for wait().
The fs argument must be a collection of Futures.
"""
assert fs, 'Set of Futures is empty.'
waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)
def _on_completion(f):
nonlocal counter
counter -= 1
if (counter <= 0 or
return_when == FIRST_COMPLETED or
return_when == FIRST_EXCEPTION and (not f.cancelled() and
f.exception() is not None)):
if timeout_handle is not None:
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)
for f in fs:
> f.add_done_callback(_on_completion)
E AttributeError: 'Nanny' object has no attribute 'add_done_callback'
/usr/lib64/python3.11/asyncio/tasks.py:531: AttributeError
Worker failure
__________________________________ test_procs __________________________________
self = LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)
workers = {0: {'cls': <class 'distributed.worker.Worker'>, 'options': {'dashboard': False, 'dashboard_address': None, 'host': No...ted.worker.Worker'>, 'options': {'dashboard': False, 'dashboard_address': None, 'host': None, 'interface': None, ...}}}
scheduler = {'cls': <class 'distributed.scheduler.Scheduler'>, 'options': {'blocked_handlers': None, 'dashboard': True, 'dashboard_address': ':0', 'host': None, ...}}
worker = {'cls': <class 'distributed.worker.Worker'>, 'options': {'dashboard': False, 'dashboard_address': None, 'host': None, 'interface': None, ...}}
asynchronous = False
loop = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef5d2350>
security = Security(require_encryption=False, tls_min_version=771)
silence_logs = False, name = None, shutdown_on_close = True
scheduler_sync_interval = 1
def __init__(
self,
workers=None,
scheduler=None,
worker=None,
asynchronous=False,
loop=None,
security=None,
silence_logs=False,
name=None,
shutdown_on_close=True,
scheduler_sync_interval=1,
):
self._created = weakref.WeakSet()
self.scheduler_spec = copy.copy(scheduler)
self.worker_spec = copy.copy(workers) or {}
self.new_spec = copy.copy(worker)
self.scheduler = None
self.workers = {}
self._i = 0
self.security = security or Security()
self._futures = set()
if silence_logs:
self._old_logging_level = silence_logging(level=silence_logs)
self._old_bokeh_logging_level = silence_logging(
level=silence_logs, root="bokeh"
)
self._instances.add(self)
self._correct_state_waiting = None
self._name = name or type(self).__name__
self.shutdown_on_close = shutdown_on_close
super().__init__(
asynchronous=asynchronous,
loop=loop,
name=name,
scheduler_sync_interval=scheduler_sync_interval,
)
if not self.asynchronous:
self._loop_runner.start()
self.sync(self._start)
try:
> self.sync(self._correct_state)
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:266:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)
func = <bound method SpecCluster._correct_state of LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)>
asynchronous = False, callback_timeout = None, args = (), kwargs = {}
def sync(self, func, *args, asynchronous=None, callback_timeout=None, **kwargs):
"""Call `func` with `args` synchronously or asynchronously depending on
the calling context"""
callback_timeout = _parse_timedelta(callback_timeout)
if asynchronous is None:
asynchronous = self.asynchronous
if asynchronous:
future = func(*args, **kwargs)
if callback_timeout is not None:
future = asyncio.wait_for(future, callback_timeout)
return future
else:
> return sync(
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
)
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/utils.py:338:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
loop = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef5d2350>
func = <bound method SpecCluster._correct_state of LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)>
callback_timeout = None, args = (), kwargs = {}
f = <function sync.<locals>.f at 0x7fb8f0953ec0>
wait = <function sync.<locals>.wait at 0x7fb8f09520c0>
typ = <class 'AttributeError'>
exc = AttributeError("'Worker' object has no attribute 'add_done_callback'")
tb = <traceback object at 0x7fb8fa548700>
def sync(loop, func, *args, callback_timeout=None, **kwargs):
"""
Run coroutine in loop running in separate thread.
"""
callback_timeout = _parse_timedelta(callback_timeout, "s")
if loop.asyncio_loop.is_closed():
raise RuntimeError("IOLoop is closed")
e = threading.Event()
main_tid = threading.get_ident()
result = error = future = None # set up non-locals
@gen.coroutine
def f():
nonlocal result, error, future
try:
if main_tid == threading.get_ident():
raise RuntimeError("sync() called from thread of running loop")
yield gen.moment
future = func(*args, **kwargs)
if callback_timeout is not None:
future = asyncio.wait_for(future, callback_timeout)
future = asyncio.ensure_future(future)
result = yield future
except Exception:
error = sys.exc_info()
finally:
e.set()
def cancel():
if future is not None:
future.cancel()
def wait(timeout):
try:
return e.wait(timeout)
except KeyboardInterrupt:
loop.add_callback(cancel)
raise
loop.add_callback(f)
if callback_timeout is not None:
if not wait(callback_timeout):
raise TimeoutError(f"timed out after {callback_timeout} s.")
else:
while not e.is_set():
wait(10)
if error:
typ, exc, tb = error
> raise exc.with_traceback(tb)
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/utils.py:405:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
@gen.coroutine
def f():
nonlocal result, error, future
try:
if main_tid == threading.get_ident():
raise RuntimeError("sync() called from thread of running loop")
yield gen.moment
future = func(*args, **kwargs)
if callback_timeout is not None:
future = asyncio.wait_for(future, callback_timeout)
future = asyncio.ensure_future(future)
> result = yield future
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/utils.py:378:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tornado.gen.Runner object at 0x7fb8fa549b10>
def run(self) -> None:
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
"""
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if future is None:
raise Exception("No pending future")
if not future.done():
return
self.future = None
try:
exc_info = None
try:
> value = future.result()
/usr/lib64/python3.11/site-packages/tornado/gen.py:769:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)
async def _correct_state_internal(self):
async with self._lock:
self._correct_state_waiting = None
to_close = set(self.workers) - set(self.worker_spec)
if to_close:
if self.scheduler.status == Status.running:
await self.scheduler_comm.retire_workers(workers=list(to_close))
tasks = [
asyncio.create_task(self.workers[w].close())
for w in to_close
if w in self.workers
]
await asyncio.gather(*tasks)
for name in to_close:
if name in self.workers:
del self.workers[name]
to_open = set(self.worker_spec) - set(self.workers)
workers = []
for name in to_open:
d = self.worker_spec[name]
cls, opts = d["cls"], d.get("options", {})
if "name" not in opts:
opts = opts.copy()
opts["name"] = name
if isinstance(cls, str):
cls = import_term(cls)
worker = cls(
getattr(self.scheduler, "contact_address", None)
or self.scheduler.address,
**opts,
)
self._created.add(worker)
workers.append(worker)
if workers:
> await asyncio.wait(workers)
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:355:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fs = {<Worker 'not-running', name: 1, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>, <Worker 'not-running', name: 0, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>}
async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the Futures or Tasks given by fs to complete.
The fs iterable must not be empty.
Coroutines will be wrapped in Tasks.
Returns two sets of Future: (done, pending).
Usage:
done, pending = await asyncio.wait(fs)
Note: This does not raise TimeoutError! Futures that aren't done
when the timeout occurs are returned in the second set.
"""
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
if not fs:
raise ValueError('Set of Tasks/Futures is empty.')
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')
fs = set(fs)
if any(coroutines.iscoroutine(f) for f in fs):
raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
loop = events.get_running_loop()
> return await _wait(fs, timeout, return_when, loop)
/usr/lib64/python3.11/asyncio/tasks.py:427:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fs = {<Worker 'not-running', name: 1, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>, <Worker 'not-running', name: 0, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>}
timeout = None, return_when = 'ALL_COMPLETED'
loop = <_UnixSelectorEventLoop running=True closed=False debug=False>
async def _wait(fs, timeout, return_when, loop):
"""Internal helper for wait().
The fs argument must be a collection of Futures.
"""
assert fs, 'Set of Futures is empty.'
waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)
def _on_completion(f):
nonlocal counter
counter -= 1
if (counter <= 0 or
return_when == FIRST_COMPLETED or
return_when == FIRST_EXCEPTION and (not f.cancelled() and
f.exception() is not None)):
if timeout_handle is not None:
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)
for f in fs:
> f.add_done_callback(_on_completion)
E AttributeError: 'Worker' object has no attribute 'add_done_callback'
/usr/lib64/python3.11/asyncio/tasks.py:531: AttributeError
ProcessInterface failure
__________________________ test_ProcessInterfaceValid __________________________
self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8f036b650>
callback = functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0...ibuted/deploy/spec.py:319> exception=AttributeError("'ProcessInterface' object has no attribute 'add_done_callback'")>)
def _run_callback(self, callback: Callable[[], Any]) -> None:
"""Runs a callback with error handling.
.. versionchanged:: 6.0
CancelledErrors are no longer logged.
"""
try:
> ret = callback()
/usr/lib64/python3.11/site-packages/tornado/ioloop.py:740:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8f036b650>
future = <Task finished name='Task-143678' coro=<SpecCluster._correct_state_internal() done, defined at /builddir/build/BUILDRO...ributed/deploy/spec.py:319> exception=AttributeError("'ProcessInterface' object has no attribute 'add_done_callback'")>
def _discard_future_result(self, future: Future) -> None:
"""Avoid unhandled-exception warnings from spawned coroutines."""
> future.result()
E AttributeError: 'ProcessInterface' object has no attribute 'add_done_callback'
/usr/lib64/python3.11/site-packages/tornado/ioloop.py:764: AttributeError
MultiWorker failure
_______________________________ test_MultiWorker _______________________________
self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef7ead50>
callback = functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0...distributed/deploy/spec.py:319> exception=AttributeError("'MultiWorker' object has no attribute 'add_done_callback'")>)
def _run_callback(self, callback: Callable[[], Any]) -> None:
"""Runs a callback with error handling.
.. versionchanged:: 6.0
CancelledErrors are no longer logged.
"""
try:
> ret = callback()
/usr/lib64/python3.11/site-packages/tornado/ioloop.py:740:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef7ead50>
future = <Task finished name='Task-143730' coro=<SpecCluster._correct_state_internal() done, defined at /builddir/build/BUILDRO.../distributed/deploy/spec.py:319> exception=AttributeError("'MultiWorker' object has no attribute 'add_done_callback'")>
def _discard_future_result(self, future: Future) -> None:
"""Avoid unhandled-exception warnings from spawned coroutines."""
> future.result()
E AttributeError: 'MultiWorker' object has no attribute 'add_done_callback'
Future failure
___________________________ test_task_unique_groups ____________________________
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45313', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:34269', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:37035', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_task_unique_groups(c, s, a, b):
"""This test ensure that task groups remain unique when using submit"""
x = c.submit(sum, [1, 2])
y = c.submit(len, [1, 2])
z = c.submit(sum, [3, 4])
> await asyncio.wait([x, y, z])
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/tests/test_scheduler.py:2206:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/lib64/python3.11/asyncio/tasks.py:427: in wait
return await _wait(fs, timeout, return_when, loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fs = {<Future: cancelled, type: int, key: len-a6d0feba889a132085fa70f1b616feeb>, <Future: cancelled, type: int, key: sum-1d10c8fc1de9b59dc44708aaf25351e0>, <Future: cancelled, type: int, key: sum-fcf9017adfd73674bb128a7ddc1ad246>}
timeout = None, return_when = 'ALL_COMPLETED'
loop = <_UnixSelectorEventLoop running=False closed=True debug=False>
async def _wait(fs, timeout, return_when, loop):
"""Internal helper for wait().
The fs argument must be a collection of Futures.
"""
assert fs, 'Set of Futures is empty.'
waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)
def _on_completion(f):
nonlocal counter
counter -= 1
if (counter <= 0 or
return_when == FIRST_COMPLETED or
return_when == FIRST_EXCEPTION and (not f.cancelled() and
f.exception() is not None)):
if timeout_handle is not None:
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)
for f in fs:
f.add_done_callback(_on_completion)
try:
await waiter
finally:
if timeout_handle is not None:
timeout_handle.cancel()
for f in fs:
> f.remove_done_callback(_on_completion)
E AttributeError: 'Future' object has no attribute 'remove_done_callback'
/usr/lib64/python3.11/asyncio/tasks.py:539: AttributeError
What you expected to happen: Tests all pass.
Environment:
- Dask version: 2022.7.1
- Python version: 3.11.0.b4
- Operating System: Fedora Rawhide
- Install method (conda, pip, source): source
https://github.com/python/cpython/commit/903f0a02c16240dc769a08c30e8d072a4fb09154
Looks like passing coroutines to `asyncio.wait` was deprecated, but passing other awaitables was not.
On Python 3.10 with `-W error`, this passes:
class ExampleAwaitable:
def __await__(self):
async def _():
return await asyncio.sleep(0)
return _().__await__()
import asyncio
print(f"{asyncio.iscoroutine(ExampleAwaitable())=}")
async def amain():
await asyncio.wait([ExampleAwaitable()])
print("waited!")
asyncio.run(amain())
and in 3.11 this fails:
asyncio.iscoroutine(ExampleAwaitable())=False
Traceback (most recent call last):
File "/home/graingert/projects/foo.py", line 17, in <module>
asyncio.run(amain())
File "/usr/lib/python3.11/asyncio/runners.py", line 187, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/runners.py", line 120, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/base_events.py", line 650, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/home/graingert/projects/foo.py", line 14, in amain
await asyncio.wait([ExampleAwaitable()])
File "/usr/lib/python3.11/asyncio/tasks.py", line 427, in wait
return await _wait(fs, timeout, return_when, loop)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/tasks.py", line 531, in _wait
f.add_done_callback(_on_completion)
^^^^^^^^^^^^^^^^^^^
AttributeError: 'ExampleAwaitable' object has no attribute 'add_done_callback'
I think we want to make most of these objects not awaitable anyway, e.g. `Nanny`, `Worker`, `ProcessInterface`, and `MultiWorker`, since they can't really work correctly without `async with`.
`Future` is tricky, as we kind of want to support `asyncio.wait([f: Future])`, but we really don't want it to end up in an asyncio `task._fut_waiter` without being wrapped in an `asyncio.Task`.
Maybe we could support only `await f.wait_for(delay)` and `await distributed.wait([f])` instead?
Closing as we've added Python 3.11 support in https://github.com/dask/distributed/pull/7249. @graingert @QuLogic let me know if this is still an issue and we can re-open as needed