dask-ml
Error trying to deserialize an object
from distributed.protocol import deserialize_bytes
What happened:
Error when trying to deserialize an already-fitted dask_ml.decomposition.PCA object:
OSError: Timed out trying to connect to tcp://10.xxx.120.x31:35487 after 30 s
I fit the estimator on my data and export it as a serialized file. When I try to load it on a different cluster, the deserialized object apparently tries to connect to the scheduler of the cluster it was trained on.
What you expected to happen:
I expected that, regardless of the context (cluster IP, number of workers, ...), I would be able to import my PCA object.
I've already tried dumping with both the pickle and dill libraries:
deserialize_bytes('gs://file.pkl')
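The traceback points at the mechanism: unpickling runs `__setstate__`, and `distributed.Future.__setstate__` calls `get_client(address)`, so any Dask collection captured inside the fitted estimator carries the original scheduler's address and tries to reconnect on load. A minimal stand-in sketch of that behavior (the `FakeFuture` class is a hypothetical illustration, not distributed's actual class):

```python
import pickle

class FakeFuture:
    """Stand-in for an object that, like distributed.Future, runs side
    effects when unpickled."""

    def __init__(self, key, scheduler_address):
        self.key = key
        self.scheduler_address = scheduler_address

    def __getstate__(self):
        # The scheduler address is baked into the serialized bytes.
        return {"key": self.key, "scheduler_address": self.scheduler_address}

    def __setstate__(self, state):
        self.__dict__.update(state)
        # In distributed, this is roughly where get_client(address) would
        # try to reconnect -- and time out on a different cluster.
        print(f"would reconnect to {self.scheduler_address}")

blob = pickle.dumps(FakeFuture("pca-mean", "tcp://10.126.121.131:35487"))
restored = pickle.loads(blob)  # __setstate__ runs here, on *any* machine
```

This is why the loaded file "phones home" to the training cluster: the address travels inside the pickled state, not in any external configuration.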
Can you give a reproducible example? And perhaps post the full traceback?
Just looking at the code, I don't see what attribute would be causing the issue.
As I mentioned, this error is raised when I try to deserialize a dask_ml.decomposition.PCA that was already fitted on a different cluster.
OSError: Timed out trying to connect to tcp://10.126.121.131:35487 after 30 s
---------------------------------------------------------------------------
TimeoutError Traceback (most recent call last)
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
285 try:
--> 286 comm = await asyncio.wait_for(
287 connector.connect(loc, deserialize=deserialize, **connection_args),
~/anaconda3/envs/leroy/lib/python3.8/asyncio/tasks.py in wait_for(fut, timeout, loop)
500 await _cancel_and_wait(fut, loop=loop)
--> 501 raise exceptions.TimeoutError()
502 finally:
TimeoutError:
The above exception was the direct cause of the following exception:
OSError Traceback (most recent call last)
<ipython-input-7-4d0a81023b51> in <module>
----> 1 deserialize_bytes(bytes_)
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/protocol/serialize.py in deserialize_bytes(b)
598 header = {}
599 frames = decompress(header, frames)
--> 600 return merge_and_deserialize(header, frames)
601
602
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/protocol/serialize.py in merge_and_deserialize(header, frames, deserializers)
472 merged_frames.append(bytearray().join(frames[offset : offset + n]))
473
--> 474 return deserialize(header, merged_frames, deserializers=deserializers)
475
476
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/protocol/serialize.py in deserialize(header, frames, deserializers)
404 )
405 dumps, loads, wants_context = families[name]
--> 406 return loads(header, frames)
407
408
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/protocol/serialize.py in pickle_loads(header, frames)
83 new.append(mv)
84
---> 85 return pickle.loads(x, buffers=new)
86
87
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/protocol/pickle.py in loads(x, buffers)
71 try:
72 if buffers:
---> 73 return pickle.loads(x, buffers=buffers)
74 else:
75 return pickle.loads(x)
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/client.py in __setstate__(self, state)
378 c = Client.current(allow_global=False)
379 except ValueError:
--> 380 c = get_client(address)
381 Future.__init__(self, key, c)
382 c._send_to_scheduler(
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/worker.py in get_client(address, timeout, resolve_address)
3552 return client
3553 elif address:
-> 3554 return Client(address, timeout=timeout)
3555 else:
3556 raise ValueError("No global client found and no address provided")
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/client.py in __init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
762 ext(self)
763
--> 764 self.start(timeout=timeout)
765 Client._instances.add(self)
766
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/client.py in start(self, **kwargs)
1008 self._started = asyncio.ensure_future(self._start(**kwargs))
1009 else:
-> 1010 sync(self.loop, self._start, **kwargs)
1011
1012 def __await__(self):
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
324 if error[0]:
325 typ, exc, tb = error[0]
--> 326 raise exc.with_traceback(tb)
327 else:
328 return result[0]
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/utils.py in f()
307 if callback_timeout is not None:
308 future = asyncio.wait_for(future, callback_timeout)
--> 309 result[0] = yield future
310 except Exception:
311 error[0] = sys.exc_info()
~/anaconda3/envs/leroy/lib/python3.8/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/client.py in _start(self, timeout, **kwargs)
1098
1099 try:
-> 1100 await self._ensure_connected(timeout=timeout)
1101 except (OSError, ImportError):
1102 await self._close()
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/client.py in _ensure_connected(self, timeout)
1155
1156 try:
-> 1157 comm = await connect(
1158 self.scheduler.address, timeout=timeout, **self.connection_args
1159 )
~/anaconda3/envs/leroy/lib/python3.8/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
308 await asyncio.sleep(backoff)
309 else:
--> 310 raise OSError(
311 f"Timed out trying to connect to {addr} after {timeout} s"
312 ) from active_exception
OSError: Timed out trying to connect to tcp://10.126.121.131:35487 after 30 s
Update:
dask_ml.decomposition.PCA().mean_ is a dask.array, so I serialized components_ and mean_ as np.array.
Then, on another cluster, I instantiated a new object and set these attributes.
This is a workaround, but apparently it worked.
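The workaround above can be sketched as follows. Since an actual fitted dask-ml PCA needs a live cluster, a tiny stand-in class keeps the example self-contained; `FittedPCAStandIn` is hypothetical, and in practice you would use `dask_ml.decomposition.PCA()` and call `.compute()` (or `np.asarray(...)`) on the Dask-array attributes:

```python
import pickle
import numpy as np

class FittedPCAStandIn:
    """Hypothetical stand-in for an already-fitted PCA estimator."""
    pass

pca = FittedPCAStandIn()
pca.mean_ = np.array([0.5, 1.5])   # in dask-ml these can be dask arrays
pca.components_ = np.eye(2)

# Export only concrete NumPy arrays -- no Futures, no scheduler address.
state = {
    "mean_": np.asarray(pca.mean_),          # .compute() for a dask array
    "components_": np.asarray(pca.components_),
}
blob = pickle.dumps(state)

# On the other cluster: build a fresh estimator and transplant attributes.
restored = pickle.loads(blob)
new_pca = FittedPCAStandIn()                 # dask_ml.decomposition.PCA() in practice
new_pca.mean_ = restored["mean_"]
new_pca.components_ = restored["components_"]
```

Because the serialized state contains only plain NumPy arrays, it can be unpickled anywhere without contacting the original scheduler.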
Thanks for the update. I don't see any reason why mean_ should be a Dask array. It should probably be set to the concrete value along with the rest in https://github.com/dask/dask-ml/blob/980b3cb84e65f5508004fa1cd767d2c1122bc581/dask_ml/decomposition/pca.py#L291-L304.
Are you interested in making a PR with tests to fix that?
Hey @TomAugspurger, thanks for the opportunity. I will try as soon as possible, but I believe I won't be able to make the changes anytime soon.