
debugging adaptive deployment on LSF

d-v-b opened this issue 5 years ago · 9 comments

(Not sure if this should go here or dask-jobqueue)

I'm trying to debug problems with an adaptive deployment on an LSF cluster.

I have ~0.5 TB of data saved on disk as a .zarr file that I use to generate a dask array with the following properties:

shape=(11087, 2500, 10000)
chunksize=(40, 1024, 1024)
dtype='uint16'

Operations on this dataset work fine on a statically scaled cluster, but the same operations fail on an adaptive cluster. I'm trying to debug the problem using a dummy dataset (i.e., da.zeros() with the same shape, chunks, and dtype as my original data). With this dummy data and an LSFCluster scaled to 10 workers, the following code runs without problems:

client_regular = Client(get_jobqueue_cluster())
client_regular.cluster.scale(10)
result = client_regular.compute([dummy.min(), dummy.max()], sync=True)

(get_jobqueue_cluster is a deployment-specific function for creating an LSFCluster: https://github.com/janelia-cosem/fst/blob/master/fst/distributed.py#L23), but the same computation on an adaptive cluster fails:

client_adaptive = Client(get_jobqueue_cluster())
client_adaptive.cluster.adapt(minimum_jobs=10)
result = client_adaptive.compute([dummy.min(), dummy.max()], sync=True)
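
(For completeness, dummy in the snippets above is just the da.zeros() stand-in described earlier -- a minimal sketch using the shape, chunks, and dtype from the listing:)

import dask.array as da

# stand-in for the ~0.5 TB zarr-backed array described above
dummy = da.zeros((11087, 2500, 10000), chunks=(40, 1024, 1024), dtype='uint16')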

I see a lot of errors, e.g. lots of these:

distributed.core - ERROR - 'tcp://10.36.60.31:46145'
Traceback (most recent call last):
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 473, in handle_stream
    handler(**merge(extra, msg))
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2359, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.36.60.31:46145'
distributed.utils - ERROR - 'tcp://10.36.60.31:46145'
Traceback (most recent call last):
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/utils.py", line 663, in log_errors
    yield
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 1529, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2446, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 473, in handle_stream
    handler(**merge(extra, msg))
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2359, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.36.60.31:46145'

and lots of these AssertionErrors from L2923 of scheduler.py:

distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/utils.py", line 663, in log_errors
    yield
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 3161, in retire_workers
    delete=False,
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2923, in replicate
    assert count > 0
AssertionError

None of these errors is fatal, so the computation on the adaptive cluster never crashes outright; it just blocks indefinitely amid a constant stream of errors. Even if I use a keyboard interrupt, the adaptive cluster keeps running futile tasks in the background, which isn't great. The worker logs haven't revealed anything too illustrative, just lots of errors due to workers not finding data (presumably because some worker upstream has been killed).

If I reduce the size of the data (e.g., 100 GB), I can use the adaptive cluster without issues. But at some fraction of the full data size I start getting these errors and the whole thing breaks.

So, are there any ideas for debugging / fixing this?

d-v-b avatar Dec 11 '19 00:12 d-v-b

Can I ask you to try this again with distributed master? There has been a slight change recently which may affect this.

mrocklin avatar Dec 11 '19 00:12 mrocklin

Thanks for the suggestion; I tried this, and it worked! There were still some problems -- hundreds of those same AssertionErrors from before; additionally the adaptive cluster dashboard is unresponsive for the first 1-2 minutes. Also, the task allocation is super lopsided, and a lot of workers didn't have a lot to do:

[screenshot: dashboard showing lopsided task allocation, with many workers nearly idle]

Errors and inefficiency aside, maybe taking the min and max is "challenge mode" for the adaptive scheduler, since the task graph is a giant tree with a lot of convergence. I'll try something a little less tree-like and see how well that works.
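
(As an aside, and not something tried in this thread: the fan-in of tree reductions like min/max can be tuned with the split_every argument to dask.array reductions -- a sketch, with split_every=16 as an arbitrary illustrative value:)

import dask.array as da

dummy = da.zeros((11087, 2500, 10000), chunks=(40, 1024, 1024), dtype='uint16')

# larger split_every -> shallower reduction tree, more chunks aggregated per task
lo = dummy.min(split_every=16)
hi = dummy.max(split_every=16)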

What was the enabling change in distributed that you suspect improved things?

d-v-b avatar Dec 11 '19 01:12 d-v-b

Are the assertions still the same as before, coming from replicate, or are they coming from somewhere else now?

mrocklin avatar Dec 11 '19 02:12 mrocklin

You're spending a lot of time reading and writing from disk, which is in line with your concern that all of the data at first landed in the same place. For adaptive workloads you might want to consider turning off spill-to-disk and instead just pausing workers when they start running out of memory. Perhaps with a configuration like this:

distributed:
  worker:
    memory:
      target: false  # don't spill to disk
      spill: false  # don't spill to disk
      pause: 0.70  # pause execution at 70% memory use
      terminate: 0.95  # restart the worker at 95% use
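
If editing a dask YAML config file isn't convenient, roughly the same settings can also be applied programmatically -- a sketch using dask.config.set. Note (an assumption about this deployment) that jobqueue workers read config where they start on the cluster nodes, so a YAML file in ~/.config/dask/ on a shared filesystem is usually the more reliable route:

import dask

dask.config.set({
    "distributed.worker.memory.target": False,    # don't spill to disk
    "distributed.worker.memory.spill": False,     # don't spill to disk
    "distributed.worker.memory.pause": 0.70,      # pause execution at 70% memory use
    "distributed.worker.memory.terminate": 0.95,  # restart the worker at 95% use
})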

For some reason load balancing isn't kicking in. My guess is that this is because dependencies that were already computed are trapped on disk. I'm not sure though. I would be curious if that change helps.

mrocklin avatar Dec 11 '19 02:12 mrocklin

Hrm, it's odd that there is read-write behavior even when the reported memory of your workers is low. My guess is that some of your objects are mis-reporting their size as too large, resulting in Dask being unwilling to move them around, even in the context of a need to load balance.
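
(One quick way to see what size Dask reports for a chunk -- a sketch using the chunk shape from the listing above, not something from the original discussion:)

import numpy as np
from dask.sizeof import sizeof

chunk = np.zeros((40, 1024, 1024), dtype='uint16')
print(sizeof(chunk))   # the size Dask will report for this chunk
print(chunk.nbytes)    # the actual in-memory footprint, ~80 MiB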

mrocklin avatar Dec 11 '19 02:12 mrocklin

Are the assertions still the same as before, coming from replicate, or are they coming from somewhere else now?

Yeah, they are still coming from replicate.

Hrm, it's odd that there is read-write behavior even when the reported memory of your workers is low. My guess is that some of your objects are mis-reporting their size as too large, resulting in Dask being unwilling to move them around, even in the context of a need to load balance.

The data is just da.zeros() with a type conversion, so the memory footprint should be pretty predictable...

I'll try your suggestion for the worker config tomorrow and update with what I see.

d-v-b avatar Dec 11 '19 02:12 d-v-b

@mrocklin adaptive execution of this specific workload stabilized after I used the config you supplied. However, I can easily get the adaptive cluster to hang again by increasing the number of chunks of my data. (Like the previous workload, the static cluster has no trouble with this).

The errors are the same as before -- a lot of variations of this:

distributed.core - ERROR - 'tcp://10.36.60.32:43754'
Traceback (most recent call last):
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 473, in handle_stream
    handler(**merge(extra, msg))
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2362, in handle_release_data
    ws = self.workers[worker]
KeyError: 'tcp://10.36.60.32:43754'

and this:

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x2b5b67db1b38>>, <Task finished coro=<AdaptiveCore.adapt() done, defined at /groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py:170> exception=AssertionError()>)
Traceback (most recent call last):
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 194, in adapt
    await self.scale_down(**recommendations)
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 156, in scale_down
    names=workers, remove=True, close_workers=True
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 686, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 556, in send_recv
    raise exc.with_traceback(tb)
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 3184, in retire_workers
    lock=False,
  File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2938, in replicate
    assert count > 0
AssertionError

The dashboard is locked in this state: [screenshot: https://user-images.githubusercontent.com/3805136/70639018-d7420b80-1c07-11ea-9afe-982033979d5c.png]

One answer might be "avoid chunking your data that way", but unfortunately this chunking is very natural for certain operations on the data, and it's strange that the static cluster manages just fine here.

d-v-b avatar Dec 11 '19 16:12 d-v-b

If you're running into exceptions then it's definitely a bug on our side. Thank you for taking the time to run experiments and provide additional information. I may have some time to look at this tomorrow. If you can provide a minimal reproducible example that I'm able to run locally, that would greatly increase the chances that I'll be able to help (though I understand that this may be a challenge in your case).


mrocklin avatar Dec 11 '19 16:12 mrocklin

Thanks for helping out with this -- I made a repo with a notebook that contains my tests. Let me know if there's anything else you need to know about my deployment. Hopefully you can get some insight into my issues from a local deployment; if not, I'm happy to test more stuff on the Janelia cluster (and maybe @jakirkham could also help, if he still has cluster access)

d-v-b avatar Dec 11 '19 18:12 d-v-b