dask-xgboost
Training data not arriving to train_part
I am getting the following error message `ValueError: too many values to unpack (expected 2)` when running `dask_xgboost.train` on a Hadoop cluster.
Digging into the logs shows that for one of the containers `list_of_parts = ['finalize-df33507e3c937f53053b5955c9a84040', 'finalize-c424c6035f86bdbdb8fafde5a9004238', 'finalize-f62e06bceea3e45d4502b6975e94c2fc', 'finalize-4de332f518ef9fbbe910824136a231a2', 'finalize-fe3e86b7ae5aa3e16009a8f39c35bc37']`.
I call `train` with a Dask dataframe, and by hacking `dask_xgboost.core._train` I was able to track the transformations of the data into `list_of_parts` (the values of `worker_map`).
After `to_delayed`:
[Delayed(('split-0-0744fa3c657f0b5ea93ab842f9b006a6', 0)),
Delayed(('split-0-0744fa3c657f0b5ea93ab842f9b006a6', 1)),
Delayed(('split-0-0744fa3c657f0b5ea93ab842f9b006a6', 2)),
Delayed(('split-0-0744fa3c657f0b5ea93ab842f9b006a6', 3)),
Delayed(('split-0-0744fa3c657f0b5ea93ab842f9b006a6', 4)), ...
After zipping with the labels to create `parts`:
[Delayed('tuple-f99e57e3-9f4c-4f69-90ae-9f66607901cf'),
Delayed('tuple-1734a065-e9d9-4ccd-9194-000d3cef869b'),
Delayed('tuple-87e592ee-40eb-4c3d-85cb-aae63ee23b76'),
Delayed('tuple-4dc1fd3e-a9b6-4f12-9c28-21dfb101c100'),
Delayed('tuple-2438f4d1-f6d4-451a-834b-783677c32f69'), ...
The `finalize` keys are generated by `client.compute(parts)`:
[<Future: status: pending, key: finalize-3cf2a1ef82ada3afa320cfa353eaace1>,
<Future: status: pending, key: finalize-df33507e3c937f53053b5955c9a84040>,
<Future: status: pending, key: finalize-fed822c53ef1f53bfc48c985ffbdf728>,
<Future: status: pending, key: finalize-c424c6035f86bdbdb8fafde5a9004238>,
<Future: status: pending, key: finalize-f6793429bfe6f2520871a391e7882c1e>,
<Future: status: pending, key: finalize-56347b470161a91b696e25de5f2229b6>, ...]
After `_wait(parts)`:
[<Future: status: finished, type: tuple, key: finalize-3cf2a1ef82ada3afa320cfa353eaace1>,
<Future: status: finished, type: tuple, key: finalize-df33507e3c937f53053b5955c9a84040>,
<Future: status: finished, type: tuple, key: finalize-fed822c53ef1f53bfc48c985ffbdf728>,
<Future: status: finished, type: tuple, key: finalize-c424c6035f86bdbdb8fafde5a9004238>,
<Future: status: finished, type: tuple, key: finalize-f6793429bfe6f2520871a391e7882c1e>,
<Future: status: finished, type: tuple, key: finalize-56347b470161a91b696e25de5f2229b6>,
<Future: status: finished, type: tuple, key: finalize-455e539014f1526f318b5dfc2fd96b6d>, ...
`who_has`:
{'finalize-3cf2a1ef82ada3afa320cfa353eaace1': ['tcp://10.195.208.244:44408'],
'finalize-df33507e3c937f53053b5955c9a84040': ['tcp://10.195.102.71:44112'],
'finalize-fed822c53ef1f53bfc48c985ffbdf728': ['tcp://10.195.208.252:35744'],
'finalize-c424c6035f86bdbdb8fafde5a9004238': ['tcp://10.195.102.71:44112'],
'finalize-f6793429bfe6f2520871a391e7882c1e': ['tcp://10.195.208.249:36605'],
'finalize-56347b470161a91b696e25de5f2229b6': ['tcp://10.195.208.249:36605'],
'finalize-455e539014f1526f318b5dfc2fd96b6d': ['tcp://10.195.208.244:44408'],
`worker_map`:
defaultdict(<class 'list'>, {
'tcp://10.195.208.244:44408':
['finalize-3cf2a1ef82ada3afa320cfa353eaace1', 'finalize-455e539014f1526f318b5dfc2fd96b6d', 'finalize-4921531869b649367471b87f49589a90', 'finalize-4d457790610b74dd1fd2378a4b6e8f45', 'finalize-0b2ac9d0c0ba6e950836eed3cf0d1793'],
'tcp://10.195.102.71:44112':
['finalize-df33507e3c937f53053b5955c9a84040', 'finalize-c424c6035f86bdbdb8fafde5a9004238', 'finalize-f62e06bceea3e45d4502b6975e94c2fc', 'finalize-4de332f518ef9fbbe910824136a231a2', 'finalize-fe3e86b7ae5aa3e16009a8f39c35bc37'], ...
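For reference, a `worker_map` like the one above can be reproduced from `who_has` in a few lines. This is a sketch; the keys and addresses below are invented stand-ins for the real ones:

```python
from collections import defaultdict

# Invented who_has-style mapping: key -> workers holding that key
who_has = {
    'finalize-aaa': ['tcp://10.195.208.244:44408'],
    'finalize-bbb': ['tcp://10.195.102.71:44112'],
    'finalize-ccc': ['tcp://10.195.208.244:44408'],
}

# Group keys by the (first) worker that holds them
worker_map = defaultdict(list)
for key, workers in who_has.items():
    worker_map[workers[0]].append(key)
```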
I am getting the following error message ValueError: too many values to unpack (expected 2) when running dask_xgboost.train on a hadoop cluster.
Often errors like this are caused by a version mismatch between client and worker. Can you verify that this is not the case?
The local env (dask2):
(dask2) [jlord:dask-xgboost-demo]$ conda list dask
# packages in environment at /nas/isg_prodops_work/jlord/conda/envs/dask2:
#
dask 0.15.4 py_0 conda-forge
dask-core 0.15.4 py_0 conda-forge
dask-glm 0.1.0 0 conda-forge
dask-ml 0.3.1 py36_0 conda-forge
dask-searchcv 0.1.0 py_0 conda-forge
dask-xgboost 0.1.3 py36_0 conda-forge
(dask2) [jlord:dask-xgboost-demo]$ conda list distributed
# packages in environment at /nas/isg_prodops_work/jlord/conda/envs/dask2:
#
distributed 1.19.3 py36_0 conda-forge
The cluster env (dask-env2):
(dask-env2) [jlord:~]$ conda list dask
# packages in environment at /nas/isg_prodops_work/jlord/conda/envs/dask-env2:
#
dask 0.15.4 py_0 conda-forge
dask-core 0.15.4 py_0 conda-forge
dask-glm 0.1.0 0 conda-forge
dask-ml 0.3.1 py36_0 conda-forge
dask-searchcv 0.1.0 py_0 conda-forge
dask-xgboost 0.1.3 py36_0 conda-forge
(dask-env2) [jlord:~]$ conda list distributed
# packages in environment at /nas/isg_prodops_work/jlord/conda/envs/dask-env2:
#
distributed 1.19.3 py36_0 conda-forge
How about e.g. cloudpickle? You can also try `client.get_versions(check=True)`.
Both environments were created from conda-forge at the same time. The only differences from standard conda-forge are a minor change to libhdfs3 (both environments have the same version) to allow it to read files governed by ACLs, and the local environment having knit installed from the current master branch (the cluster environment does not have knit installed).
`client.get_versions(check=True)` returns:
{'client': {'host': [('python', '3.6.3.final.0'),
('python-bits', 64),
('OS', 'Linux'),
('OS-release', '2.6.32-696.10.2.el6.x86_64'),
('machine', 'x86_64'),
('processor', 'x86_64'),
('byteorder', 'little'),
('LC_ALL', 'None'),
('LANG', 'en_US.UTF-8'),
('LOCALE', 'en_US.UTF-8')],
'packages': {'optional': [('numpy', '1.13.3'),
('pandas', '0.21.0'),
('bokeh', '0.12.10'),
('lz4', None),
('blosc', None)],
'required': [('dask', '0.15.4'),
('distributed', '1.19.3'),
('msgpack', '0.4.8'),
('cloudpickle', '0.4.0'),
('tornado', '4.5.2'),
('toolz', '0.8.2')]}},
'scheduler': {'host': [['python', '3.6.3.final.0'],
['python-bits', 64],
['OS', 'Linux'],
['OS-release', '2.6.32-696.10.2.el6.x86_64'],
['machine', 'x86_64'],
['processor', 'x86_64'],
['byteorder', 'little'],
['LC_ALL', 'None'],
['LANG', 'en_US.UTF-8'],
['LOCALE', 'en_US.UTF-8']],
'packages': {'optional': [['numpy', '1.13.3'],
['pandas', '0.21.0'],
['bokeh', '0.12.10'],
['lz4', None],
['blosc', None]],
'required': [['dask', '0.15.4'],
['distributed', '1.19.3'],
['msgpack', '0.4.8'],
['cloudpickle', '0.4.0'],
['tornado', '4.5.2'],
['toolz', '0.8.2']]}},
'workers': {}}
The result above is odd in that no workers are present.
I am getting the following error message ValueError: too many values to unpack (expected 2) when running dask_xgboost.train on a hadoop cluster.
Can you include the traceback if available?
I've got the same issue using the distributed Client (note: the Client must be created before calling train):
from dask.distributed import Client
import dask.dataframe as ddf
import dask_xgboost as dxgb
c = Client('tcp://localhost:8786')
df = ddf.read_csv('data/aa1000.csv', 200000, sep='\t', header=None)
labels = df[0]
train_df = df[range(1, 40)]
params = {'objective': 'binary:logistic'}
booster = dxgb.train(c, params, train_df, labels)
print(booster)
Example values of the `list_of_parts` variable are:
['finalize-664097bb81b25530c0890be0db2586ea', 'finalize-eec97d4ab47ff825fff9a10ad9fd16bd', 'finalize-35015c0cdec081a14f977a2b3781f07b']
or
['finalize-be434318e4abed6e42900b1c77ab6d51', 'finalize-8f351bcff2ff548e43fb47525bcd80ce']
So after applying `zip(*list_of_parts)` we get something like this (for length 3):
[('f', 'f', 'f'),
('i', 'i', 'i'),
('n', 'n', 'n'),
('a', 'a', 'a'),
('l', 'l', 'l'),
('i', 'i', 'i'),
('z', 'z', 'z'),
('e', 'e', 'e'),
('-', '-', '-'),
('6', 'e', '3'),
('6', 'e', '5'),
...
('5', 'd', '1'),
('8', '1', 'f'),
('6', '6', '0'),
('e', 'b', '7'),
('a', 'd', 'b')]
And that isn't anything one could reasonably expect ;)
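This can be boiled down to a tiny demo: when `list_of_parts` holds key strings rather than `(data, labels)` tuples, `zip(*...)` iterates over the characters of each string, and unpacking the result into two names raises exactly the reported error. The keys below are shortened examples:

```python
# list_of_parts should be [(data, labels), ...]; here it holds key strings
list_of_parts = ['finalize-664097bb', 'finalize-eec97d4a', 'finalize-35015c0c']

# zip(*strings) pairs up characters, one tuple per character position
char_tuples = list(zip(*list_of_parts))
print(char_tuples[0])  # ('f', 'f', 'f')

# Unpacking 17 character tuples into two names reproduces the error
err = None
try:
    data, labels = zip(*list_of_parts)
except ValueError as e:
    err = str(e)
print(err)  # too many values to unpack (expected 2)
```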
@dmironiuk it sounds like you might actually have a reproducible test case. Are you able to produce something that others can try to get the same failure?
Sure. Starting from the beginning:
$ pip freeze | grep dask
dask==0.16.0
dask-xgboost==0.1.5
I'm running a scheduler and a worker in the standard way, but just for completeness:
dask-scheduler
dask-worker --memory-limit 0.8 localhost:8786
And `train_xgb.py` to run:
from dask.distributed import Client
import dask.dataframe as ddf
import dask_xgboost as dxgb
import numpy as np
c = Client('tcp://localhost:8786')
df = ddf.read_csv('data/aa1000.txt', 200000, sep='\t', header=None)
num_df = df.select_dtypes(include=[np.number])
num_df = num_df.fillna(-1)
cat_df = df.select_dtypes(include=['object'])
cat_df = cat_df.fillna('OTHER').applymap(hash)
df = ddf.concat([num_df, cat_df], axis=1)
labels = df[0]
train_df = df[range(1, 40)]
params = {'objective': 'binary:logistic'}
booster = dxgb.train(c, params, train_df, labels)
print(booster)
And in the attachment you'll find the CSV file I use (I've changed its extension to .txt to be able to upload it).
aa1000.txt
When doing this myself I ran into two errors:
- `df[range(...)]` fails because `range` isn't a list in Python 3 (I suspect that you're still on Python 2)
- I ran into a problem with `feature_names`
cc @TomAugspurger and @mrphilroth
File "/home/mrocklin/workspace/distributed/distributed/client.py", line 1297, in _gather
traceback)
File "/home/mrocklin/anaconda/lib/python3.6/site-packages/six.py", line 685, in reraise
raise value.with_traceback(tb)
File "/home/mrocklin/workspace/dask-xgboost/dask_xgboost/core.py", line 71, in train_part
dtrain = xgb.DMatrix(data, labels, **dmatrix_kwargs)
File "/home/mrocklin/anaconda/lib/python3.6/site-packages/xgboost/core.py", line 280, in __init__
self.feature_names = feature_names
File "/home/mrocklin/anaconda/lib/python3.6/site-packages/xgboost/core.py", line 579, in feature_names
raise ValueError('feature_names may not contain [, ] or <')
ValueError: feature_names may not contain [, ] or <
Removing the `feature_names` addition made things run fine. I suspect that this is due to some churn in xgboost.
Unfortunately, I wasn't able to reproduce your issue @dmironiuk.
Opened https://github.com/dask/dask-xgboost/issues/15 for the feature names issue.
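As a workaround until that issue is resolved, feature names can be sanitized before building the `DMatrix`. The helper below is hypothetical (not part of dask-xgboost); it replaces the three characters the xgboost error message names (`[`, `]`, `<`):

```python
import re

def sanitize_feature_names(names):
    """Replace characters that xgboost rejects in feature names."""
    return [re.sub(r'[\[\]<]', '_', str(n)) for n in names]

print(sanitize_feature_names(['f[0]', 'f<1>', 'plain']))
```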
Full traceback
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-23-208a9e2b2958> in <module>()
4 'nround': 10
5 }
----> 6 model = dxgb.train(client, params, data_train, target_train)
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/dask_xgboost/core.py in train(client, params, data, labels, **kwargs)
183 predict
184 """
--> 185 return sync(client.loop, _train, client, params, data, labels, **kwargs)
186
187
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
242 e.wait(1000000)
243 if error[0]:
--> 244 six.reraise(*error[0])
245 else:
246 return result[0]
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/distributed/utils.py in f()
230 yield gen.moment
231 thread_state.asynchronous = True
--> 232 result[0] = yield make_coro()
233 except Exception as exc:
234 logger.exception(exc)
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/gen.py in run(self)
1053
1054 try:
-> 1055 value = future.result()
1056 except Exception:
1057 self.had_exception = True
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/concurrent.py in result(self, timeout)
236 if self._exc_info is not None:
237 try:
--> 238 raise_exc_info(self._exc_info)
239 finally:
240 self = None
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/util.py in raise_exc_info(exc_info)
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/gen.py in run(self)
1061 if exc_info is not None:
1062 try:
-> 1063 yielded = self.gen.throw(*exc_info)
1064 finally:
1065 # Break up a reference to itself
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/dask_xgboost/core.py in _train(client, params, data, labels, **kwargs)
149
150 # Get the results, only one will be non-None
--> 151 results = yield client._gather(futures)
152 result = [v for v in results if v][0]
153 raise gen.Return(result)
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/gen.py in run(self)
1053
1054 try:
-> 1055 value = future.result()
1056 except Exception:
1057 self.had_exception = True
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/concurrent.py in result(self, timeout)
236 if self._exc_info is not None:
237 try:
--> 238 raise_exc_info(self._exc_info)
239 finally:
240 self = None
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/util.py in raise_exc_info(exc_info)
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/tornado/gen.py in run(self)
1061 if exc_info is not None:
1062 try:
-> 1063 yielded = self.gen.throw(*exc_info)
1064 finally:
1065 # Break up a reference to itself
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1265 six.reraise(type(exception),
1266 exception,
-> 1267 traceback)
1268 if errors == 'skip':
1269 bad_keys.add(key)
/nas/isg_prodops_work/jlord/conda/envs/dask2/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
690 value = tp()
691 if value.__traceback__ is not tb:
--> 692 raise value.with_traceback(tb)
693 raise value
694 finally:
/hadoop04/yarn/nm/usercache/jlord/appcache/application_1512324565443_4720/container_e122_1512324565443_4720_01_000003/dask-env2.zip/dask-env2/lib/python3.6/site-packages/dask_xgboost/core.py in train_part()
ValueError: too many values to unpack (expected 2)
and a re-run of `client.get_versions(check=True)` after the workers are created (it appears that they aren't created before any task runs):
{'client': {'host': [('python', '3.6.3.final.0'),
('python-bits', 64),
('OS', 'Linux'),
('OS-release', '2.6.32-696.13.2.el6.x86_64'),
('machine', 'x86_64'),
('processor', 'x86_64'),
('byteorder', 'little'),
('LC_ALL', 'None'),
('LANG', 'en_US.UTF-8'),
('LOCALE', 'en_US.UTF-8')],
'packages': {'optional': [('numpy', '1.13.3'),
('pandas', '0.21.0'),
('bokeh', '0.12.10'),
('lz4', None),
('blosc', None)],
'required': [('dask', '0.15.4'),
('distributed', '1.19.3'),
('msgpack', '0.4.8'),
('cloudpickle', '0.4.0'),
('tornado', '4.5.2'),
('toolz', '0.8.2')]}},
'scheduler': {'host': [['python', '3.6.3.final.0'],
['python-bits', 64],
['OS', 'Linux'],
['OS-release', '2.6.32-696.13.2.el6.x86_64'],
['machine', 'x86_64'],
['processor', 'x86_64'],
['byteorder', 'little'],
['LC_ALL', 'None'],
['LANG', 'en_US.UTF-8'],
['LOCALE', 'en_US.UTF-8']],
'packages': {'optional': [['numpy', '1.13.3'],
['pandas', '0.21.0'],
['bokeh', '0.12.10'],
['lz4', None],
['blosc', None]],
'required': [['dask', '0.15.4'],
['distributed', '1.19.3'],
['msgpack', '0.4.8'],
['cloudpickle', '0.4.0'],
['tornado', '4.5.2'],
['toolz', '0.8.2']]}},
'workers': {'tcp://10.195.115.22:46137': {'host': [('python',
'3.6.3.final.0'),
('python-bits', 64),
('OS', 'Linux'),
('OS-release', '2.6.32-696.13.2.el6.x86_64'),
('machine', 'x86_64'),
('processor', 'x86_64'),
('byteorder', 'little'),
('LC_ALL', 'en_US.utf-8'),
('LANG', 'en_US.utf-8'),
('LOCALE', 'en_US.UTF-8')],
'packages': {'optional': [('numpy', '1.13.3'),
('pandas', '0.21.0'),
('bokeh', '0.12.10'),
('lz4', None),
('blosc', None)],
'required': [('dask', '0.15.4'),
('distributed', '1.19.3'),
('msgpack', '0.4.8'),
('cloudpickle', '0.4.0'),
('tornado', '4.5.2'),
('toolz', '0.8.2')]}},
'tcp://10.195.208.121:46826': {'host': [('python', '3.6.3.final.0'),
('python-bits', 64),
('OS', 'Linux'),
('OS-release', '2.6.32-696.13.2.el6.x86_64'),
('machine', 'x86_64'),
('processor', 'x86_64'),
('byteorder', 'little'),
('LC_ALL', 'en_US.utf-8'),
('LANG', 'en_US.utf-8'),
('LOCALE', 'en_US.UTF-8')],
'packages': {'optional': [('numpy', '1.13.3'),
('pandas', '0.21.0'),
('bokeh', '0.12.10'),
('lz4', None),
('blosc', None)],
'required': [('dask', '0.15.4'),
('distributed', '1.19.3'),
('msgpack', '0.4.8'),
('cloudpickle', '0.4.0'),
('tornado', '4.5.2'),
('toolz', '0.8.2')]}},
'tcp://10.195.208.176:46073': {'host': [('python', '3.6.3.final.0'),
('python-bits', 64),
('OS', 'Linux'),
('OS-release', '2.6.32-696.13.2.el6.x86_64'),
('machine', 'x86_64'),
('processor', 'x86_64'),
('byteorder', 'little'),
('LC_ALL', 'en_US.utf-8'),
('LANG', 'en_US.utf-8'),
('LOCALE', 'en_US.UTF-8')],
'packages': {'optional': [('numpy', '1.13.3'),
('pandas', '0.21.0'),
('bokeh', '0.12.10'),
('lz4', None),
('blosc', None)],
'required': [('dask', '0.15.4'),
('distributed', '1.19.3'),
('msgpack', '0.4.8'),
('cloudpickle', '0.4.0'),
('tornado', '4.5.2'),
('toolz', '0.8.2')]}},
'tcp://10.195.208.239:34073': {'host': [('python', '3.6.3.final.0'),
('python-bits', 64),
('OS', 'Linux'),
('OS-release', '2.6.32-696.13.2.el6.x86_64'),
('machine', 'x86_64'),
('processor', 'x86_64'),
('byteorder', 'little'),
('LC_ALL', 'en_US.utf-8'),
('LANG', 'en_US.utf-8'),
('LOCALE', 'en_US.UTF-8')],
'packages': {'optional': [('numpy', '1.13.3'),
('pandas', '0.21.0'),
('bokeh', '0.12.10'),
('lz4', None),
('blosc', None)],
'required': [('dask', '0.15.4'),
('distributed', '1.19.3'),
('msgpack', '0.4.8'),
('cloudpickle', '0.4.0'),
('tornado', '4.5.2'),
('toolz', '0.8.2')]}}}}
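With workers present, a dump like the one above can also be scanned for mismatches programmatically. The snippet below is a sketch over a pared-down dict in the shape `client.get_versions()` returns, with values taken from the dump above:

```python
# Pared-down get_versions()-style output (values from the dump above)
versions = {
    'client': {'packages': {'required': [('dask', '0.15.4'),
                                         ('distributed', '1.19.3')]}},
    'workers': {
        'tcp://10.195.115.22:46137': {
            'packages': {'required': [('dask', '0.15.4'),
                                      ('distributed', '1.19.3')]}},
    },
}

client_req = dict(versions['client']['packages']['required'])

# Collect (worker, package) pairs whose version differs from the client's
mismatches = {
    (worker, pkg): (client_req.get(pkg), ver)
    for worker, info in versions['workers'].items()
    for pkg, ver in dict(info['packages']['required']).items()
    if client_req.get(pkg) != ver
}
print(mismatches)  # {} when everything agrees
```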
I started a new environment with dask 0.16.0 and distributed 1.20.1 and everything works, so I no longer need to keep this issue open, but I am happy to help provide more info if you want to debug the problem.