dask-ml
dask-ml copied to clipboard
hyperparameter optimization example not working
What happened:
I tried the example for hyperparameter optimization: https://examples.dask.org/machine-learning/hyperparam-opt.html
What you expected to happen: The example should run without failing.
Minimal Complete Verifiable Example:
# To add a new cell, type '# %%'
# To add a new markdown cell, type '# %% [markdown]'
# %%
# Create a distributed Client with no arguments. The bare `client` expression
# on the last line only has a visible effect when this is run as a notebook /
# interactive cell (it echoes the object's repr).
from distributed import Client
client = Client()
client
# %%
# Generate a synthetic dataset of 30,000 samples with make_circles (fixed
# random_state, noise=0.09), then scatter-plot a random sample of 4,000 rows
# using columns 0 and 1 of X as coordinates and the label y as the color.
from sklearn.datasets import make_circles
import numpy as np
import pandas as pd
X, y = make_circles(n_samples=30_000, random_state=0, noise=0.09)
pd.DataFrame({0: X[:, 0], 1: X[:, 1], "class": y}).sample(4_000).plot.scatter(
x=0, y=1, alpha=0.2, c="class", cmap="bwr"
)
# %%
# Append 4 extra feature columns drawn uniformly from [-1, 1) using a seeded
# RandomState, so X gains 4 columns that are generated independently of y.
from sklearn.utils import check_random_state
rng = check_random_state(42)
random_feats = rng.uniform(-1, 1, size=(X.shape[0], 4))
X = np.hstack((X, random_feats))
# Echo the new shape (only displayed when run as a notebook cell).
X.shape
# %%
# Hold out 5,000 samples for testing; the rest are used for training.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=5_000, random_state=42)
from sklearn.preprocessing import StandardScaler
# NOTE(review): train_test_split is imported a second time here; the repeat is
# redundant but harmless.
from sklearn.model_selection import train_test_split
# Fit the scaler on the training split only, then apply the same transform to
# both splits so the test data does not influence the scaling statistics.
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
# NOTE(review): numpy is already imported earlier in this script; this repeat
# is redundant but harmless.
import numpy as np
from sklearn.neural_network import MLPClassifier
# Base estimator with default settings; the search below varies the
# hyperparameters listed in `params`.
model = MLPClassifier()
# Search space passed to HyperbandSearchCV: each key is an MLPClassifier
# constructor argument and each value is the collection of candidates for it.
params = {
"hidden_layer_sizes": [
(24, ),
(12, 12),
(6, 6, 6, 6),
(4, 4, 4, 4, 4, 4),
(12, 6, 3, 3),
],
"activation": ["relu", "logistic", "tanh"],
# 1000 log-spaced values between 1e-6 and 1e-3; "cnts" presumably means
# "continuous" (a dense grid standing in for a continuous range).
"alpha": np.logspace(-6, -3, num=1000), # cnts
"batch_size": [16, 32, 64, 128, 256, 512],
}
from dask_ml.model_selection import HyperbandSearchCV
# Budget inputs used to derive max_iter and the Dask chunk size below:
# n_examples is set to 4x the training-set length and n_params to 8.
# For quick response
n_examples = 4 * len(X_train)
n_params = 8
# In practice, HyperbandSearchCV is most useful for longer searches
# n_examples = 15 * len(X_train)
# n_params = 15
# %%
max_iter = n_params # number of times partial_fit will be called
chunks = n_examples // n_params # number of examples each call sees
# Echo both derived values (only displayed when run as a notebook cell).
max_iter, chunks
# %%
# Wrap the NumPy training arrays as Dask arrays, using `chunks` as the chunk
# size for both X and y.
import dask.array as da
X_train2 = da.from_array(X_train, chunks=chunks)
y_train2 = da.from_array(y_train, chunks=chunks)
X_train2
# %%
# Configure the Hyperband search over `params` for `model`, with the
# max_iter budget computed above.
# NOTE(review): patience=True is passed as a boolean; see the
# HyperbandSearchCV documentation for how that value is interpreted.
search = HyperbandSearchCV(
model,
params,
max_iter=max_iter,
patience=True,
)
# %%
# Inspect the "partial_fit_calls" entry of the search metadata before fitting.
search.metadata["partial_fit_calls"]
# %%
# This is the call that raises in the report below (the final traceback shows
# a CancelledError from this line).
# NOTE(review): `classes` lists four labels, while y comes from make_circles,
# which is expected to produce only labels 0 and 1 — confirm this is intended.
search.fit(X_train2, y_train2, classes=[0, 1, 2, 3])
Error message:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/home/bastian/.local/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
small_header, small_payload = dumps_msgpack(msg, **compress_opts)
File "/home/bastian/.local/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
File "/home/bastian/.local/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.comm.utils - ERROR - can not serialize 'Delayed' object
Traceback (most recent call last):
File "/home/bastian/.local/lib/python3.8/site-packages/distributed/comm/utils.py", line 32, in _to_frames
protocol.dumps(
File "/home/bastian/.local/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
small_header, small_payload = dumps_msgpack(msg, **compress_opts)
File "/home/bastian/.local/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
File "/home/bastian/.local/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
File "/home/bastian/.local/lib/python3.8/site-packages/distributed/batched.py", line 93, in _background_send
nbytes = yield self.comm.write(
File "/home/bastian/.local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/home/bastian/.local/lib/python3.8/site-packages/distributed/comm/tcp.py", line 230, in write
frames = await to_frames(
File "/home/bastian/.local/lib/python3.8/site-packages/distributed/comm/utils.py", line 52, in to_frames
return _to_frames()
File "/home/bastian/.local/lib/python3.8/site-packages/distributed/comm/utils.py", line 32, in _to_frames
protocol.dumps(
File "/home/bastian/.local/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
small_header, small_payload = dumps_msgpack(msg, **compress_opts)
File "/home/bastian/.local/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
File "/home/bastian/.local/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.scheduler - CRITICAL - Tried writing to closed comm: [{'op': 'key-in-memory', 'key': "('concatenate-a18fe0fc5cfece12af6977bd21e36f9f', 1, 0)", 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.'}]
distributed.scheduler - CRITICAL - Tried writing to closed comm: [{'op': 'key-in-memory', 'key': "('concatenate-a18fe0fc5cfece12af6977bd21e36f9f', 1, 0)", 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.'}]
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
<ipython-input-9-8272808f4a8d> in <module>
----> 1 search.fit(X_train2, y_train2, classes=[0, 1, 2, 3])
~/.local/lib/python3.8/site-packages/dask_ml/model_selection/_incremental.py in fit(self, X, y, **fit_params)
704 client = default_client()
705 if not client.asynchronous:
--> 706 return client.sync(self._fit, X, y, **fit_params)
707 return self._fit(X, y, **fit_params)
708
~/.local/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
836 return future
837 else:
--> 838 return sync(
839 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
840 )
~/.local/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
338 if error[0]:
339 typ, exc, tb = error[0]
--> 340 raise exc.with_traceback(tb)
341 else:
342 return result[0]
~/.local/lib/python3.8/site-packages/distributed/utils.py in f()
322 if callback_timeout is not None:
323 future = asyncio.wait_for(future, callback_timeout)
--> 324 result[0] = yield future
325 except Exception as exc:
326 error[0] = sys.exc_info()
~/.local/lib/python3.8/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/.local/lib/python3.8/site-packages/dask_ml/model_selection/_hyperband.py in _fit(self, X, y, **fit_params)
399 _brackets_ids = list(reversed(sorted(SHAs)))
400
--> 401 _SHAs = await asyncio.gather(
402 *[SHAs[b]._fit(X, y, **fit_params) for b in _brackets_ids]
403 )
~/.local/lib/python3.8/site-packages/dask_ml/model_selection/_incremental.py in _fit(self, X, y, **fit_params)
650
651 with context:
--> 652 results = await fit(
653 self.estimator,
654 self._get_params(),
~/.local/lib/python3.8/site-packages/dask_ml/model_selection/_incremental.py in fit(model, params, X_train, y_train, X_test, y_test, additional_calls, fit_params, scorer, random_state, verbose, prefix)
464 A history of all models scores over time
465 """
--> 466 return await _fit(
467 model,
468 params,
~/.local/lib/python3.8/site-packages/dask_ml/model_selection/_incremental.py in _fit(model, params, X_train, y_train, X_test, y_test, additional_calls, fit_params, scorer, random_state, verbose, prefix)
204 assert len(X_train) == len(y_train)
205
--> 206 train_eg = await client.gather(client.map(len, y_train))
207 msg = "[CV%s] For training there are between %d and %d examples in each chunk"
208 logger.info(msg, prefix, min(train_eg), max(train_eg))
~/.local/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1828 else:
1829 raise exception.with_traceback(traceback)
-> 1830 raise exc
1831 if errors == "skip":
1832 bad_keys.add(key)
CancelledError: len-34bea58765576ad97f48e2be85678f1e
Anything else we need to know?: This issue may belong on https://github.com/dask/dask-examples/ instead; I'm not sure.
Environment:
- Dask version:
dask==2021.2.0
dask-glm==0.2.0
dask-ml==1.8.0
scikit-learn==0.24.1
distributed==2021.2.0
- Python version: 3.8.6
- Operating System: ubuntu 20.10
- Install method (conda, pip, source): pip
Thanks for the report. I can't reproduce it locally. Can you try with the dev versions of dask and distributed? It may already be fixed.
Perhaps this could be labeled "Needs Info" for the time being?
Sorry, I forgot about this. I'll test it in April; I'm slightly short on time at the moment.