Training data not arriving to train_part

Open quartox opened this issue 7 years ago • 11 comments

I am getting the following error message ValueError: too many values to unpack (expected 2) when running dask_xgboost.train on a hadoop cluster.

Digging into the logs shows that, for one of the containers, list_of_parts = ['finalize-df33507e3c937f53053b5955c9a84040', 'finalize-c424c6035f86bdbdb8fafde5a9004238', 'finalize-f62e06bceea3e45d4502b6975e94c2fc', 'finalize-4de332f518ef9fbbe910824136a231a2', 'finalize-fe3e86b7ae5aa3e16009a8f39c35bc37'].

I call train with a dask dataframe, and by hacking dask_xgboost.core._train I was able to track how the data is transformed into list_of_parts (the values of worker_map).

After to_delayed:

[Delayed(('split-0-0744fa3c657f0b5ea93ab842f9b006a6', 0)), 
Delayed(('split-0-0744fa3c657f0b5ea93ab842f9b006a6', 1)), 
Delayed(('split-0-0744fa3c657f0b5ea93ab842f9b006a6', 2)), 
Delayed(('split-0-0744fa3c657f0b5ea93ab842f9b006a6', 3)), 
Delayed(('split-0-0744fa3c657f0b5ea93ab842f9b006a6', 4)), ...

After zipping with the labels to create parts:

[Delayed('tuple-f99e57e3-9f4c-4f69-90ae-9f66607901cf'), 
Delayed('tuple-1734a065-e9d9-4ccd-9194-000d3cef869b'), 
Delayed('tuple-87e592ee-40eb-4c3d-85cb-aae63ee23b76'), 
Delayed('tuple-4dc1fd3e-a9b6-4f12-9c28-21dfb101c100'), 
Delayed('tuple-2438f4d1-f6d4-451a-834b-783677c32f69'), ...

The finalize strings are generated by client.compute(parts).

[<Future: status: pending, key: finalize-3cf2a1ef82ada3afa320cfa353eaace1>, 
<Future: status: pending, key: finalize-df33507e3c937f53053b5955c9a84040>, 
<Future: status: pending, key: finalize-fed822c53ef1f53bfc48c985ffbdf728>, 
<Future: status: pending, key: finalize-c424c6035f86bdbdb8fafde5a9004238>, 
<Future: status: pending, key: finalize-f6793429bfe6f2520871a391e7882c1e>, 
<Future: status: pending, key: finalize-56347b470161a91b696e25de5f2229b6>, ...]

After _wait(parts):

[<Future: status: finished, type: tuple, key: finalize-3cf2a1ef82ada3afa320cfa353eaace1>, 
<Future: status: finished, type: tuple, key: finalize-df33507e3c937f53053b5955c9a84040>, 
<Future: status: finished, type: tuple, key: finalize-fed822c53ef1f53bfc48c985ffbdf728>, 
<Future: status: finished, type: tuple, key: finalize-c424c6035f86bdbdb8fafde5a9004238>, 
<Future: status: finished, type: tuple, key: finalize-f6793429bfe6f2520871a391e7882c1e>, 
<Future: status: finished, type: tuple, key: finalize-56347b470161a91b696e25de5f2229b6>, 
<Future: status: finished, type: tuple, key: finalize-455e539014f1526f318b5dfc2fd96b6d>, ...

who_has:

{'finalize-3cf2a1ef82ada3afa320cfa353eaace1': ['tcp://10.195.208.244:44408'], 
'finalize-df33507e3c937f53053b5955c9a84040': ['tcp://10.195.102.71:44112'], 
'finalize-fed822c53ef1f53bfc48c985ffbdf728': ['tcp://10.195.208.252:35744'], 
'finalize-c424c6035f86bdbdb8fafde5a9004238': ['tcp://10.195.102.71:44112'], 
'finalize-f6793429bfe6f2520871a391e7882c1e': ['tcp://10.195.208.249:36605'], 
'finalize-56347b470161a91b696e25de5f2229b6': ['tcp://10.195.208.249:36605'], 
'finalize-455e539014f1526f318b5dfc2fd96b6d': ['tcp://10.195.208.244:44408'],

worker_map:

defaultdict(<class 'list'>, {
'tcp://10.195.208.244:44408': 
['finalize-3cf2a1ef82ada3afa320cfa353eaace1', 'finalize-455e539014f1526f318b5dfc2fd96b6d', 'finalize-4921531869b649367471b87f49589a90', 'finalize-4d457790610b74dd1fd2378a4b6e8f45', 'finalize-0b2ac9d0c0ba6e950836eed3cf0d1793'], 
'tcp://10.195.102.71:44112': 
['finalize-df33507e3c937f53053b5955c9a84040', 'finalize-c424c6035f86bdbdb8fafde5a9004238', 'finalize-f62e06bceea3e45d4502b6975e94c2fc', 'finalize-4de332f518ef9fbbe910824136a231a2', 'finalize-fe3e86b7ae5aa3e16009a8f39c35bc37'], ...
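
The step from who_has to worker_map amounts to grouping part keys by the worker that holds them. A minimal sketch of that grouping in plain Python, using a few entries from the output above. The function name group_keys_by_worker is hypothetical, and this is not the dask_xgboost source, which may differ in detail:

```python
from collections import defaultdict


def group_keys_by_worker(who_has):
    """Group part keys by the first worker address that holds each key."""
    worker_map = defaultdict(list)
    for key, workers in who_has.items():
        # Assumption: when a key is replicated, take the first worker listed.
        worker_map[workers[0]].append(key)
    return worker_map


who_has = {
    'finalize-3cf2a1ef82ada3afa320cfa353eaace1': ['tcp://10.195.208.244:44408'],
    'finalize-df33507e3c937f53053b5955c9a84040': ['tcp://10.195.102.71:44112'],
    'finalize-c424c6035f86bdbdb8fafde5a9004238': ['tcp://10.195.102.71:44112'],
}

worker_map = group_keys_by_worker(who_has)
for worker, keys in worker_map.items():
    print(worker, len(keys))
```

This grouping only carries keys (strings). train_part needs the (data, labels) tuples behind those keys, so if list_of_parts reaches train_part as strings like the ones in the log excerpt, the keys were presumably never replaced by the data they refer to.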

quartox, Nov 17 '17 20:11

I am getting the following error message ValueError: too many values to unpack (expected 2) when running dask_xgboost.train on a hadoop cluster.

Often errors like this are caused by a version mismatch between client and worker. Can you verify that this is not the case?

mrocklin, Nov 17 '17 20:11

The local env (dask2):

(dask2) [jlord:dask-xgboost-demo]$ conda list dask
# packages in environment at /nas/isg_prodops_work/jlord/conda/envs/dask2:
#
dask                      0.15.4                     py_0    conda-forge
dask-core                 0.15.4                     py_0    conda-forge
dask-glm                  0.1.0                         0    conda-forge
dask-ml                   0.3.1                    py36_0    conda-forge
dask-searchcv             0.1.0                      py_0    conda-forge
dask-xgboost              0.1.3                    py36_0    conda-forge
(dask2) [jlord:dask-xgboost-demo]$ conda list distributed
# packages in environment at /nas/isg_prodops_work/jlord/conda/envs/dask2:
#
distributed               1.19.3                   py36_0    conda-forge

The cluster env (dask-env2):

(dask-env2) [jlord:~]$ conda list dask
# packages in environment at /nas/isg_prodops_work/jlord/conda/envs/dask-env2:
#
dask                      0.15.4                     py_0    conda-forge
dask-core                 0.15.4                     py_0    conda-forge
dask-glm                  0.1.0                         0    conda-forge
dask-ml                   0.3.1                    py36_0    conda-forge
dask-searchcv             0.1.0                      py_0    conda-forge
dask-xgboost              0.1.3                    py36_0    conda-forge
(dask-env2) [jlord:~]$ conda list distributed
# packages in environment at /nas/isg_prodops_work/jlord/conda/envs/dask-env2:
#
distributed               1.19.3                   py36_0    conda-forge

quartox, Nov 17 '17 21:11

How about e.g. cloudpickle?

You can also try client.get_versions(check=True).

TomAugspurger, Nov 17 '17 21:11

Both environments were created from conda-forge at the same time. The only differences from standard conda-forge are a minor change to libhdfs3 (both environments have the same version) that allows it to read files governed by ACLs, and knit installed from the current master branch in the local environment (the cluster environment does not have knit installed).

client.get_versions(check=True) returns

{'client': {'host': [('python', '3.6.3.final.0'),
   ('python-bits', 64),
   ('OS', 'Linux'),
   ('OS-release', '2.6.32-696.10.2.el6.x86_64'),
   ('machine', 'x86_64'),
   ('processor', 'x86_64'),
   ('byteorder', 'little'),
   ('LC_ALL', 'None'),
   ('LANG', 'en_US.UTF-8'),
   ('LOCALE', 'en_US.UTF-8')],
  'packages': {'optional': [('numpy', '1.13.3'),
    ('pandas', '0.21.0'),
    ('bokeh', '0.12.10'),
    ('lz4', None),
    ('blosc', None)],
   'required': [('dask', '0.15.4'),
    ('distributed', '1.19.3'),
    ('msgpack', '0.4.8'),
    ('cloudpickle', '0.4.0'),
    ('tornado', '4.5.2'),
    ('toolz', '0.8.2')]}},
 'scheduler': {'host': [['python', '3.6.3.final.0'],
   ['python-bits', 64],
   ['OS', 'Linux'],
   ['OS-release', '2.6.32-696.10.2.el6.x86_64'],
   ['machine', 'x86_64'],
   ['processor', 'x86_64'],
   ['byteorder', 'little'],
   ['LC_ALL', 'None'],
   ['LANG', 'en_US.UTF-8'],
   ['LOCALE', 'en_US.UTF-8']],
  'packages': {'optional': [['numpy', '1.13.3'],
    ['pandas', '0.21.0'],
    ['bokeh', '0.12.10'],
    ['lz4', None],
    ['blosc', None]],
   'required': [['dask', '0.15.4'],
    ['distributed', '1.19.3'],
    ['msgpack', '0.4.8'],
    ['cloudpickle', '0.4.0'],
    ['tornado', '4.5.2'],
    ['toolz', '0.8.2']]}},
 'workers': {}}
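
The dictionary above can also be checked by hand. A minimal sketch, assuming the shape shown here ('client', 'scheduler' and 'workers' entries, each with a 'required' list under 'packages'). The helper names required_versions and find_version_mismatches are hypothetical and are not part of distributed:

```python
def required_versions(entry):
    """Return {package: version} for one node's 'required' packages."""
    # Pairs arrive as tuples on the client and as lists from the scheduler,
    # so unpack each pair rather than comparing the containers directly.
    return {name: version for name, version in entry['packages']['required']}


def find_version_mismatches(versions):
    """Compare the scheduler and every worker against the client."""
    reference = required_versions(versions['client'])
    nodes = {'scheduler': versions['scheduler']}
    nodes.update(versions['workers'])
    mismatches = {}
    for node_name, entry in nodes.items():
        found = required_versions(entry)
        diff = {pkg: (reference.get(pkg), ver)
                for pkg, ver in found.items() if reference.get(pkg) != ver}
        if diff:
            mismatches[node_name] = diff
    return mismatches


# Trimmed-down example in the same shape as the output above.
versions = {
    'client': {'packages': {'required': [('dask', '0.15.4'), ('distributed', '1.19.3')]}},
    'scheduler': {'packages': {'required': [['dask', '0.15.4'], ['distributed', '1.19.3']]}},
    'workers': {},
}

print(find_version_mismatches(versions))  # differences relative to the client
print(len(versions['workers']))           # number of workers that were compared
```

An empty 'workers' entry means only the client and the scheduler were compared, so a clean result at that point says nothing about the workers.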

quartox, Nov 17 '17 21:11

The result above is odd in that no workers are present.

I am getting the following error message ValueError: too many values to unpack (expected 2) when running dask_xgboost.train on a hadoop cluster.

Can you include the traceback if available?

mrocklin, Nov 20 '17 18:11

I've got the same issue using distributed Client:

from dask.distributed import Client
import dask.dataframe as ddf
import dask_xgboost as dxgb

df = ddf.read_csv('data/aa1000.csv', 200000, sep='\t', header=None)
labels = df[0]
train_df = df[range(1, 40)]

params = {'objective': 'binary:logistic'}
c = Client('tcp://localhost:8786')
booster = dxgb.train(c, params, train_df, labels)
print(booster)

Example values of the list_of_parts variable are:

['finalize-664097bb81b25530c0890be0db2586ea', 'finalize-eec97d4ab47ff825fff9a10ad9fd16bd', 'finalize-35015c0cdec081a14f977a2b3781f07b']

or

['finalize-be434318e4abed6e42900b1c77ab6d51', 'finalize-8f351bcff2ff548e43fb47525bcd80ce']

So after applying zip(*list_of_parts) we get something like this (for length 3):

[('f', 'f', 'f'),
 ('i', 'i', 'i'),
 ('n', 'n', 'n'),
 ('a', 'a', 'a'),
 ('l', 'l', 'l'),
 ('i', 'i', 'i'),
 ('z', 'z', 'z'),
 ('e', 'e', 'e'),
 ('-', '-', '-'),
 ('6', 'e', '3'),
 ('6', 'e', '5'),
...
 ('5', 'd', '1'),
 ('8', '1', 'f'),
 ('6', '6', '0'),
 ('e', 'b', '7'),
 ('a', 'd', 'b')]

And it doesn't seem to be anything one could expect;)
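
This is enough to explain the error message. A minimal sketch in plain Python, assuming train_part unpacks its parts with something like data, labels = zip(*list_of_parts). The stand-in part values below are made up for illustration:

```python
# What train_part expects: one (data, labels) tuple per partition.
good_parts = [('data-0', 'labels-0'), ('data-1', 'labels-1')]
data, labels = zip(*good_parts)
print(data)    # all data pieces
print(labels)  # all label pieces

# What actually arrived: the future keys as plain strings.
bad_parts = [
    'finalize-664097bb81b25530c0890be0db2586ea',
    'finalize-eec97d4ab47ff825fff9a10ad9fd16bd',
    'finalize-35015c0cdec081a14f977a2b3781f07b',
]
try:
    data, labels = zip(*bad_parts)
except ValueError as exc:
    # zip yields one tuple per character position, far more than two.
    message = str(exc)
    print(message)
```

With real (data, labels) tuples the unpacking yields exactly two sequences. With key strings it yields one tuple per character, so the two-name unpacking fails.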

dmironiuk, Nov 21 '17 14:11

@dmironiuk it sounds like you might actually have a reproducible test case. Are you able to produce something that others can try to get the same failure?

mrocklin, Nov 21 '17 15:11

Sure

Starting from the beginning:

$ pip freeze | grep dask
dask==0.16.0
dask-xgboost==0.1.5

I'm running a scheduler and a worker in a standard way but just for completeness:

dask-scheduler
dask-worker  --memory-limit 0.8 localhost:8786

And train_xgb.py to run:

from dask.distributed import Client
import dask.dataframe as ddf
import dask_xgboost as dxgb
import numpy as np

c = Client('tcp://localhost:8786')
df = ddf.read_csv('data/aa1000.txt', 200000, sep='\t', header=None)

num_df = df.select_dtypes(include=[np.number])
num_df = num_df.fillna(-1)
cat_df = df.select_dtypes(include=['object'])
cat_df = cat_df.fillna('OTHER').applymap(hash)

df = ddf.concat([num_df, cat_df], axis=1)

labels = df[0]
train_df = df[range(1, 40)]

params = {'objective': 'binary:logistic'}
booster = dxgb.train(c, params, train_df, labels)
print(booster)

And in the attachment you'll find the CSV file I use (I've changed its extension to .txt to be able to upload it). aa1000.txt

dmironiuk, Nov 21 '17 16:11

When doing this myself I ran into two errors:

  1. df[range(...)] fails because it isn't a list in Python 3 (I suspect that you're still on Python 2)
  2. I ran into a problem with feature_names cc @TomAugspurger and @mrphilroth
  File "/home/mrocklin/workspace/distributed/distributed/client.py", line 1297, in _gather
    traceback)
  File "/home/mrocklin/anaconda/lib/python3.6/site-packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/home/mrocklin/workspace/dask-xgboost/dask_xgboost/core.py", line 71, in train_part
    dtrain = xgb.DMatrix(data, labels, **dmatrix_kwargs)
  File "/home/mrocklin/anaconda/lib/python3.6/site-packages/xgboost/core.py", line 280, in __init__
    self.feature_names = feature_names
  File "/home/mrocklin/anaconda/lib/python3.6/site-packages/xgboost/core.py", line 579, in feature_names
    raise ValueError('feature_names may not contain [, ] or <')
ValueError: feature_names may not contain [, ] or <

Removing the feature_names addition made things run fine. I suspect that this is due to some churn with xgboost.

Unfortunately, I wasn't able to reproduce your issue, @dmironiuk.
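
Both numbered points can be checked without a cluster. A minimal sketch, assuming integer column labels as in the script above. The helper name invalid_feature_names is hypothetical, and the forbidden characters are taken from the error message in the traceback rather than from the xgboost source:

```python
# Point 1: on Python 3, range() is a lazy sequence, not a list.
columns = range(1, 40)
print(isinstance(columns, list))

# Converting it gives an explicit list of column labels, e.g. for
# train_df = df[feature_columns] in the script above.
feature_columns = list(columns)
print(feature_columns[:3], len(feature_columns))


# Point 2: find names that would trigger
# "feature_names may not contain [, ] or <".
def invalid_feature_names(names, forbidden='[]<'):
    return [name for name in names
            if any(ch in str(name) for ch in forbidden)]


print(invalid_feature_names(['age', 'price[usd]', 'x<1', 7]))
```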

mrocklin, Nov 21 '17 16:11

Opened https://github.com/dask/dask-xgboost/issues/15 for the feature names issue.

TomAugspurger, Nov 26 '17 20:11

Full traceback

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-23-208a9e2b2958> in <module>()
      4     'nround': 10
      5 }
----> 6 model = dxgb.train(client, params, data_train, target_train)

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/dask_xgboost/core.py in train(client, params, data, labels, **kwargs)
    183     predict
    184     """
--> 185     return sync(client.loop, _train, client, params, data, labels, **kwargs)
    186 
    187 

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    242         e.wait(1000000)
    243     if error[0]:
--> 244         six.reraise(*error[0])
    245     else:
    246         return result[0]

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/distributed/utils.py in f()
    230             yield gen.moment
    231             thread_state.asynchronous = True
--> 232             result[0] = yield make_coro()
    233         except Exception as exc:
    234             logger.exception(exc)

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/util.py in raise_exc_info(exc_info)

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1061                     if exc_info is not None:
   1062                         try:
-> 1063                             yielded = self.gen.throw(*exc_info)
   1064                         finally:
   1065                             # Break up a reference to itself

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/dask_xgboost/core.py in _train(client, params, data, labels, **kwargs)
    149 
    150     # Get the results, only one will be non-None
--> 151     results = yield client._gather(futures)
    152     result = [v for v in results if v][0]
    153     raise gen.Return(result)

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/util.py in raise_exc_info(exc_info)

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1061                     if exc_info is not None:
   1062                         try:
-> 1063                             yielded = self.gen.throw(*exc_info)
   1064                         finally:
   1065                             # Break up a reference to itself

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1265                             six.reraise(type(exception),
   1266                                         exception,
-> 1267                                         traceback)
   1268                     if errors == 'skip':
   1269                         bad_keys.add(key)

/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

/hadoop04/yarn/nm/usercache/jlord/appcache/application_1512324565443_4720/container_e122_1512324565443_4720_01_000003/dask-env2.zip/dask-env2/lib/python3.6/site-packages/dask_xgboost/core.py in train_part()

ValueError: too many values to unpack (expected 2)

and a re-run of client.get_versions(check=True) after the workers are created (it appears that they aren't created before any action):

{'client': {'host': [('python', '3.6.3.final.0'),
   ('python-bits', 64),
   ('OS', 'Linux'),
   ('OS-release', '2.6.32-696.13.2.el6.x86_64'),
   ('machine', 'x86_64'),
   ('processor', 'x86_64'),
   ('byteorder', 'little'),
   ('LC_ALL', 'None'),
   ('LANG', 'en_US.UTF-8'),
   ('LOCALE', 'en_US.UTF-8')],
  'packages': {'optional': [('numpy', '1.13.3'),
    ('pandas', '0.21.0'),
    ('bokeh', '0.12.10'),
    ('lz4', None),
    ('blosc', None)],
   'required': [('dask', '0.15.4'),
    ('distributed', '1.19.3'),
    ('msgpack', '0.4.8'),
    ('cloudpickle', '0.4.0'),
    ('tornado', '4.5.2'),
    ('toolz', '0.8.2')]}},
 'scheduler': {'host': [['python', '3.6.3.final.0'],
   ['python-bits', 64],
   ['OS', 'Linux'],
   ['OS-release', '2.6.32-696.13.2.el6.x86_64'],
   ['machine', 'x86_64'],
   ['processor', 'x86_64'],
   ['byteorder', 'little'],
   ['LC_ALL', 'None'],
   ['LANG', 'en_US.UTF-8'],
   ['LOCALE', 'en_US.UTF-8']],
  'packages': {'optional': [['numpy', '1.13.3'],
    ['pandas', '0.21.0'],
    ['bokeh', '0.12.10'],
    ['lz4', None],
    ['blosc', None]],
   'required': [['dask', '0.15.4'],
    ['distributed', '1.19.3'],
    ['msgpack', '0.4.8'],
    ['cloudpickle', '0.4.0'],
    ['tornado', '4.5.2'],
    ['toolz', '0.8.2']]}},
 'workers': {'tcp://10.195.115.22:46137': {'host': [('python',
     '3.6.3.final.0'),
    ('python-bits', 64),
    ('OS', 'Linux'),
    ('OS-release', '2.6.32-696.13.2.el6.x86_64'),
    ('machine', 'x86_64'),
    ('processor', 'x86_64'),
    ('byteorder', 'little'),
    ('LC_ALL', 'en_US.utf-8'),
    ('LANG', 'en_US.utf-8'),
    ('LOCALE', 'en_US.UTF-8')],
   'packages': {'optional': [('numpy', '1.13.3'),
     ('pandas', '0.21.0'),
     ('bokeh', '0.12.10'),
     ('lz4', None),
     ('blosc', None)],
    'required': [('dask', '0.15.4'),
     ('distributed', '1.19.3'),
     ('msgpack', '0.4.8'),
     ('cloudpickle', '0.4.0'),
     ('tornado', '4.5.2'),
     ('toolz', '0.8.2')]}},
  'tcp://10.195.208.121:46826': {'host': [('python', '3.6.3.final.0'),
    ('python-bits', 64),
    ('OS', 'Linux'),
    ('OS-release', '2.6.32-696.13.2.el6.x86_64'),
    ('machine', 'x86_64'),
    ('processor', 'x86_64'),
    ('byteorder', 'little'),
    ('LC_ALL', 'en_US.utf-8'),
    ('LANG', 'en_US.utf-8'),
    ('LOCALE', 'en_US.UTF-8')],
   'packages': {'optional': [('numpy', '1.13.3'),
     ('pandas', '0.21.0'),
     ('bokeh', '0.12.10'),
     ('lz4', None),
     ('blosc', None)],
    'required': [('dask', '0.15.4'),
     ('distributed', '1.19.3'),
     ('msgpack', '0.4.8'),
     ('cloudpickle', '0.4.0'),
     ('tornado', '4.5.2'),
     ('toolz', '0.8.2')]}},
  'tcp://10.195.208.176:46073': {'host': [('python', '3.6.3.final.0'),
    ('python-bits', 64),
    ('OS', 'Linux'),
    ('OS-release', '2.6.32-696.13.2.el6.x86_64'),
    ('machine', 'x86_64'),
    ('processor', 'x86_64'),
    ('byteorder', 'little'),
    ('LC_ALL', 'en_US.utf-8'),
    ('LANG', 'en_US.utf-8'),
    ('LOCALE', 'en_US.UTF-8')],
   'packages': {'optional': [('numpy', '1.13.3'),
     ('pandas', '0.21.0'),
     ('bokeh', '0.12.10'),
     ('lz4', None),
     ('blosc', None)],
    'required': [('dask', '0.15.4'),
     ('distributed', '1.19.3'),
     ('msgpack', '0.4.8'),
     ('cloudpickle', '0.4.0'),
     ('tornado', '4.5.2'),
     ('toolz', '0.8.2')]}},
  'tcp://10.195.208.239:34073': {'host': [('python', '3.6.3.final.0'),
    ('python-bits', 64),
    ('OS', 'Linux'),
    ('OS-release', '2.6.32-696.13.2.el6.x86_64'),
    ('machine', 'x86_64'),
    ('processor', 'x86_64'),
    ('byteorder', 'little'),
    ('LC_ALL', 'en_US.utf-8'),
    ('LANG', 'en_US.utf-8'),
    ('LOCALE', 'en_US.UTF-8')],
   'packages': {'optional': [('numpy', '1.13.3'),
     ('pandas', '0.21.0'),
     ('bokeh', '0.12.10'),
     ('lz4', None),
     ('blosc', None)],
    'required': [('dask', '0.15.4'),
     ('distributed', '1.19.3'),
     ('msgpack', '0.4.8'),
     ('cloudpickle', '0.4.0'),
     ('tornado', '4.5.2'),
     ('toolz', '0.8.2')]}}}}

I started a new environment with dask 0.16.0 and distributed 1.20.1 and everything works, so I no longer need to keep this issue open, but I am happy to help provide more info if you want to debug the problem.

quartox, Dec 04 '17 21:12