Client.rebalance fails on dgx-2 with LocalCUDACluster
We currently hit an error when we try to rebalance a dataframe (both dask-cudf and pure Dask) on a LocalCUDACluster on dgx-02.
Context:
- This works fine on dgx-1
- This works fine with LocalCluster

I tested it on both exp-01 and dgx-202.
Minimal Gist: https://gist.github.com/VibhuJawa/19c2cc886f6310d2b468671079014fe6
Imports
import cudf
import dask_cudf
from dask import delayed
from dask_cuda import LocalCUDACluster
import dask.dataframe as dd
from dask.distributed import Client, wait, LocalCluster
# below works
# cluster = LocalCluster()
# below fails
cluster = LocalCUDACluster()
client = Client(cluster)
Write test data to disk
cdf = cudf.datasets.timeseries()
dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')
Rebalancing Fails on dgx-2
### read written dataset
df = dd.read_parquet('test_data/*.parquet')
# df = dask_cudf.read_parquet('test_data/*.parquet')
n_workers = len(client.scheduler_info()["workers"])
df = df.repartition(npartitions=n_workers).persist()
_ = wait(df)
### below fails for us
client.rebalance(df)
Error Trace
distributed.utils - ERROR - can only concatenate list (not "dict") to list
Traceback (most recent call last):
File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
yield
File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/scheduler.py", line 3058, in rebalance
"keys": sum([r["keys"] for r in result if "keys" in r], []),
TypeError: can only concatenate list (not "dict") to list
distributed.core - ERROR - can only concatenate list (not "dict") to list
Traceback (most recent call last):
File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
result = await result
File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/scheduler.py", line 3058, in rebalance
"keys": sum([r["keys"] for r in result if "keys" in r], []),
TypeError: can only concatenate list (not "dict") to list
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-4-2b5d4120310b> in <module>
7
8 ### below fails for us
----> 9 client.rebalance(df)
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/client.py in rebalance(self, futures, workers, **kwargs)
3024 A list of workers on which to balance, defaults to all workers
3025 """
-> 3026 return self.sync(self._rebalance, futures, workers, **kwargs)
3027
3028 async def _replicate(self, futures, n=None, workers=None, branching_factor=2):
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
765 else:
766 return sync(
--> 767 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
768 )
769
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
343 if error[0]:
344 typ, exc, tb = error[0]
--> 345 raise exc.with_traceback(tb)
346 else:
347 return result[0]
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/utils.py in f()
327 if callback_timeout is not None:
328 future = asyncio.wait_for(future, callback_timeout)
--> 329 result[0] = yield future
330 except Exception as exc:
331 error[0] = sys.exc_info()
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/client.py in _rebalance(self, futures, workers)
3003 await _wait(futures)
3004 keys = list({tokey(f.key) for f in self.futures_of(futures)})
-> 3005 result = await self.scheduler.rebalance(keys=keys, workers=workers)
3006 assert result["status"] == "OK"
3007
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
755 name, comm.name = comm.name, "ConnectionPool." + key
756 try:
--> 757 result = await send_recv(comm=comm, op=key, **kwargs)
758 finally:
759 self.pool.reuse(self.addr, comm)
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
554 if comm.deserialize:
555 typ, exc, tb = clean_exception(**response)
--> 556 raise exc.with_traceback(tb)
557 else:
558 raise Exception(response["text"])
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py in handle_comm()
410 result = asyncio.ensure_future(result)
411 self._ongoing_coroutines.add(result)
--> 412 result = await result
413 except (CommClosedError, CancelledError) as e:
414 if self.status == "running":
/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/scheduler.py in rebalance()
3056 return {
3057 "status": "missing-data",
-> 3058 "keys": sum([r["keys"] for r in result if "keys" in r], []),
3059 }
3060
TypeError: can only concatenate list (not "dict") to list
Environment Details:
Environment Created Using:
conda create -n cudf_13_feb -c rapidsai-nightly -c nvidia -c conda-forge -c defaults cudatoolkit=10.1 rapids=0.13 ucx-py=0.13 python=3.7
Conda Environment:
dask 2.10.1 py_0 conda-forge
dask-core 2.10.1 py_0 conda-forge
dask-cuda 0.13.0b200213 py37_12 rapidsai-nightly
dask-cudf 0.13.0a200213 py37_1710 rapidsai-nightly
I don't think that is how rebalance is intended to be used, as the first argument is comm, not a Dask object.
There are a couple of ways you could use it though:
- client.rebalance() (no args, just let Dask figure out how to do this)
- client.rebalance(keys=...) (rebalances keys of the particular object)

The keys of your object would then be gotten like so:
keys = list(dask.core.flatten(df.__dask_keys__()))
That all being said, I'm interested to hear why we think rebalancing is needed here. Could you please provide a bit more context? 🙂
I thought it's supposed to be a futures object, no?
See: https://distributed.dask.org/en/latest/_modules/distributed/client.html#Client.rebalance
https://github.com/dask/distributed/blob/04de4b2adc1f0c94ab86e9aa46a4c67382a7eaea/distributed/client.py#L3018-L3037
Please let me know if that is incorrect.
FWIW, client.rebalance(futures=df.to_delayed()) fails too, so I am hoping it's not just an API call mistake.
Also, we experimented with the following on exp-01 (a setup sketch is below):
- CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 works.
- CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7,8,9,10 works.
- CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7,8,9,10,11 fails.
My mistake. For some reason I was thinking of Scheduler.rebalance. Agree futures makes more sense in this context. 😅
So .to_delayed() will give you a Delayed object, which is not a Future. I would use distributed.futures_of(...) to get a list of Futures from your Dask object. 😉
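A minimal sketch putting that together (assumes client and the persisted df from the snippets above):
# futures_of() returns the concrete Future objects backing the collection,
# which is what Client.rebalance expects, unlike the Delayed objects from .to_delayed().
from distributed import futures_of

futures = futures_of(df)           # one Future per partition
client.rebalance(futures=futures)  # rebalance only this collection's data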
This still fails. :-\
client.rebalance(futures = distributed.futures_of(df))
Does it fail with the same error or is it different?
Same error :-/
What does distributed.futures_of(df) return?
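For context, a quick way to inspect that would be something like the sketch below (assuming the same persisted df as above):
# futures_of returns plain Future objects, so we can look at their count and status.
from distributed import futures_of

futures = futures_of(df)
print(len(futures))                 # one Future per partition
print({f.status for f in futures})  # e.g. {'finished'} once wait(df) has returned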
Ok so sorry this is a little disorganized, but I ran the following, which seemed to work for me. Could you please try and let me know how it goes?
In [1]: import cudf
...: import dask_cudf
...: from dask import delayed
...:
...: from dask_cuda import LocalCUDACluster
...: import dask.dataframe as dd
...: from dask.distributed import Client, wait, LocalCluster
In [2]: cluster = LocalCUDACluster()
In [3]: client = Client(cluster)
In [4]: cdf = cudf.datasets.timeseries()
In [5]: dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')
In [6]: df = dd.read_parquet('test_data/*.parquet')
In [7]: n_workers = len(client.scheduler_info()["workers"])
In [8]: df = df.repartition(npartitions=n_workers).persist()
In [9]: _ = wait(df)
In [10]: import distributed
In [11]: client.rebalance(futures=distributed.futures_of(df))
Which machine was this on?
I think the error reported is a bug, probably due to some changes in the last year or so. Looking at the code:
https://github.com/dask/distributed/blob/04de4b2adc1f0c94ab86e9aa46a4c67382a7eaea/distributed/scheduler.py#L3089
I am pretty sure sum([...], []) is illegal. With that said, what you probably care about is happening just above the error message (not posted in the first comment). Probably something like:
distributed.worker - WARNING - Could not find data: {"('repartition-16-cc7cce8a28fbc6dcb5a870fb88f7c0fc', 1)": ['tcp://127.0.0.1:40253']} on workers: [] (who_has: {"('repartition-16-cc7cce8a28fbc6dcb5a870fb88f7c0fc', 1)": ['tcp://127.0.0.1:40253']})
This kind of issue has come up in the past in distributed: https://github.com/dask/distributed/issues/1333, but I am still not sure what is going on with this particular example. However, like in issue 1333, when I re-run client.rebalance the command succeeds (a rough retry sketch is below). The cluster is "quiet" and it should work, but I am not sure why it is failing.
I don't think this is a dask-cuda issue; it is instead due to the lack of robustness around rebalance. I'll explore more tomorrow morning.
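For completeness, the "re-run rebalance" workaround as a rough sketch; this is not a fix, the attempt count is arbitrary, and it assumes client and futures from the earlier snippets.
import time

# Retry rebalance a few times, since a second attempt has been observed to succeed.
for attempt in range(3):
    try:
        client.rebalance(futures=futures)
        break                              # succeeded, stop retrying
    except Exception as exc:               # the TypeError above surfaces here
        print(f"rebalance attempt {attempt + 1} failed: {exc}")
        time.sleep(1)                      # let the cluster settle before retrying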
Following the example from @jakirkham works for me on both a DGX-1 and DGX-2.
Using the conda environment from the initial issue.
conda create -n cudf_13_feb -c rapidsai-nightly -c nvidia -c conda-forge -c defaults cudatoolkit=10.1 rapids=0.13 ucx-py=0.13 python=3.7
$ conda list | grep dask
dask 2.10.1 py_0 conda-forge
dask-core 2.10.1 py_0 conda-forge
dask-cuda 0.13.0b200214 py37_12 rapidsai-nightly
dask-cudf 0.13.0a200214 py37_1813 rapidsai-nightly
dask-xgboost 0.2.0.dev28 cuda10.1py37_0 rapidsai-nightly
$ conda list | grep distributed
distributed 2.10.0 py_0 conda-forge
$ conda list | grep cudf
cudf 0.13.0a200214 py37_1813 rapidsai-nightly
dask-cudf 0.13.0a200214 py37_1813 rapidsai-nightly
libcudf 0.13.0a200214 cuda10.1_1813 rapidsai-nightly
import cudf
import dask_cudf
from dask import delayed
from dask_cuda import LocalCUDACluster
import dask.dataframe as dd
from dask.distributed import Client,wait,LocalCluster
cluster = LocalCUDACluster(local_directory="/tmp")
client = Client(cluster)
cdf = cudf.datasets.timeseries()
dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')
df = dd.read_parquet('test_data/*.parquet')
n_workers = len(client.scheduler_info()["workers"])
df = df.repartition(npartitions=n_workers).persist()
_ = wait(df)
import distributed
client.rebalance(futures=distributed.futures_of(df))
After building a new env I am also no longer able to reproduce.
@VibhuJawa can you try clearing out any Dask state files:
- storage
- dask-worker-space
- worker-*

I don't think this should really be a problem, but it's worth a shot (a cleanup sketch follows).
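Something like the sketch below should clear them out; it assumes the directories live in the current working directory, so adjust the paths if local_directory was set elsewhere.
import glob
import shutil

# Remove leftover Dask/worker state directories matching the patterns above.
for pattern in ("storage", "dask-worker-space", "worker-*"):
    for path in glob.glob(pattern):
        shutil.rmtree(path, ignore_errors=True)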
So even with a fresh, clean environment (without Dask state files), I still get the error. I ran it on exp-01.
On today's nightly: https://gist.github.com/VibhuJawa/20e4eb194ef95a7c17dbc01c850028ed
On yesterday's nightly: https://gist.github.com/VibhuJawa/10d850874bf3f96e7adf62dff7251efb
Running the below script:
import cudf
import dask_cudf
from dask import delayed
from dask_cuda import LocalCUDACluster
import dask.dataframe as dd
from dask.distributed import Client,wait,LocalCluster
cluster = LocalCUDACluster(local_directory="/tmp")
client = Client(cluster)
cdf = cudf.datasets.timeseries()
dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')
df = dd.read_parquet('test_data/*.parquet')
n_workers = len(client.scheduler_info()["workers"])
df = df.repartition(npartitions=n_workers).persist()
_ = wait(df)
import distributed
client.rebalance(futures=distributed.futures_of(df))
The bug might be non-deterministic, and may be happening more on exp-01 than on dgx-202. I will run the script a couple of times on both and report results once I get a dgx-2 free again.
Can you report the contents of ~/.config/distributed.yaml?
This file seems to be empty.
cat ~/.config/distributed.yaml
wc -l ~/.config/distributed.yaml
0 /home/nfs/vjawa/.config/distributed.yaml
I tried the same thing on my machine and got the same error.
Environment: conda create -n cudf_14_feb -c rapidsai-nightly -c nvidia -c conda-forge -c defaults cudatoolkit=10.1 rapids=0.13 ucx-py=0.13 python=3.7
Interestingly, changing the number of partitions to another number (like 15), i.e. dask_cudf.from_cudf(cdf, npartitions=15).to_parquet('test_data'), resulted in no error.
Do you have any files in ~/.config/dask/ with config in them?
I have 2 files: dask.yaml and distributed.yaml.
dask.yaml
cat ~/.config/dask/dask.yaml
# temporary-directory: null # Directory for local disk like /tmp, /scratch, or /local
distributed.yaml
cat ~/.config/dask/distributed.yaml
# distributed:
# version: 2
# # logging:
# # distributed: info
# # distributed.client: warning
# # bokeh: critical
# # # http://stackoverflow.com/questions/21234772/python-tornado-disable-logging-to-stderr
# # tornado: critical
# # tornado.application: error
# scheduler:
# allowed-failures: 3 # number of retries before a task is considered bad
# bandwidth: 100000000 # 100 MB/s estimated worker-worker bandwidth
# blocked-handlers: []
# default-data-size: 1000
# # Number of seconds to wait until workers or clients are removed from the events log
# # after they have been removed from the scheduler
# events-cleanup-delay: 1h
# transition-log-length: 100000
# work-stealing: True # workers should steal tasks from each other
# worker-ttl: null # like '60s'. Time to live for workers. They must heartbeat faster than this
# preload: []
# preload-argv: []
# worker:
# blocked-handlers: []
# multiprocessing-method: forkserver
# use-file-locking: True
# connections: # Maximum concurrent connections for data
# outgoing: 50 # This helps to control network saturation
# incoming: 10
# preload: []
# preload-argv: []
# profile:
# interval: 10ms # Time between statistical profiling queries
# cycle: 1000ms # Time between starting new profile
# # Fractions of worker memory at which we take action to avoid memory blowup
# # Set any of the lower three values to False to turn off the behavior entirely
# memory:
# target: 0.60 # target fraction to stay below
# spill: 0.70 # fraction at which we spill to disk
# pause: 0.80 # fraction at which we pause worker threads
# terminate: 0.95 # fraction at which we terminate the worker
# client:
# heartbeat: 5s # time between client heartbeats
# comm:
# compression: auto
# default-scheme: tcp
# socket-backlog: 2048
# recent-messages-log-length: 0 # number of messages to keep for debugging
# timeouts:
# connect: 10s # time before connecting fails
# tcp: 30s # time before calling an unresponsive connection dead
# # require-encryption: False # whether to require encryption on non-local comms
# #
# # tls:
# # ca-file: xxx.pem
# # scheduler:
# # key: xxx.pem
# # cert: xxx.pem
# # worker:
# # key: xxx.pem
# # cert: xxx.pem
# # client:
# # key: xxx.pem
# # cert: xxx.pem
# # ciphers:
# # ECDHE-ECDSA-AES128-GCM-SHA256
# ###################
# # Bokeh dashboard #
# ###################
# dashboard:
# link: "http://{host}:{port}/status"
# export-tool: False
# ##################
# # Administrative #
# ##################
# admin:
# tick:
# interval: 20ms # time between event loop health checks
# limit: 3s # time allowed before triggering a warning
# log-length: 10000 # default length of logs to keep in memory
# log-format: '%(name)s - %(levelname)s - %(message)s'
# pdb-on-err: False # enter debug mode on scheduling error
Hmm ok. All of the lines in both of those are commented.
Can you try to repro this on exp-01? Maybe that will help you move forward on this.
I am pretty sure sum([...],[]) is illegal.
Forgot to mention this before, but this is legal. The second argument here is the start value to sum, which determines what it adds things to. If unspecified, it would be 0. Here's an example of how this would be used:
In [1]: sum([[1], [2, 3], [4]], [])
Out[1]: [1, 2, 3, 4]
Interesting, but not:
In [4]: sum([1,2,3], [])
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-4-6a41ffa2f670> in <module>
----> 1 sum([1,2,3], [])
TypeError: can only concatenate list (not "int") to list
Yeah, because that translates to [] + 1 + 2 + 3.
The previous case translates to [] + [1] + [2, 3] + [4].
If it helps, we could also imagine sum() being written like so:
def sum(iterable, start=0):
result = start
for each in iterable:
result += each
return result
Edit: IOW if we want start=[], we need each to always be [...], which means that iterable = [[...], [...], ...].
@VibhuJawa dask recently merged a PR related to rebalancing. If you have time it would be good to test: https://github.com/dask/distributed/pull/3670
Have you had a chance to test https://github.com/dask/distributed/pull/3670 @VibhuJawa? Is this still an issue?
FWIW that change is in Distributed 2.15.0+
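A quick way to check whether an environment already picked up that change (a sketch; assumes the packaging module is installed):
import distributed
from packaging.version import Version

print(distributed.__version__)
print(Version(distributed.__version__) >= Version("2.15.0"))  # True means the rebalance change is included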
Thanks for the ping @jakirkham and @pentschev. I have not tested this yet. Will test and update here accordingly.