
feat(pandas): add support for serializing `pd.DataFrame` in Arrow IPC formats


What does this PR address?

This PR adds support for serializing Pandas DataFrames in both the Arrow IPC File and Streaming formats. These formats are faster than Parquet to serialize and deserialize, at the cost of a larger payload; whether that tradeoff pays off depends on available network bandwidth.

Additionally, it is worth noting that this PR uses the officially registered MIME types for Arrow: https://www.iana.org/assignments/media-types/application/vnd.apache.arrow.file and https://www.iana.org/assignments/media-types/application/vnd.apache.arrow.stream

It is a shame that BentoML doesn't use the registered MIME type for Parquet (https://www.iana.org/assignments/media-types/application/vnd.apache.parquet) - perhaps that is something to tackle in another PR.
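For reference, the registered media types mentioned above collected as constants. This is just an illustrative sketch; the constant names are not from BentoML's codebase:

```python
# IANA-registered media types for Arrow IPC formats (and Parquet, for comparison).
ARROW_FILE_MIME = "application/vnd.apache.arrow.file"
ARROW_STREAM_MIME = "application/vnd.apache.arrow.stream"
PARQUET_MIME = "application/vnd.apache.parquet"
```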

In [1]: import pyarrow

In [2]: import pandas as pd

In [3]: import numpy as np

In [4]: import io

In [5]: df = pd.DataFrame({'a': np.arange(1000), 'b': np.random.random(1000)})

In [6]: def serialize_stream(obj):
    ...:     sink = pyarrow.BufferOutputStream()
    ...:     batch = pyarrow.RecordBatch.from_pandas(obj, preserve_index=True)
    ...:     with pyarrow.ipc.new_stream(sink, batch.schema) as writer:
    ...:         writer.write_batch(batch)
    ...:     return sink.getvalue().to_pybytes()
    ...: 
    ...: 
    ...: def deserialize_stream(obj):
    ...:     with pyarrow.ipc.open_stream(obj) as reader:
    ...:         return reader.read_pandas()
    ...: 
    ...: 
    ...: def serialize_file(obj):
    ...:     sink = pyarrow.BufferOutputStream()
    ...:     batch = pyarrow.RecordBatch.from_pandas(obj, preserve_index=True)
    ...:     with pyarrow.ipc.new_file(sink, batch.schema) as writer:
    ...:         writer.write_batch(batch)
    ...:     return sink.getvalue().to_pybytes()
    ...: 
    ...: def deserialize_file(obj):
    ...:     with pyarrow.ipc.open_file(obj) as reader:
    ...:         return reader.read_pandas()

In [7]: %timeit deserialize_file(serialize_file(df))
483 µs ± 9.55 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

In [8]: %timeit deserialize_stream(serialize_stream(df))
479 µs ± 4.89 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

In [9]: %timeit pd.read_parquet(io.BytesIO(df.to_parquet()))
861 µs ± 95.7 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

Fixes #(issue)


judahrand commented Jun 06 '24 10:06

@larme I don't think these test failures are related to my changes.

judahrand commented Jun 06 '24 11:06

@judahrand please resolve the conflicts

frostming commented Jun 21 '24 09:06