distributed icon indicating copy to clipboard operation
distributed copied to clipboard

`RuntimeError: Not enough arguments provided: missing keys` in `dask.persist` with mix of `Future` and `Delayed`

Open TomAugspurger opened this issue 1 year ago • 5 comments

Describe the issue:

I get a RuntimeError with some dask-ml code that worked with dask / distributed 2024.10.0 and earlier. With 2024.11.0 and newer, it fails:

Minimal Complete Verifiable Example:

import dask
import dask.distributed


@dask.delayed
def f(x):
    return x


def main():
    with dask.distributed.Client() as client:
        print(dask.persist(client.submit(lambda x: x, 1), f(0)))

if __name__ == "__main__":
    main()

On the older versions of dask, that prints out

(<Future: pending, key: lambda-80efb60e0b886931a35d757d5c4b3ac2>, Delayed('f-8a35b8af-26b5-427e-86c8-98b6d8b3f49f'))

Newer versions fail with

Traceback (most recent call last):
  File "/Users/tom/gh/dask/dask-ml/bug.py", line 15, in <module>
    main()
  File "/Users/tom/gh/dask/dask-ml/bug.py", line 12, in main
    print(dask.persist(client.submit(lambda x: x, 1), f(0)))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/gh/dask/dask-ml/.direnv/python-3.11/lib/python3.11/site-packages/dask/base.py", line 988, in persist
    return repack(results)
           ^^^^^^^^^^^^^^^
  File "/Users/tom/gh/dask/dask-ml/.direnv/python-3.11/lib/python3.11/site-packages/dask/base.py", line 535, in repack
    return simple_get(dsk, out)
           ^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/gh/dask/dask-ml/.direnv/python-3.11/lib/python3.11/site-packages/dask/core.py", line 102, in get
    result = execute_graph(dsk2, cache, keys=set(flatten([out])))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/gh/dask/dask-ml/.direnv/python-3.11/lib/python3.11/site-packages/dask/_task_spec.py", line 786, in execute_graph
    cache[key] = node(cache)
                 ^^^^^^^^^^^
  File "/Users/tom/gh/dask/dask-ml/.direnv/python-3.11/lib/python3.11/site-packages/dask/_task_spec.py", line 670, in __call__
    self._verify_values(values)
  File "/Users/tom/gh/dask/dask-ml/.direnv/python-3.11/lib/python3.11/site-packages/dask/_task_spec.py", line 428, in _verify_values
    raise RuntimeError(f"Not enough arguments provided: missing keys {missing}")
RuntimeError: Not enough arguments provided: missing keys {'lambda-80efb60e0b886931a35d757d5c4b3ac2'}

Anything else we need to know?:

It seems to be the combination of Futures and Delayed objects that causes issues. Both of these seem to work fine.

       print(dask.persist(client.submit(lambda x: x, 1), client.submit(lambda x: x, 2))  # OK
       print(dask.persist(f(0), f(1)))  # OK

I'll look for fixes / workarounds once I get up to speed on what's going on.

Environment:

  • Dask version:
  • Python version:
  • Operating System:
  • Install method (conda, pip, source):

TomAugspurger avatar Nov 25 '24 15:11 TomAugspurger

@TomAugspurger: Thanks for reporting this issue. This appears to be related to the latest migration toward the TaskSpec representation. I'll look into what's happening.

hendrikmakait avatar Nov 25 '24 15:11 hendrikmakait

xref https://github.com/dask/dask/issues/9969

fjetter avatar Nov 25 '24 15:11 fjetter

@TomAugspurger: This is indeed related to dask/dask#9969. In https://github.com/dask/distributed/issues/8797, the Future class became a subclass of TaskRef which causes this problem.

Taking a step back, I'm curious, is it intended that you call persist on both persistable Dask collections and ordinary objects?

hendrikmakait avatar Nov 25 '24 17:11 hendrikmakait

The persist happens at https://github.com/dask/dask-ml/blob/ed8a2b7d25d59f1c8f7eeff40a53adc466fa2ce4/dask_ml/model_selection/_incremental.py#L318. I'll spend some time tomorrow getting up to speed with it, but at a glance something strange-ish seems to be going on. The first time through the loop we (successfully) persist a dict[int, Delayed]. The second time through, some of those values have become dict[int, Future]. That seems strange, so maybe this doesn't need to be supported.

TomAugspurger avatar Nov 26 '24 00:11 TomAugspurger

Thanks for the update. I've opened up another issue to discuss whether Dask should transparently handle non-persistable objects or raise instead: https://github.com/dask/dask/issues/11566. Feel free to weigh in on which behavior you would deem more helpful.

hendrikmakait avatar Nov 26 '24 07:11 hendrikmakait