
recursive serialization fails with nested objects

Open kevingerman opened this issue 2 years ago • 11 comments

When serializing dictionaries nested more than 511 levels deep, msgpack, which distributed.protocol.core.dumps uses, hits a hard-coded recursion limit.

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/Users/kgerman/anaconda3/envs/daskdev/lib/python3.9/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/Users/kgerman/anaconda3/envs/daskdev/lib/python3.9/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  [Previous line repeated 508 more times]
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 163, in msgpack._cmsgpack.Packer._pack
ValueError: recursion limit exceeded.

kevingerman avatar Feb 28 '22 13:02 kevingerman

Simple reproduction:

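from queue import Queue

from distributed.protocol import core


def nested_dicts(depth, breadth=1, nth=0):
    # Build dicts nested `depth` levels below the root, breadth-first.
    # At each level, every `nth` key holds a child dict; the other keys hold a string.
    retval = {}
    nth = breadth if nth < 1 else nth
    q = Queue()
    q.put((retval, depth, breadth, nth))
    while not q.empty():
        curr, d, b, n = q.get()
        if d > 0:
            for i in range(1, b + 1):
                if i % n != 0:
                    curr[f"key{i}"] = "bears"
                else:
                    curr[f"key{i}"] = {}
                    q.put((curr[f"key{i}"], d - 1, b, n))
    return retval


def serialize_recursive_dict_throws():
    # Fails with "ValueError: recursion limit exceeded." as in the traceback above
    core.dumps(nested_dicts(512))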

kevingerman avatar Feb 28 '22 13:02 kevingerman

This problem occurs when we run larger workloads on 8- or 16-node clusters. It still comes up without profiling enabled, but enabling dask profiling reliably reproduces the issue at this scale.

kevingerman avatar Feb 28 '22 16:02 kevingerman

Example stack where it occurred: https://gist.github.com/VibhuJawa/d44e5c60c9b24aa357b0c9ec6f9306e4#file-worker-log-L407

kevingerman avatar Feb 28 '22 16:02 kevingerman

It is worth noting that msgpack throws the exception because 512 is greater than the recursion limit defined here: https://github.com/msgpack/msgpack-python/blob/main/msgpack/_packer.pyx#L50
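
To check whether a given message is anywhere near that limit before handing it to the serializer, something like the following could help. It is only a sketch: the names max_nesting_depth, exceeds_limit, build_nested and ASSUMED_LIMIT are made up for illustration, the value 512 is the figure quoted in this thread rather than something read from msgpack, and the exact off-by-one depends on how msgpack counts levels. It walks the object with an explicit stack so it does not hit Python's own recursion limit, and it assumes the object contains no cycles.

```python
ASSUMED_LIMIT = 512  # figure quoted in this thread; an assumption, not msgpack's API


def max_nesting_depth(obj):
    """Return the deepest dict/list/tuple nesting in obj, without recursion.

    Assumes obj has no reference cycles; a cyclic object would loop forever.
    """
    deepest = 0
    stack = [(obj, 1)]
    while stack:
        current, depth = stack.pop()
        if isinstance(current, dict):
            children = current.values()
        elif isinstance(current, (list, tuple)):
            children = current
        else:
            continue  # scalars do not add a nesting level
        deepest = max(deepest, depth)
        stack.extend((child, depth + 1) for child in children)
    return deepest


def exceeds_limit(obj, limit=ASSUMED_LIMIT):
    """True if obj nests deeper than the assumed serializer limit."""
    return max_nesting_depth(obj) > limit


def build_nested(levels):
    """Build a chain of `levels` dicts, each stored under 'key1' of its parent."""
    root = {}
    curr = root
    for _ in range(levels - 1):
        curr["key1"] = {}
        curr = curr["key1"]
    return root


print(max_nesting_depth(build_nested(600)), exceeds_limit(build_nested(600)))
```

Logging this depth for the messages that fail would also show whether the real workload sits just above the limit or far beyond it.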

kevingerman avatar Feb 28 '22 18:02 kevingerman

Wonder if there is a workaround by adding a new serializer when we instantiate the client for the workload.

http://distributed.dask.org/en/stable/serialization.html#extend

kevingerman avatar Mar 01 '22 17:03 kevingerman

@jakirkham @quasiben any suggestions on what to do here? Maybe someone knowledgeable of msgpack or serialization in general would have ideas on what is appropriate in such situations.

pentschev avatar Mar 02 '22 08:03 pentschev

Msgpack has a default limit of 512; pickle seems to work. Maybe we could detect a recursion error and fall back to pickle? @eriknw @madsbk do you have any suggestions? My hope was that https://github.com/dask/distributed/pull/5728 would resolve things, but it does not.

quasiben avatar Mar 02 '22 15:03 quasiben

I was thinking about inserting a custom serializer when I create the client for tasks that I know will be expensive and/or when I want to enable profiling.

kevingerman avatar Mar 02 '22 15:03 kevingerman

In the real-world use case, are we sure we actually need this level of recursion and are not hitting some pathological case, such as a cycle?

If it is indeed needed, maybe we can just make this configurable via dask.config.
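
One way to rule out the cycle case on a captured message is to check whether any container ends up inside itself. The helper below is an illustrative sketch, not part of dask or msgpack; has_cycle and _children are made-up names. It does a depth-first walk with an explicit stack and tracks the ids of the containers on the current path, so shared but non-cyclic references are not reported.

```python
_DONE = object()  # sentinel marking an exhausted child iterator


def _children(obj):
    """Return the child values of a dict/list/tuple, or None for anything else."""
    if isinstance(obj, dict):
        return list(obj.values())
    if isinstance(obj, (list, tuple)):
        return list(obj)
    return None


def has_cycle(root):
    """True if some container in root contains itself, directly or indirectly."""
    kids = _children(root)
    if kids is None:
        return False
    on_path = {id(root)}  # ids of containers between root and the current node
    stack = [(root, iter(kids))]
    while stack:
        node, pending = stack[-1]
        child = next(pending, _DONE)
        if child is _DONE:
            # All children of node were visited: leave it and step back up
            on_path.discard(id(node))
            stack.pop()
            continue
        kids = _children(child)
        if kids is None:
            continue  # scalar value, nothing to descend into
        if id(child) in on_path:
            return True  # child is one of its own ancestors
        on_path.add(id(child))
        stack.append((child, iter(kids)))
    return False


looped = {}
looped["self"] = looped
print(has_cycle(looped), has_cycle({"a": [1, {"b": 2}]}))
```

If this returns False for a failing message, the depth is genuine nesting rather than a self-reference.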

jakirkham avatar Mar 02 '22 16:03 jakirkham

We are hitting this when we run GPU-BDB queries. They are intended to be representative of real world use cases.

kevingerman avatar Mar 02 '22 16:03 kevingerman

I am running into this when putting my compute() call inside a with performance_report block on a local cluster with 16 worker processes. This is a real-world use case, not a pathological case.

Andreas5739738 avatar Sep 08 '22 14:09 Andreas5739738

@Andreas5739738 sorry for the slow reply (was out for a while). Could you please share more about your use case? Ideally a simple reproducer that captures what is being tried.

jakirkham avatar Oct 12 '22 20:10 jakirkham

Sorry, I did not preserve a reproduction case when I ran into this. But it seems that one was shared above by someone else. There was nothing particularly outstanding about my use case, I think (working with large DataFrames with group and apply operations on a local cluster).

Andreas5739738 avatar Oct 12 '22 20:10 Andreas5739738

I'm hitting the same msgpack recursion limit when running a moderately large computation (~200GB of data) on a remote cluster made of 2 machines with 32 workers each. The error happens when one of the workers requests a set_index task over the 200GB dataframe, after a bunch of filtering operations. This does not seem to be a memory issue, since the cluster has a total of 2TB RAM, and the data partitions, at ~100MB each, are well distributed across all workers.

I have tried to design a minimal example of this, but when creating a client just to perform these operations everything works fine. The issue only shows up when running these computations as part of a much larger execution, even if at the point of failure the same data are being dealt with.

So, in the light of the comments in this issue, my suspicion here is that Dask is somehow trying to pack too much debug or profiling information, way over the msgpack recursion limit. I am not running under a performance_report environment.

Some (incomplete) ideas to address this issue:

  • Is there any way to completely deactivate the usage of msgpack? I am creating the Dask client as Client(cluster_address, name=client_name_id, serializers=['pickle'], deserializers=['pickle']), but since my program needs to run tasks from the workers themselves, do I need to specify a serializers configuration for the workers too?
  • Is there any way to completely disable all the gathering of debug/profiling information in the whole cluster? I have configured the workers with distributed.worker.profile.enabled=False, but to no avail.

albarji avatar Sep 28 '23 08:09 albarji