distributed
`RuntimeError: Not enough arguments provided: missing keys` in `dask.persist` with mix of `Future` and `Delayed`
Describe the issue:
I get a RuntimeError with some dask-ml code that worked with dask / distributed 2024.10.0 and earlier. With 2024.11.0 and newer, it fails:
Minimal Complete Verifiable Example:
import dask
import dask.distributed


@dask.delayed
def f(x):
    return x


def main():
    with dask.distributed.Client() as client:
        print(dask.persist(client.submit(lambda x: x, 1), f(0)))


if __name__ == "__main__":
    main()
On older versions of dask, that prints:
(<Future: pending, key: lambda-80efb60e0b886931a35d757d5c4b3ac2>, Delayed('f-8a35b8af-26b5-427e-86c8-98b6d8b3f49f'))
Newer versions fail with
Traceback (most recent call last):
  File "/Users/tom/gh/dask/dask-ml/bug.py", line 15, in <module>
    main()
  File "/Users/tom/gh/dask/dask-ml/bug.py", line 12, in main
    print(dask.persist(client.submit(lambda x: x, 1), f(0)))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/gh/dask/dask-ml/.direnv/python-3.11/lib/python3.11/site-packages/dask/base.py", line 988, in persist
    return repack(results)
           ^^^^^^^^^^^^^^^
  File "/Users/tom/gh/dask/dask-ml/.direnv/python-3.11/lib/python3.11/site-packages/dask/base.py", line 535, in repack
    return simple_get(dsk, out)
           ^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/gh/dask/dask-ml/.direnv/python-3.11/lib/python3.11/site-packages/dask/core.py", line 102, in get
    result = execute_graph(dsk2, cache, keys=set(flatten([out])))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/gh/dask/dask-ml/.direnv/python-3.11/lib/python3.11/site-packages/dask/_task_spec.py", line 786, in execute_graph
    cache[key] = node(cache)
                 ^^^^^^^^^^^
  File "/Users/tom/gh/dask/dask-ml/.direnv/python-3.11/lib/python3.11/site-packages/dask/_task_spec.py", line 670, in __call__
    self._verify_values(values)
  File "/Users/tom/gh/dask/dask-ml/.direnv/python-3.11/lib/python3.11/site-packages/dask/_task_spec.py", line 428, in _verify_values
    raise RuntimeError(f"Not enough arguments provided: missing keys {missing}")
RuntimeError: Not enough arguments provided: missing keys {'lambda-80efb60e0b886931a35d757d5c4b3ac2'}
Anything else we need to know?:
It seems to be the combination of Futures and Delayed objects that causes the issue. Persisting only Futures, or only Delayed objects, works fine:
print(dask.persist(client.submit(lambda x: x, 1), client.submit(lambda x: x, 2)) # OK
print(dask.persist(f(0), f(1))) # OK
I'll look for fixes / workarounds once I get up to speed on what's going on.
Environment:
- Dask version:
- Python version:
- Operating System:
- Install method (conda, pip, source):
@TomAugspurger: Thanks for reporting this issue. This appears to be related to the latest migration toward the TaskSpec representation. I'll look into what's happening.
xref https://github.com/dask/dask/issues/9969
@TomAugspurger: This is indeed related to dask/dask#9969. In https://github.com/dask/distributed/issues/8797, the Future class became a subclass of TaskRef, which causes this problem.
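To make the failure mode concrete, here is a deliberately simplified model in plain Python. It does not use Dask: `TaskRef`, `Future`, `Delayed`, `execute_graph` and `toy_persist` below are hypothetical stand-ins, not the real classes. It assumes, as the `repack` → `simple_get` → `execute_graph` frames in the traceback suggest, that `persist` returns its arguments unchanged when none of them is a Dask collection, and otherwise rebuilds its outputs through a small "repack" graph in which any `TaskRef` argument is read as a reference to another key of that same graph. A `Future` that subclasses `TaskRef` then points at a key that lives on the cluster but not in the repack graph.

```python
class TaskRef:
    """Toy reference to the output of another key in a graph."""

    def __init__(self, key):
        self.key = key


class Future(TaskRef):
    """Toy Future: after the change it *is* a TaskRef to a key held by the cluster."""


class Delayed:
    """Toy Dask collection: a key plus a function that produces its value."""

    def __init__(self, key, func):
        self.key = key
        self.func = func


def execute_graph(graph, keys):
    """Evaluate a topologically ordered graph; TaskRef arguments are looked up by key."""
    cache = {}
    for key, (func, *args) in graph.items():
        missing = {a.key for a in args if isinstance(a, TaskRef)} - set(cache)
        if missing:
            raise RuntimeError(f"Not enough arguments provided: missing keys {missing}")
        values = [cache[a.key] if isinstance(a, TaskRef) else a for a in args]
        cache[key] = func(*values)
    return [cache[k] for k in keys]


def toy_persist(*args):
    collections = [a for a in args if isinstance(a, Delayed)]
    if not collections:
        # No collections at all: the arguments come back untouched,
        # which is why persisting only Futures never reaches the graph.
        return args
    # "Persist" each collection; in this toy we simply call its function.
    results = {c.key: c.func() for c in collections}
    # Repack: one identity node per output position. Persisted collections
    # are embedded as plain values; every other argument is embedded as-is,
    # so a Future (a TaskRef) becomes a dependency on its own key.
    graph = {}
    for i, a in enumerate(args):
        value = results[a.key] if isinstance(a, Delayed) else a
        graph[f"out-{i}"] = (lambda x: x, value)
    return tuple(execute_graph(graph, [f"out-{i}" for i in range(len(args))]))


futs = (Future("lambda-1"), Future("lambda-2"))
assert toy_persist(*futs) == futs  # only Futures: returned unchanged
assert toy_persist(Delayed("f-0", lambda: 0), Delayed("f-1", lambda: 1)) == (0, 1)
try:
    toy_persist(Future("lambda-1"), Delayed("f-0", lambda: 0))
except RuntimeError as err:
    print(err)  # Not enough arguments provided: missing keys {'lambda-1'}
```

The mixed call is the only one that both reaches the repack graph and contains a `TaskRef` whose key that graph does not define, which matches the pattern reported above.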
Taking a step back, I'm curious: is it intentional that you call persist on a mix of persistable Dask collections and ordinary objects?
The persist happens at https://github.com/dask/dask-ml/blob/ed8a2b7d25d59f1c8f7eeff40a53adc466fa2ce4/dask_ml/model_selection/_incremental.py#L318. I'll spend some time tomorrow getting up to speed with it, but at a glance something strange-ish seems to be going on. The first time through the loop we (successfully) persist a dict[int, Delayed]. The second time through, some of those values have become dict[int, Future]. That seems strange, so maybe this doesn't need to be supported.
Thanks for the update. I've opened up another issue to discuss whether Dask should transparently handle non-persistable objects or raise instead: https://github.com/dask/dask/issues/11566. Feel free to weigh in on which behavior you would deem more helpful.
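The two behaviors under discussion can be pictured as a small call-site helper. This is a hypothetical sketch: `persist_mixed` and its `strict` flag are not part of Dask. It persists only the arguments that are Dask collections and, depending on `strict`, either passes everything else (such as Futures) through unchanged or raises. To keep it runnable without a cluster, the persist function and the collection check are injected; with a real installation one would presumably pass `dask.persist` and `dask.base.is_dask_collection`.

```python
from typing import Any, Callable, Sequence


def persist_mixed(
    args: Sequence[Any],
    persist: Callable[..., tuple],
    is_collection: Callable[[Any], bool],
    strict: bool = False,
) -> tuple:
    """Persist only the collections in `args`, keeping output positions stable.

    strict=False: non-collections (e.g. Futures) are returned unchanged.
    strict=True:  any non-collection raises TypeError instead.
    """
    positions = [i for i, a in enumerate(args) if is_collection(a)]
    others = [a for a in args if not is_collection(a)]
    if strict and others:
        raise TypeError(
            f"persist() got {len(others)} non-persistable argument(s): "
            f"{sorted({type(a).__name__ for a in others})}"
        )
    # Only call persist when there is something to persist.
    persisted = persist(*(args[i] for i in positions)) if positions else ()
    out = list(args)
    for i, value in zip(positions, persisted):
        out[i] = value
    return tuple(out)
```

For a dict like the one in the dask-ml loop, one could persist `list(d.values())` this way and rebuild the dict with `dict(zip(d, result))`. Whether the pass-through or the raising behavior is the better default is exactly the open question in the linked issue.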