
Client.rebalance fails on dgx-2 with LocalCUDACluster

Open VibhuJawa opened this issue 5 years ago • 34 comments

Client.rebalance Fails on dgx-2 with LocalCUDACluster

We currently get an error when we try to rebalance a dataframe (both dask-cudf and pure Dask) on a LocalCUDACluster on dgx-2.

Context:

  • This works fine on dgx-1
  • This works fine with LocalCluster

I tested it on both exp-01 and dgx-202.

Minimal Gist: https://gist.github.com/VibhuJawa/19c2cc886f6310d2b468671079014fe6

Imports

import cudf
import dask_cudf
from dask import delayed

from dask_cuda import LocalCUDACluster
import dask.dataframe as dd
from dask.distributed import Client, wait, LocalCluster

# below works
# cluster = LocalCluster()

# below fails
cluster = LocalCUDACluster()
client = Client(cluster)

Write test data to disk

cdf = cudf.datasets.timeseries()
dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')

Rebalancing Fails on dgx-2

### read written dataset
df = dd.read_parquet('test_data/*.parquet')
# df = dask_cudf.read_parquet('test_data/*.parquet')
n_workers = len(client.scheduler_info()["workers"])
df = df.repartition(npartitions=n_workers).persist()
_ = wait(df)

### below fails for us
client.rebalance(df)

Error Trace


distributed.utils - ERROR - can only concatenate list (not "dict") to list
Traceback (most recent call last):
  File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/scheduler.py", line 3058, in rebalance
    "keys": sum([r["keys"] for r in result if "keys" in r], []),
TypeError: can only concatenate list (not "dict") to list
distributed.core - ERROR - can only concatenate list (not "dict") to list
Traceback (most recent call last):
  File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/scheduler.py", line 3058, in rebalance
    "keys": sum([r["keys"] for r in result if "keys" in r], []),
TypeError: can only concatenate list (not "dict") to list
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-4-2b5d4120310b> in <module>
      7 
      8 ### below fails for us
----> 9 client.rebalance(df)

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/client.py in rebalance(self, futures, workers, **kwargs)
   3024             A list of workers on which to balance, defaults to all workers
   3025         """
-> 3026         return self.sync(self._rebalance, futures, workers, **kwargs)
   3027 
   3028     async def _replicate(self, futures, n=None, workers=None, branching_factor=2):

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    765         else:
    766             return sync(
--> 767                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    768             )
    769 

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    343     if error[0]:
    344         typ, exc, tb = error[0]
--> 345         raise exc.with_traceback(tb)
    346     else:
    347         return result[0]

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/utils.py in f()
    327             if callback_timeout is not None:
    328                 future = asyncio.wait_for(future, callback_timeout)
--> 329             result[0] = yield future
    330         except Exception as exc:
    331             error[0] = sys.exc_info()

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/client.py in _rebalance(self, futures, workers)
   3003         await _wait(futures)
   3004         keys = list({tokey(f.key) for f in self.futures_of(futures)})
-> 3005         result = await self.scheduler.rebalance(keys=keys, workers=workers)
   3006         assert result["status"] == "OK"
   3007 

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    755             name, comm.name = comm.name, "ConnectionPool." + key
    756             try:
--> 757                 result = await send_recv(comm=comm, op=key, **kwargs)
    758             finally:
    759                 self.pool.reuse(self.addr, comm)

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    554         if comm.deserialize:
    555             typ, exc, tb = clean_exception(**response)
--> 556             raise exc.with_traceback(tb)
    557         else:
    558             raise Exception(response["text"])

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/core.py in handle_comm()
    410                             result = asyncio.ensure_future(result)
    411                             self._ongoing_coroutines.add(result)
--> 412                             result = await result
    413                     except (CommClosedError, CancelledError) as e:
    414                         if self.status == "running":

/raid/vjawa/miniconda3/envs/cudf_13_feb/lib/python3.7/site-packages/distributed/scheduler.py in rebalance()
   3056                     return {
   3057                         "status": "missing-data",
-> 3058                         "keys": sum([r["keys"] for r in result if "keys" in r], []),
   3059                     }
   3060 

TypeError: can only concatenate list (not "dict") to list

Environment Details:

Environment Created Using:

conda create -n cudf_13_feb -c rapidsai-nightly -c nvidia -c conda-forge -c defaults cudatoolkit=10.1 rapids=0.13 ucx-py=0.13 python=3.7

Conda Environment:

dask                      2.10.1                     py_0    conda-forge
dask-core                 2.10.1                     py_0    conda-forge
dask-cuda                 0.13.0b200213           py37_12    rapidsai-nightly
dask-cudf                 0.13.0a200213         py37_1710    rapidsai-nightly

VibhuJawa avatar Feb 13 '20 21:02 VibhuJawa

I don't think that is how rebalance is intended to be used, as the first argument is comm, not a Dask object.

There are a couple ways you could use it though.

  1. client.rebalance() (no args, just let Dask figure out how to do this)
  2. client.rebalance(keys=...) (rebalances keys of the particular object)

You could then get the keys of your object like so...

keys = list(dask.core.flatten(df.__dask_keys__()))

jakirkham avatar Feb 13 '20 23:02 jakirkham

That all being said, I'm interested to hear why we think rebalancing is needed here. Could you please provide a bit more context? 🙂

jakirkham avatar Feb 13 '20 23:02 jakirkham

I don't think that is how rebalance is intended to be used, as the first argument is comm, not a Dask object.

I thought it's supposed to be a futures object, no?

See: https://distributed.dask.org/en/latest/_modules/distributed/client.html#Client.rebalance

https://github.com/dask/distributed/blob/04de4b2adc1f0c94ab86e9aa46a4c67382a7eaea/distributed/client.py#L3018-L3037

Please let me know if that is incorrect.

FWIW, client.rebalance(futures=df.to_delayed()) fails too, so I am hoping it's not just an API call mistake.

Also, we experimented with the following on exp-01 (a sketch of the setup follows the list):

CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 works.
CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7,8,9,10 works.
CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7,8,9,10,11 fails.

VibhuJawa avatar Feb 13 '20 23:02 VibhuJawa

My mistake. For some reason I was thinking of Scheduler.rebalance. Agreed, futures makes more sense in this context. 😅

So .to_delayed() will give you a Delayed object, which is not a Future. I would use distributed.futures_of(...) to get a list of Futures from your Dask object. 😉

jakirkham avatar Feb 14 '20 00:02 jakirkham

My mistake. For some reason I was thinking of Scheduler.rebalance. Agreed, futures makes more sense in this context. 😅

So .to_delayed() will give you a Delayed object, which is not a Future. I would use distributed.futures_of(...) to get a list of Futures from your Dask object. 😉

This still fails. :-\

client.rebalance(futures = distributed.futures_of(df))

VibhuJawa avatar Feb 14 '20 00:02 VibhuJawa

Does it fail with the same error or is it different?

jakirkham avatar Feb 14 '20 01:02 jakirkham

Does it fail with the same error or is it different?

Same error :-/

VibhuJawa avatar Feb 14 '20 01:02 VibhuJawa

What does distributed.futures_of(df) return?

jakirkham avatar Feb 14 '20 01:02 jakirkham

Ok so sorry this is a little disorganized, but I ran the following, which seemed to work for me. Could you please try and let me know how it goes?

In [1]: import cudf
   ...: import dask_cudf
   ...: from dask import delayed
   ...:  
   ...: from dask_cuda import LocalCUDACluster
   ...: import dask.dataframe as dd
   ...: from dask.distributed import Client, wait, LocalCluster

In [2]: cluster = LocalCUDACluster()

In [3]: client = Client(cluster)

In [4]: cdf = cudf.datasets.timeseries()

In [5]: dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')

In [6]: df = dd.read_parquet('test_data/*.parquet')

In [7]: n_workers = len(client.scheduler_info()["workers"])

In [8]: df = df.repartition(npartitions=n_workers).persist()                    

In [9]: _ = wait(df)                                                            

In [10]: import distributed                                                     

In [11]: client.rebalance(futures=distributed.futures_of(df))

jakirkham avatar Feb 14 '20 01:02 jakirkham

Ok so sorry this is a little disorganized, but I ran the following, which seemed to work for me. Could you please try and let me know how it goes?

In [1]: import cudf
   ...: import dask_cudf
   ...: from dask import delayed
   ...:  
   ...: from dask_cuda import LocalCUDACluster
   ...: import dask.dataframe as dd
   ...: from dask.distributed import Client, wait, LocalCluster

In [2]: cluster = LocalCUDACluster()

In [3]: client = Client(cluster)

In [4]: cdf = cudf.datasets.timeseries()

In [5]: dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')

In [6]: df = dd.read_parquet('test_data/*.parquet')

In [7]: n_workers = len(client.scheduler_info()["workers"])

In [8]: df = df.repartition(npartitions=n_workers).persist()                    

In [9]: _ = wait(df)                                                            

In [10]: import distributed                                                     

In [11]: client.rebalance(futures=distributed.futures_of(df))

Which machine was this on?

VibhuJawa avatar Feb 14 '20 02:02 VibhuJawa

I think the error reported is a bug -- probably due to some changes in the last year or so. Looking at the code:

https://github.com/dask/distributed/blob/04de4b2adc1f0c94ab86e9aa46a4c67382a7eaea/distributed/scheduler.py#L3089

I am pretty sure sum([...],[]) is illegal. With that said, what you probably care about is happening just above the error message (not posted in the first comment). Probably something like:

distributed.worker - WARNING - Could not find data: {"('repartition-16-cc7cce8a28fbc6dcb5a870fb88f7c0fc', 1)": ['tcp://127.0.0.1:40253']} on workers: [] (who_has: {"('repartition-16-cc7cce8a28fbc6dcb5a870fb88f7c0fc', 1)": ['tcp://127.0.0.1:40253']})

This kind of issue has come up in the past in distributed (https://github.com/dask/distributed/issues/1333), but I am still not sure what is going on with this particular example. However, like in issue 1333, when I re-run client.rebalance the command succeeds. The cluster is "quiet" and it should work, but I am not sure why it is failing.
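
A crude workaround along those lines is simply to retry the call, e.g. (a sketch; client and df are the objects from the reproducer above):

import time
import distributed

# Retry the rebalance a few times; a second attempt tends to succeed once
# the cluster has gone quiet.
for attempt in range(3):
    try:
        client.rebalance(futures=distributed.futures_of(df))
        break
    except Exception as err:
        print(f"rebalance attempt {attempt} failed: {err}")
        time.sleep(1)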

I don't think this is a dask-cuda issue; it is instead due to a lack of robustness around rebalance. I'll explore more tomorrow morning.

quasiben avatar Feb 14 '20 05:02 quasiben

Following the example from @jakirkham works for me on both a DGX-1 and DGX-2.

Using the conda environment from the initial issue.

conda create -n cudf_13_feb -c rapidsai-nightly -c nvidia -c conda-forge -c defaults cudatoolkit=10.1 rapids=0.13 ucx-py=0.13 python=3.7
$ conda list | grep dask
dask                      2.10.1                     py_0    conda-forge
dask-core                 2.10.1                     py_0    conda-forge
dask-cuda                 0.13.0b200214           py37_12    rapidsai-nightly
dask-cudf                 0.13.0a200214         py37_1813    rapidsai-nightly
dask-xgboost              0.2.0.dev28      cuda10.1py37_0    rapidsai-nightly
$ conda list | grep distributed
distributed               2.10.0                     py_0    conda-forge
$ conda list | grep cudf
cudf                      0.13.0a200214         py37_1813    rapidsai-nightly
dask-cudf                 0.13.0a200214         py37_1813    rapidsai-nightly
libcudf                   0.13.0a200214     cuda10.1_1813    rapidsai-nightly

import cudf
import dask_cudf
from dask import delayed
from dask_cuda import LocalCUDACluster
import dask.dataframe as dd
from dask.distributed import Client,wait,LocalCluster

cluster = LocalCUDACluster(local_directory="/tmp")
client = Client(cluster)

cdf = cudf.datasets.timeseries()
dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')

df = dd.read_parquet('test_data/*.parquet')
n_workers = len(client.scheduler_info()["workers"])
df = df.repartition(npartitions=n_workers).persist()
_ = wait(df)

import distributed
client.rebalance(futures=distributed.futures_of(df))

jacobtomlinson avatar Feb 14 '20 14:02 jacobtomlinson

After building a new env, I am also no longer able to reproduce this.

quasiben avatar Feb 14 '20 15:02 quasiben

@VibhuJawa can you try clearing out any Dask state files:

  1. storage
  2. dask-worker-space
  3. worker-*

I don't think this should really be a problem, but it's worth a shot.
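
A minimal sketch of clearing those out (the paths are relative to wherever the workers were started; adjust as needed):

import glob
import shutil

# Remove leftover Dask worker state directories from earlier runs
for pattern in ("storage", "dask-worker-space", "worker-*"):
    for path in glob.glob(pattern):
        shutil.rmtree(path, ignore_errors=True)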

quasiben avatar Feb 14 '20 15:02 quasiben

So even with a fresh, clean environment (without Dask state files), I still get the error.

I ran it on exp-01.

On today's nightly: https://gist.github.com/VibhuJawa/20e4eb194ef95a7c17dbc01c850028ed
On yesterday's nightly: https://gist.github.com/VibhuJawa/10d850874bf3f96e7adf62dff7251efb

Running the script below:

import cudf
import dask_cudf
from dask import delayed
from dask_cuda import LocalCUDACluster
import dask.dataframe as dd
from dask.distributed import Client,wait,LocalCluster

cluster = LocalCUDACluster(local_directory="/tmp")
client = Client(cluster)

cdf = cudf.datasets.timeseries()
dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')

df = dd.read_parquet('test_data/*.parquet')
n_workers = len(client.scheduler_info()["workers"])
df = df.repartition(npartitions=n_workers).persist()
_ = wait(df)

import distributed
client.rebalance(futures=distributed.futures_of(df))

The bug might be non-deterministic and may be happening more on exp-01 than on dgx-202. I will run the script a couple of times on both and report the results once I get a dgx-2 free again.

VibhuJawa avatar Feb 14 '20 17:02 VibhuJawa

Can you report the contents of ~/.config/distributed.yaml?

quasiben avatar Feb 14 '20 17:02 quasiben

~/.config/distributed.yaml

This file seems to be empty.

cat ~/.config/distributed.yaml 

wc -l ~/.config/distributed.yaml 

0 /home/nfs/vjawa/.config/distributed.yaml

VibhuJawa avatar Feb 14 '20 17:02 VibhuJawa

So even with a fresh, clean environment (without Dask state files), I still get the error.

I ran it on exp-01.

On today's nightly: https://gist.github.com/VibhuJawa/20e4eb194ef95a7c17dbc01c850028ed
On yesterday's nightly: https://gist.github.com/VibhuJawa/10d850874bf3f96e7adf62dff7251efb

Running the script below:

import cudf
import dask_cudf
from dask import delayed
from dask_cuda import LocalCUDACluster
import dask.dataframe as dd
from dask.distributed import Client,wait,LocalCluster

cluster = LocalCUDACluster(local_directory="/tmp")
client = Client(cluster)

cdf = cudf.datasets.timeseries()
dask_cudf.from_cudf(cdf, npartitions=3).to_parquet('test_data')

df = dd.read_parquet('test_data/*.parquet')
n_workers = len(client.scheduler_info()["workers"])
df = df.repartition(npartitions=n_workers).persist()
_ = wait(df)

import distributed
client.rebalance(futures=distributed.futures_of(df))

The bug might be non-deterministic and may be happening more on exp-01 than on dgx-202. I will run the script a couple of times on both and report the results once I get a dgx-2 free again.

I tried the same thing on my machine and got the same error. Environment: conda create -n cudf_14_feb -c rapidsai-nightly -c nvidia -c conda-forge -c defaults cudatoolkit=10.1 rapids=0.13 ucx-py=0.13 python=3.7

Interestingly, changing the number of partitions to another number (like 15), i.e. dask_cudf.from_cudf(cdf, npartitions=15).to_parquet('test_data'), resulted in no error.
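
For clarity, the only change relative to the original reproducer is the partition count, i.e. (a sketch):

import cudf
import dask_cudf

# Same as the original script, but writing 15 partitions instead of 3
cdf = cudf.datasets.timeseries()
dask_cudf.from_cudf(cdf, npartitions=15).to_parquet('test_data')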

ayushdg avatar Feb 15 '20 00:02 ayushdg

Do you have any files in ~/.config/dask/ with config in?

jacobtomlinson avatar Feb 17 '20 10:02 jacobtomlinson

Do you have any files in ~/.config/dask/ with config in?

I have 2 files: dask.yaml and distributed.yaml

dask.yaml

cat  ~/.config/dask/dask.yaml
# temporary-directory: null     # Directory for local disk like /tmp, /scratch, or /local

distributed.yaml

cat  ~/.config/dask/distributed.yaml
# distributed:
#   version: 2
#   # logging:
#   #   distributed: info
#   #   distributed.client: warning
#   #   bokeh: critical
#   #   # http://stackoverflow.com/questions/21234772/python-tornado-disable-logging-to-stderr
#   #   tornado: critical
#   #   tornado.application: error

#   scheduler:
#     allowed-failures: 3     # number of retries before a task is considered bad
#     bandwidth: 100000000    # 100 MB/s estimated worker-worker bandwidth
#     blocked-handlers: []
#     default-data-size: 1000
#     # Number of seconds to wait until workers or clients are removed from the events log
#     # after they have been removed from the scheduler
#     events-cleanup-delay: 1h
#     transition-log-length: 100000
#     work-stealing: True     # workers should steal tasks from each other
#     worker-ttl: null        # like '60s'. Time to live for workers.  They must heartbeat faster than this
#     preload: []
#     preload-argv: []

#   worker:
#     blocked-handlers: []
#     multiprocessing-method: forkserver
#     use-file-locking: True
#     connections:            # Maximum concurrent connections for data
#       outgoing: 50          # This helps to control network saturation
#       incoming: 10
#     preload: []
#     preload-argv: []

#     profile:
#       interval: 10ms        # Time between statistical profiling queries
#       cycle: 1000ms         # Time between starting new profile

#     # Fractions of worker memory at which we take action to avoid memory blowup
#     # Set any of the lower three values to False to turn off the behavior entirely
#     memory:
#       target: 0.60  # target fraction to stay below
#       spill: 0.70  # fraction at which we spill to disk
#       pause: 0.80  # fraction at which we pause worker threads
#       terminate: 0.95  # fraction at which we terminate the worker

#   client:
#     heartbeat: 5s  # time between client heartbeats

#   comm:
#     compression: auto
#     default-scheme: tcp
#     socket-backlog: 2048
#     recent-messages-log-length: 0  # number of messages to keep for debugging

#     timeouts:
#       connect: 10s          # time before connecting fails
#       tcp: 30s              # time before calling an unresponsive connection dead

#   # require-encryption: False   # whether to require encryption on non-local comms
#   #
#   # tls:
#   #   ca-file: xxx.pem
#   #   scheduler:
#   #     key: xxx.pem
#   #     cert: xxx.pem
#   #   worker:
#   #     key: xxx.pem
#   #     cert: xxx.pem
#   #   client:
#   #     key: xxx.pem
#   #     cert: xxx.pem
#   #   ciphers:
#   #     ECDHE-ECDSA-AES128-GCM-SHA256


#   ###################
#   # Bokeh dashboard #
#   ###################

#   dashboard:
#     link: "http://{host}:{port}/status"
#     export-tool: False

#   ##################
#   # Administrative #
#   ##################

#   admin:
#     tick:
#       interval: 20ms  # time between event loop health checks
#       limit: 3s       # time allowed before triggering a warning

#     log-length: 10000  # default length of logs to keep in memory
#     log-format: '%(name)s - %(levelname)s - %(message)s'
#     pdb-on-err: False       # enter debug mode on scheduling error

VibhuJawa avatar Feb 18 '20 17:02 VibhuJawa

Hmm, OK. All of the lines in both of those are commented out.

jacobtomlinson avatar Feb 19 '20 10:02 jacobtomlinson

Hmm, OK. All of the lines in both of those are commented out.

Can you try to repro this on exp-01? Maybe that will help you move forward on this.

VibhuJawa avatar Feb 19 '20 17:02 VibhuJawa

I am pretty sure sum([...],[]) is illegal.

I forgot to mention this before, but this is legal. The second argument here is the start value for sum, which determines what it adds things to. If unspecified, it defaults to 0. Here's an example of how this would be used...

In [1]: sum([[1], [2, 3], [4]], [])                                             
Out[1]: [1, 2, 3, 4]

jakirkham avatar Mar 02 '20 00:03 jakirkham

In [1]: sum([[1], [2, 3], [4]], [])
Out[1]: [1, 2, 3, 4]

Interesting, but not:

In [4]: sum([1,2,3], [])
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-4-6a41ffa2f670> in <module>
----> 1 sum([1,2,3], [])

TypeError: can only concatenate list (not "int") to list

quasiben avatar Mar 02 '20 02:03 quasiben

Yeah, because that translates to [] + 1 + 2 + 3.

The previous case translates to [] + [1] + [2, 3] + [4].
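
That also matches the traceback at the top of this issue: the aggregation in scheduler.rebalance only blows up if one of the per-worker responses carries its "keys" as a dict rather than a list, which is what the error message implies. A sketch of both cases, using the same expression as scheduler.py line 3058 (not the actual scheduler code path):

# With list values, sum(..., []) flattens the per-worker key lists fine
responses = [{"keys": ["a", "b"]}, {"keys": ["c"]}]
print(sum([r["keys"] for r in responses if "keys" in r], []))  # ['a', 'b', 'c']

# With a dict value (key -> who_has), as the traceback implies, it breaks
responses = [{"keys": {"('repartition-16-...', 1)": ["tcp://127.0.0.1:40253"]}}]
sum([r["keys"] for r in responses if "keys" in r], [])
# TypeError: can only concatenate list (not "dict") to list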

jakirkham avatar Mar 02 '20 02:03 jakirkham

If it helps, we could also imagine sum() being written like so.

def sum(iterable, start=0):
    result = start
    for each in iterable:
        result += each
    return result

Edit: IOW, if we want start=[], we need each to always be [...], which means that iterable = [[...], [...], ...].

jakirkham avatar Mar 02 '20 03:03 jakirkham

@VibhuJawa Dask recently merged a PR related to rebalancing. If you have time, it would be good to test it: https://github.com/dask/distributed/pull/3670

quasiben avatar Apr 07 '20 15:04 quasiben

Have you had a chance to test https://github.com/dask/distributed/pull/3670 @VibhuJawa? Is this still an issue?

pentschev avatar May 05 '20 22:05 pentschev

FWIW that change is in Distributed 2.15.0+
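
A quick way to check what is installed in a given environment:

import distributed

# The rebalance change (dask/distributed#3670) shipped in 2.15.0, so anything
# older will still show the behaviour reported in this issue.
print(distributed.__version__)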

jakirkham avatar May 05 '20 22:05 jakirkham

Thanks for the ping @jakirkham and @pentschev. I have not tested this yet. I will test and update here accordingly.

VibhuJawa avatar May 07 '20 14:05 VibhuJawa