debugging adaptive deployment on LSF
(Not sure if this should go here or dask-jobqueue)
I'm trying to debug problems with an adaptive deployment on an LSF cluster.
I have ~0.5 TB of data saved on disk as a .zarr file that I use to generate a dask array with the following properties:
shape=(11087, 2500, 10000)
chunksize=(40, 1024, 1024)
dtype='uint16'
Working with this dataset on a statically-scaled cluster works fine, but the adaptive cluster fails for the same operations. I'm trying to debug these problems using a dummy dataset (i.e., da.zeros() with the same shape, chunks, and datatype as my original data). With this dummy data and an LSFCluster scaled to 10 workers, the following code runs without problems:
client_regular = Client(get_jobqueue_cluster())
client_regular.cluster.scale(10)
result = client_regular.compute([dummy.min(), dummy.max()], sync=True)
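(For reference, dummy is just a stand-in with the properties listed above -- roughly like this, assuming dask.array is imported as da:)
import dask.array as da

# stand-in with the same shape, chunks, and dtype as the real .zarr-backed array
dummy = da.zeros((11087, 2500, 10000), chunks=(40, 1024, 1024), dtype='uint16')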
(get_jobqueue_cluster is a deployment-specific function for creating an LSFCluster; source: https://github.com/janelia-cosem/fst/blob/master/fst/distributed.py#L23), but the same computation on an adaptive cluster fails:
client_adaptive = Client(get_jobqueue_cluster())
client_adaptive.cluster.adapt(minimum_jobs=10)
result = client_adaptive.compute([dummy.min(), dummy.max()], sync=True)
I see a lot of errors, e.g. lots of these:
distributed.core - ERROR - 'tcp://10.36.60.31:46145'
Traceback (most recent call last):
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 473, in handle_stream
handler(**merge(extra, msg))
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2359, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.36.60.31:46145'
distributed.utils - ERROR - 'tcp://10.36.60.31:46145'
Traceback (most recent call last):
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/utils.py", line 663, in log_errors
yield
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 1529, in add_worker
await self.handle_worker(comm=comm, worker=address)
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2446, in handle_worker
await self.handle_stream(comm=comm, extra={"worker": worker})
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 473, in handle_stream
handler(**merge(extra, msg))
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2359, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.36.60.31:46145'
and lots of these AssertionErrors from line 2923 of scheduler.py:
distributed.utils - ERROR -
Traceback (most recent call last):
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/utils.py", line 663, in log_errors
yield
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 3161, in retire_workers
delete=False,
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2923, in replicate
assert count > 0
AssertionError
None of these errors is fatal, so the computation on the adaptive cluster doesn't crash outright; it just blocks indefinitely behind a constant stream of errors. Even if I use a keyboard interrupt, the adaptive cluster keeps running futile tasks in the background, which isn't great. The worker logs haven't revealed anything too illustrative, just lots of errors due to workers not finding data (presumably because some worker upstream has been killed).
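(One workaround I might try -- an untested sketch, not something I've verified on this deployment -- is to hold on to the futures instead of blocking inside compute(..., sync=True), so that pending work can be cancelled explicitly and the cluster shut down:)
futures = client_adaptive.compute([dummy.min(), dummy.max()])  # returns futures instead of blocking
# ...and if the computation wedges, cancel the pending work and tear everything down
client_adaptive.cancel(futures)
client_adaptive.cluster.close()
client_adaptive.close()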
If I reduce the size of the data (e.g., 100 GB), I can use the adaptive cluster without issues. But at some fraction of the full data size I start getting these errors and the whole thing breaks.
So, are there any ideas for debugging / fixing this?
Can I ask you to try this again with distributed master? There has been a slight change recently which may affect this.
Thanks for the suggestion; I tried this, and it worked! There were still some problems -- hundreds of those same AssertionErrors from before, and the adaptive cluster's dashboard is unresponsive for the first 1-2 minutes. Also, the task allocation is super lopsided, and a lot of workers didn't have much to do.
Errors and inefficiency aside, maybe taking the min and max is "challenge mode" for the adaptive scheduler, since the task graph is a giant tree with a lot of convergence. I'll try something a little less tree-like and see how well that works.
What was the enabling change in distributed that you suspect improved things?
Are the assertions still the same as before, coming from replicate, or are they coming from somewhere else now?
You're spending a lot of time reading from and writing to disk, which is in line with your concern that all of the data at first landed in the same place. For adaptive workloads you might want to consider turning off spill-to-disk and instead just pausing workers when they start running out of memory, perhaps with a configuration like this:
distributed:
  worker:
    memory:
      target: false       # don't spill to disk
      spill: false        # don't spill to disk
      pause: 0.70         # pause execution at 70% memory use
      terminate: 0.95     # restart the worker at 95% use
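(A sketch of the in-process equivalent, in case that's easier to experiment with than a YAML file; note that this only affects the process it runs in, and workers launched through the batch system read their own config files, so the YAML route is probably the more reliable one on a cluster:)
import dask

# dask.config.set expands dotted keys into the nested structure shown above
dask.config.set({
    "distributed.worker.memory.target": False,
    "distributed.worker.memory.spill": False,
    "distributed.worker.memory.pause": 0.70,
    "distributed.worker.memory.terminate": 0.95,
})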
For some reason load balancing isn't kicking in. My guess is that this is because dependencies that were already computed are trapped on disk. I'm not sure though. I would be curious if that change helps.
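(If you want to poke at that theory interactively, a couple of things that might be worth trying -- just suggestions, not something I've verified on your workload -- are inspecting where the pieces currently live and asking the scheduler to redistribute them:)
# report which workers hold which keys
client_adaptive.who_has()
# ask the scheduler to spread held data more evenly across workers
client_adaptive.rebalance()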
Hrm, it's odd that there is read-write behavior even when the reported memory of your workers is low. My guess is that some of your objects are mis-reporting their size as too large, resulting in Dask being unwilling to move them around, even in the context of a need to load balance.
Are the assertions still the same as before, coming from replicate, or are they coming from somewhere else now?
Yeah, they are still coming from the replicate call.
Hrm, it's odd that there is read-write behavior even when the reported memory of your workers is low. My guess is that some of your objects are mis-reporting their size as too large, resulting in Dask being unwilling to move them around, even in the context of a need to load balance.
The data is just da.zeros() with a type conversion, so the memory footprint should be pretty predictable...
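(As a sanity check on the size reporting, I could look at what dask's sizeof estimates for a single chunk -- a sketch; a 40 x 1024 x 1024 uint16 block should come out around 80 MiB:)
import numpy as np
from dask.sizeof import sizeof

chunk = np.zeros((40, 1024, 1024), dtype='uint16')
print(chunk.nbytes)   # 83886080 bytes, ~80 MiB per chunk
print(sizeof(chunk))  # dask's estimate for the same chunk; should match closely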
I'll try your suggestion for the worker config tomorrow and update with what I see.
@mrocklin adaptive execution of this specific workload stabilized after I used the config you supplied. However, I can easily get the adaptive cluster to hang again by increasing the number of chunks of my data. (As with the previous workload, the static cluster has no trouble with this.)
The errors are the same as before -- a lot of variations of this:
distributed.core - ERROR - 'tcp://10.36.60.32:43754'
Traceback (most recent call last):
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 473, in handle_stream
handler(**merge(extra, msg))
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2362, in handle_release_data
ws = self.workers[worker]
KeyError: 'tcp://10.36.60.32:43754'
and this:
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x2b5b67db1b38>>, <Task finished coro=<AdaptiveCore.adapt() done, defined at /groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py:170> exception=AssertionError()>)
Traceback (most recent call last):
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 194, in adapt
await self.scale_down(**recommendations)
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 156, in scale_down
names=workers, remove=True, close_workers=True
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 686, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 556, in send_recv
raise exc.with_traceback(tb)
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
result = await result
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 3184, in retire_workers
lock=False,
File "/groups/scicompsoft/home/bennettd/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2938, in replicate
assert count > 0
AssertionError
The dashboard is locked in this state: https://user-images.githubusercontent.com/3805136/70639018-d7420b80-1c07-11ea-9afe-982033979d5c.png
One answer might be "avoid chunking your data that way", but unfortunately this chunking is very natural for certain operations on the data, and it's strange that the static cluster manages just fine here.
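(For completeness, the "chunk differently" workaround would look something like the following; the numbers are purely illustrative, not chunks I've actually tested:)
import dask.array as da

# hypothetical coarser chunking along the first axis to cut the number of tasks
dummy_coarse = da.zeros((11087, 2500, 10000), chunks=(160, 1024, 1024), dtype='uint16')
# for the real zarr-backed array, something like arr.rechunk((160, 1024, 1024)) would be the analogue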
If you're running into exceptions then it's definitely a bug on our side. Thank you for taking the time to run experiments and provide additional information. I may have some time to look at this tomorrow. If you're able to provide a minimal reproducible example that I'm able to run locally that would greatly increase the chances that I'll be able to help (though I understand that this may be a challenge in your case).
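(Something in this direction would be ideal, if it shows the same behaviour with LocalCluster standing in for the LSFCluster -- just a sketch of what I mean by a local reproducer:)
from dask.distributed import Client, LocalCluster
import dask.array as da

# hypothetical local stand-in for the adaptive LSF deployment
cluster = LocalCluster(n_workers=0)
cluster.adapt(minimum=2, maximum=10)
client = Client(cluster)

dummy = da.zeros((11087, 2500, 10000), chunks=(40, 1024, 1024), dtype='uint16')
result = client.compute([dummy.min(), dummy.max()], sync=True)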
Thanks for helping out with this -- I made a repo with a notebook that contains my tests. Let me know if there's anything else you need to know about my deployment. Hopefully you can get some insight into my issues from a local deployment; if not, I'm happy to test more stuff on the Janelia cluster (and maybe @jakirkham could also help, if he still has cluster access).