
runner does not work with dask adaptive scaling client

Open Kostusas opened this issue 2 years ago • 7 comments

Minimal code to reproduce the error in a local Jupyter notebook:

import distributed
import adaptive
adaptive.notebook_extension()

cluster = distributed.LocalCluster()
cluster.adapt(minimum=0, maximum=5)  # fails; works when using manual scaling instead, e.g. cluster.scale(5)

client = distributed.Client(cluster)

learner = adaptive.Learner1D(lambda x: x, bounds=(-1, 1))
runner = adaptive.Runner(learner, executor=client, goal=lambda l: l.loss() < 0.01)
runner.live_info()

cluster.close()

returns the following error:

Task exception was never retrieved
future: <Task finished name='Task-327' coro=<live_info.<locals>.update() done, defined at /opt/conda/lib/python3.9/site-packages/adaptive/notebook_integration.py:217> exception=AssertionError()>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.9/site-packages/adaptive/notebook_integration.py", line 226, in update
    status.value = _info_html(runner)
  File "/opt/conda/lib/python3.9/site-packages/adaptive/notebook_integration.py", line 258, in _info_html
    ("elapsed time", datetime.timedelta(seconds=runner.elapsed_time())),
  File "/opt/conda/lib/python3.9/site-packages/adaptive/runner.py", line 658, in elapsed_time
    assert self.task.cancelled()
AssertionError

The same thing happens on a cluster with manual scaling when the Runner is started before the workers have had time to connect. It seems adaptive does not see any workers and terminates the process.
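
A possible workaround for the manual-scaling case (just an idea; wait_for_workers is part of distributed's Client API, not something adaptive provides) is to block until at least one worker has connected before starting the Runner:

import distributed
import adaptive
adaptive.notebook_extension()

cluster = distributed.LocalCluster(n_workers=0)
cluster.scale(5)  # manual scaling
client = distributed.Client(cluster)
client.wait_for_workers(n_workers=1)  # block until at least one worker has registered

learner = adaptive.Learner1D(lambda x: x, bounds=(-1, 1))
runner = adaptive.Runner(learner, executor=client, goal=lambda l: l.loss() < 0.01)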

Kostusas avatar Sep 21 '21 13:09 Kostusas

I think we should change the heuristic for determining how many workers are available, e.g. by checking the client configuration and scaling strategy.

akhmerov avatar Sep 21 '21 13:09 akhmerov

You are running into this error: https://github.com/python-adaptive/adaptive/blob/f28bab073fed8723b0569fcfb6886fccc2133ecd/adaptive/runner.py#L403-L404

because you start with 0 cores.

If you change your argument from minimum=0 to minimum=1, Adaptive does detect the scaling correctly.
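
That is, in the snippet above:

cluster.adapt(minimum=1, maximum=5)  # keep at least one worker alive, so the detected core count is never 0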

Would this be good enough for you?

basnijholt avatar Sep 21 '21 14:09 basnijholt

This seems to be a workaround, but I think actually detecting the configuration would be more reliable. Unfortunately I can't quite find the correct API in distributed.

akhmerov avatar Sep 25 '21 12:09 akhmerov

I've asked whether there's a better way on stack overflow (AFAIR that's the preferred channel for dask): https://stackoverflow.com/q/69326568/2217463

akhmerov avatar Sep 25 '21 13:09 akhmerov

Why would the maximum number of cores matter instead of the currently available cores?

basnijholt avatar Sep 27 '21 07:09 basnijholt

It's a chicken and egg problem otherwise: the adaptive scaling of dask won't request new workers if there are no tasks in the queue.

akhmerov avatar Sep 27 '21 07:09 akhmerov

Hmm, then we would already request points that cannot be calculated yet.

Why not change the following

https://github.com/python-adaptive/adaptive/blob/a81be7aa8a703c44ab348710f339686f4eb57641/adaptive/runner.py#L832-L833

to

elif with_distributed and isinstance(ex, distributed.cfexecutor.ClientExecutor):
    ncores = sum(ex._client.ncores().values())
    return max(1, ncores)
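
For context on the max(1, ncores) part (a quick check of the behaviour as I understand it, not part of the patch itself): with no workers connected, Client.ncores() returns an empty dict, so the sum would be 0:

import distributed

cluster = distributed.LocalCluster(n_workers=0)
client = distributed.Client(cluster)
print(client.ncores())  # {} -> sum is 0, hence the fallback to 1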

basnijholt avatar Sep 27 '21 12:09 basnijholt