distributed
recursive serialization fails with nested objects
When serializing dictionaries nested more than 511 levels deep, msgpack, used by distributed.protocol.core.dumps, hits a hard-coded recursion limit.
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/Users/kgerman/anaconda3/envs/daskdev/lib/python3.9/site-packages/distributed/protocol/core.py", line 76, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/Users/kgerman/anaconda3/envs/daskdev/lib/python3.9/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
[Previous line repeated 508 more times]
File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 163, in msgpack._cmsgpack.Packer._pack
ValueError: recursion limit exceeded.
Simple reproduction:
from queue import Queue
from distributed.protocol import core
def nested_dicts(depth, breadth=1, nth=0):
    """Build a dict nested `depth` levels deep; keys whose index is a
    multiple of `nth` hold another nested dict, the rest hold a string."""
    retval = {}
    nth = breadth if nth < 1 else nth
    q = Queue()
    q.put((retval, depth, breadth, nth))
    while not q.empty():
        curr, d, b, n = q.get()
        if d > 0:
            for i in range(1, b + 1):
                if i / n != round(i / n):
                    curr[f"key{i}"] = "bears"
                else:
                    curr[f"key{i}"] = {}  # nest another level and keep descending
                    q.put((curr[f"key{i}"], d - 1, b, n))
    return retval

def serialize_recursive_dict_throws():
    core.dumps(nested_dicts(512))  # triggers the msgpack recursion error shown above
This problem occurs when we are running larger workloads on 8- or 16-node clusters. Although it still comes up without profiling enabled, enabling Dask profiling reliably recreates the issue at this scale.
Example stack where it occurred: https://gist.github.com/VibhuJawa/d44e5c60c9b24aa357b0c9ec6f9306e4#file-worker-log-L407
It is worth noting that the exception is thrown from msgpack because 512 is greater than the recursion limit defined here: https://github.com/msgpack/msgpack-python/blob/main/msgpack/_packer.pyx#L50
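To confirm it is purely an msgpack limit, the same failure can be reproduced without distributed at all. A minimal sketch (600 is just an arbitrary depth comfortably past the limit):

import msgpack

# Build a dict nested 600 levels deep: {"k": {"k": {... "leaf" ...}}}
obj = "leaf"
for _ in range(600):
    obj = {"k": obj}

try:
    msgpack.packb(obj, use_bin_type=True)
except ValueError as e:
    print("msgpack refused:", e)  # ValueError: recursion limit exceeded.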
Wonder if there is a workaround by adding a new serializer when we instantiate the client for the workload.
http://distributed.dask.org/en/stable/serialization.html#extend
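For reference, that extension mechanism lets a specific type bypass msgpack by registering dask_serialize/dask_deserialize handlers. A rough sketch, where DeepDict is a made-up wrapper type and the pickle round-trip is only illustrative; note this only helps when the deep nesting lives in user data we control, not in distributed's own messages:

import pickle
from distributed.protocol import dask_serialize, dask_deserialize

class DeepDict(dict):
    """Hypothetical marker type for payloads known to nest past msgpack's limit."""

@dask_serialize.register(DeepDict)
def _serialize_deep_dict(obj):
    header = {}                        # header must stay msgpack-friendly (and shallow)
    frames = [pickle.dumps(dict(obj))]
    return header, frames

@dask_deserialize.register(DeepDict)
def _deserialize_deep_dict(header, frames):
    return DeepDict(pickle.loads(frames[0]))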
@jakirkham @quasiben any suggestions on what to do here? Maybe someone knowledgeable of msgpack or serialization in general would have ideas on what is appropriate in such situations.
Msgpack has a default limit of 512 -- pickle seems to work. Maybe we could detect a recursion error and back off to pickle? @eriknw @madsbk do you have any suggestions? My hope was that https://github.com/dask/distributed/pull/5728 would resolve things, but it does not.
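Something like the following is what I had in mind for the back-off, purely as a sketch of the idea rather than how distributed.protocol.core.dumps is actually structured:

import pickle
import msgpack

def pack_with_fallback(msg):
    # Try msgpack first; only when its recursion limit trips, fall back to pickle.
    try:
        return b"M" + msgpack.packb(msg, use_bin_type=True)
    except ValueError:  # "recursion limit exceeded."
        return b"P" + pickle.dumps(msg, protocol=pickle.HIGHEST_PROTOCOL)

def unpack_with_fallback(blob):
    tag, payload = blob[:1], blob[1:]
    if tag == b"M":
        return msgpack.unpackb(payload, raw=False)
    return pickle.loads(payload)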
I was thinking about inserting a custom serializer when I create the client for tasks that I know will be expensive and/or when I want to enable profiling.
In the real-world use case, are we sure we actually need this level of recursion and are not encountering some pathological case like a cycle?
If it is indeed needed, maybe we can just make this configurable via dask.config.
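If we went that route, the option could be read through the normal config machinery; distributed.comm.recursion-fallback below is a hypothetical key used only for illustration (msgpack's C packer does not expose its limit, so such a knob would more likely toggle a fallback serializer than raise the limit itself):

import dask

# Hypothetical key -- it does not exist in the current config schema.
allow_fallback = dask.config.get("distributed.comm.recursion-fallback", default=False)

# and it could be flipped per-context in the usual way:
with dask.config.set({"distributed.comm.recursion-fallback": True}):
    ...  # run the workload that produces deeply nested messages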
We are hitting this when we run GPU-BDB queries. They are intended to be representative of real world use cases.
I am running into this when putting my compute() call inside a with performance_report block on a 16-worker-process (local) cluster. This is a real-world use case, not a pathological case.
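For concreteness, the pattern is just the standard report context manager; the filename, input, and computation below are placeholders rather than my actual workload:

from dask.distributed import Client, performance_report
import dask.dataframe as dd

client = Client(n_workers=16)           # local cluster, 16 worker processes

df = dd.read_parquet("data/*.parquet")  # placeholder input
with performance_report(filename="report.html"):
    df.groupby("key").value.mean().compute()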
@Andreas5739738 sorry for the slow reply (was out for a while), could you please share more about your use case? Ideally a simple reproducer that captures what is being tried.
Sorry, I did not preserve a reproduction case when I ran into this. But it seems that one was shared above by someone else. There was nothing particularly outstanding about my use case, I think (working with large DataFrames with group and apply operations on a local cluster).
I'm hitting the same msgpack recursion limit when running a moderately large computation (~200GB of data) on a remote cluster made of 2 machines with 32 workers each. The error happens when one of the workers requests a set_index task over the 200GB dataframe, after a bunch of filtering operations. This does not seem to be a memory issue, since the cluster has a total of 2TB of RAM, and the data partitions, which weigh ~100MB each, are well distributed across all workers.
I have tried to design a minimal example of this, but when creating a client just to perform these operations everything works fine. The issue only shows up when running these computations as part of a much larger execution, even if at the point of failure the same data are being dealt with.
So, in light of the comments in this issue, my suspicion is that Dask is somehow trying to pack too much debug or profiling information, way over the msgpack recursion limit. I am not running under a performance_report environment.
Some (incomplete) ideas to address this issue:
- Is there any way to completely deactivate the usage of msgpack? I am creating the Dask client as Client(cluster_address, name=client_name_id, serializers=['pickle'], deserializers=['pickle']), but since in my program I need to run tasks from the workers themselves, do I need to specify a serializers configuration for the workers too?
- Is there any way to completely disable all the gathering of debug/profiling information in the whole cluster? I have configured the workers with distributed.worker.profile.enabled=False (along the lines sketched after this list), but to no avail.
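For reference, a sketch of the usual ways to set that flag; the config is process-local, so it needs to be in effect for the worker processes themselves, not just the client:

import dask
from dask.distributed import Client

# Option 1: set the key in-process before a local cluster is created
dask.config.set({"distributed.worker.profile.enabled": False})
client = Client(n_workers=4)

# Option 2: for remote workers, export it in each worker's environment before
# starting them, e.g.
#   export DASK_DISTRIBUTED__WORKER__PROFILE__ENABLED=False
# or add the equivalent entry to the workers' dask YAML config.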