
Provide `DataFrame.to_pickle`

Status: Open. multimeric opened this issue 3 years ago • 7 comments

Currently, the best-supported way to store a distributed dataframe on disk is to_parquet. This is great, but there are some use cases where pickle would work better, for example when the dataframe contains native Python types that aren't easily represented in Parquet. Also, Parquet type inference isn't perfect: an object column that clearly holds, e.g., list[str] values often gets inferred as a string column, and serialization fails. Pickle serialization would ideally Just Work™ in a way that Parquet doesn't necessarily.

The implementation I imagine would be fairly easy. Each partition is a pandas DataFrame, so we can just call to_pickle on each one and write it to a separate pickle file. In fact, it seems like other output formats that pandas already supports could be supported in Dask in the same way. Is there some difficult part of this that I'm missing? I would be happy to work on this feature if it's feasible for someone like me who isn't intimately familiar with the library internals.

multimeric, Aug 13 '22 15:08

Can you just convert the parquet to a pandas dataframe and then use df.to_pickle?

ba05, Aug 14 '22 00:08

I want a direct dask.dataframe.DataFrame → pickle conversion, without parquet involved.

multimeric, Aug 14 '22 03:08

Thanks for the issue @multimeric! Adding a to_pickle method seems in scope. That said, if you're experiencing issues with Dask's use of Parquet, I'd be interested in hearing more about them or (if you're interested) in you opening new issues. We're always interested in how we can improve Dask's Parquet experience for users.

jrbourbeau, Aug 15 '22 21:08

Okay, so my main issue with Parquet at the moment is serializing dicts and lists. Here's a very simple example:

```python
import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({"a": [[1, 2], [2, 3], [4, 5], [6, 7]], "b": [{"key": "value"}] * 4})
ddf = dd.from_pandas(df, npartitions=2)
ddf.to_parquet("save")
```

```
ValueError: Failed to convert partition to expected pyarrow schema:
    `ArrowTypeError("Expected bytes, got a 'list' object", 'Conversion failed for column a with type object')`

Expected partition schema:
    a: string
    b: string
    __null_dask_index__: int64

Received partition schema:
    a: list<item: int64>
      child 0, item: int64
    b: struct<key: string>
      child 0, key: string
    __null_dask_index__: int64
```

So it's true that, with improved type inference, we could avoid this error (though I'm not sure whether the type inference lives in Dask or in pyarrow). However, I believe Parquet's lists and dicts are strongly typed, so for a dataframe with several thousand rows you would need to check every item in every list to determine the list's element type. And if it turns out to be a mixed-type list, I'm not sure what you could even do. So I understand why to_parquet() requires a schema the way it does. This is my motivation for to_pickle: it still lets you serialize the dataframe, but it's the easy version that doesn't require any schema customization, at the cost of being Python-only (unlike Parquet, which is pretty easy to read from any language).

multimeric, Aug 15 '22 23:08

Dask makes the (often, but not always, accurate) assumption that an "object" dtype is a string when writing parquet datasets. If you are doing tricky things like serializing dicts and lists, then you will need to specify a pyarrow schema (see the brief discussion under the schema argument in to_parquet).
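To show why an assumption is needed at all, here is a sketch using pandas alone. Strings, lists, and dicts all share the `object` dtype, so telling them apart means scanning the values, for instance with `pd.api.types.infer_dtype`. This is only an illustration; Dask's own check may work differently.

```python
import pandas as pd

# dtype=object is explicit so strings stay object-typed across pandas versions.
strings = pd.Series(["x", "y"], dtype=object)
lists = pd.Series([[1, 2], [3]])
dicts = pd.Series([{"key": "value"}])

# The dtype alone cannot distinguish these columns.
print(strings.dtype, lists.dtype, dicts.dtype)  # object object object

# Inspecting the values can, at the cost of touching every element.
print(pd.api.types.infer_dtype(strings))  # string
print(pd.api.types.infer_dtype(lists))
print(pd.api.types.infer_dtype(dicts))
```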

If you have a strict shape for these structures, you can try to provide a schema manually to describe them:

```python
import pyarrow as pa

# Describe the nested columns explicitly instead of letting Dask assume "string".
schema = pa.schema(
    {
        "a": pa.list_(pa.int32()),
        "b": pa.struct([pa.field("key", pa.string())]),
    }
)

ddf.to_parquet("tmp.parquet", schema=schema)
dd.read_parquet("tmp.parquet").compute()
```

This, however, will fail if your data has lists/dicts that are less structured than the above. In that case, I might suggest a preprocessing step dumping them to something like JSON:

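```python
import json

# Encode each cell as JSON so every column becomes a flat binary column.
ddf2 = ddf.applymap(json.dumps)

ddf2.to_parquet("tmp.parquet", schema={"a": pa.binary(), "b": pa.binary()})
# meta describes the decoded DataFrame: two object columns.
dd.read_parquet("tmp.parquet").applymap(
    json.loads, meta={"a": "object", "b": "object"}
).compute()
```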

(Note: I think Parquet has a JSON type, but it wasn't immediately clear to me whether writing it is possible with pyarrow; perhaps we could figure it out with more searching.)

If you really want pickling, a similar trick might be done with pickle:

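```python
import pickle

# Pickle each cell to bytes so any Python object fits in a binary column.
ddf2 = ddf.applymap(pickle.dumps)

ddf2.to_parquet("tmp.parquet", schema={"a": pa.binary(), "b": pa.binary()})
# meta describes the unpickled DataFrame: two object columns.
dd.read_parquet("tmp.parquet").applymap(
    pickle.loads, meta={"a": "object", "b": "object"}
).compute()
```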

> This is my motivation for to_pickle. It still lets you serialize the data frame, but it's the easy version which doesn't require any schema customization, at the cost of being only Python compatible (unlike Parquet which is pretty easy to read into any language).

I can see how writing all of the above is a bit more work than just calling to_pickle(), so I won't push back too hard against the idea, but to me the benefits of Parquet make the extra work of dealing with object dtypes worth it.

ian-r-rose, Aug 16 '22 00:08

I should also note: if you just want to let pyarrow take a crack at serializing things, throwing schema-matching caution to the wind, you can pass to_parquet(schema=None).

ian-r-rose, Aug 16 '22 00:08

I think there are some meaningful downsides with these workarounds, though.

Manually specifying a schema works well when you only have two columns, as in my example, but my real-life use case is loading a massive dataframe with hundreds of columns from JSON, at which point curating the schema is a huge undertaking that I wouldn't want to burden my users with.

I can see the appeal of mapping to JSON strings, but that can't represent arbitrary Python types, although you are right that it works for the lists and dicts in my example.

Pickling each element is closer to what I'm after, because it always succeeds, but I suspect it will be less efficient than pickling the entire dataframe. It also has the downside that no single function can load the data back into a "final", usable dataframe, because you need the specific knowledge that each element must be unpickled first. If someone finds your .parquet file and loads it with any standard tool, they will just see binary blobs in every cell of the table, which is not intuitive.

Pickling each partition still strikes me as the easiest solution. Users could then use a matching from_pickle function to load the partitions back into memory, and the result would be usable straight away, with all the data in the right format. They can even read individual partitions with pandas' read_pickle, which I find is a great tool for debugging.

multimeric, Aug 16 '22 03:08