feat(pandas): add support for serializing `pd.DataFrame` in Arrow IPC formats
## What does this PR address?
This PR adds support for serializing Pandas DataFrames in both the Arrow IPC File and Streaming formats. These formats serialize and deserialize faster than Parquet, at the cost of a larger payload; whether that tradeoff is worthwhile depends on the available network bandwidth.
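For context on the difference between the two formats (a minimal sketch, not code from this PR): the File format writes a footer on close that permits random access to record batches, while the Stream format has no footer and is read sequentially.

```python
import pandas as pd
import pyarrow

df = pd.DataFrame({"a": [1, 2, 3]})
batch = pyarrow.RecordBatch.from_pandas(df, preserve_index=True)

# File format: the footer written on close enables random access by batch index.
sink = pyarrow.BufferOutputStream()
with pyarrow.ipc.new_file(sink, batch.schema) as writer:
    writer.write_batch(batch)
with pyarrow.ipc.open_file(sink.getvalue()) as reader:
    first = reader.get_batch(0)

# Stream format: no footer; batches are consumed sequentially as they arrive.
sink = pyarrow.BufferOutputStream()
with pyarrow.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
with pyarrow.ipc.open_stream(sink.getvalue()) as reader:
    for b in reader:
        pass
```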
Additionally, it is worth noting that this PR uses the officially registered MIME types for Arrow:

- https://www.iana.org/assignments/media-types/application/vnd.apache.arrow.file
- https://www.iana.org/assignments/media-types/application/vnd.apache.arrow.stream
It is a shame that BentoML doesn't use the registered MIME type for Parquet; perhaps that is something to tackle in another PR: https://www.iana.org/assignments/media-types/application/vnd.apache.parquet
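To illustrate how a client might negotiate these formats over HTTP (a hypothetical sketch; the endpoint URL and payload are placeholders, not code from this PR):

```python
import requests

# Placeholder payload: Arrow IPC stream bytes, e.g. produced by the
# serialize_stream() helper in the benchmark below.
payload = b"..."

resp = requests.post(
    "http://localhost:3000/predict",  # hypothetical service endpoint
    data=payload,
    headers={
        # Registered Arrow MIME types; use ...arrow.file for the File format.
        "Content-Type": "application/vnd.apache.arrow.stream",
        "Accept": "application/vnd.apache.arrow.stream",
    },
)
```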
A quick round-trip benchmark against Parquet (IPython session):

```python
In [1]: import pyarrow

In [2]: import pandas as pd

In [3]: import numpy as np

In [4]: import io

In [5]: df = pd.DataFrame({'a': np.arange(1000), 'b': np.random.random(1000)})

In [6]: def serialize_stream(obj):
   ...:     sink = pyarrow.BufferOutputStream()
   ...:     batch = pyarrow.RecordBatch.from_pandas(obj, preserve_index=True)
   ...:     with pyarrow.ipc.new_stream(sink, batch.schema) as writer:
   ...:         writer.write_batch(batch)
   ...:     return sink.getvalue().to_pybytes()
   ...:
   ...:
   ...: def deserialize_stream(obj):
   ...:     with pyarrow.ipc.open_stream(obj) as reader:
   ...:         return reader.read_pandas()
   ...:
   ...:
   ...: def serialize_file(obj):
   ...:     sink = pyarrow.BufferOutputStream()
   ...:     batch = pyarrow.RecordBatch.from_pandas(obj, preserve_index=True)
   ...:     with pyarrow.ipc.new_file(sink, batch.schema) as writer:
   ...:         writer.write_batch(batch)
   ...:     return sink.getvalue().to_pybytes()
   ...:
   ...: def deserialize_file(obj):
   ...:     with pyarrow.ipc.open_file(obj) as reader:
   ...:         return reader.read_pandas()

In [7]: %timeit deserialize_file(serialize_file(df))
483 µs ± 9.55 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

In [8]: %timeit deserialize_stream(serialize_stream(df))
479 µs ± 4.89 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

In [9]: %timeit pd.read_parquet(io.BytesIO(df.to_parquet()))
861 µs ± 95.7 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
```
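To see the other side of the tradeoff, the same session can compare payload sizes (a sketch reusing the helpers above; the numbers vary with dtypes and Parquet compression settings, so none are claimed here):

```python
print(len(serialize_file(df)), "bytes (Arrow IPC file)")
print(len(serialize_stream(df)), "bytes (Arrow IPC stream)")
print(len(df.to_parquet()), "bytes (Parquet, default compression)")
```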
Fixes #(issue)
## Before submitting:
- [x] Does the Pull Request follow the Conventional Commits specification naming? Here is GitHub's guide on how to create a pull request.
- [x] Does the code follow BentoML's code style, i.e. has the `pre-commit run -a` script passed (instructions)?
- [x] Did you read through the contribution guidelines and follow the development guidelines?
- [ ] Did your changes require updates to the documentation? Have you updated it accordingly? Here are the documentation guidelines and tips on writing docs.
- [ ] Did you write tests to cover your changes?
@larme I don't think these test failures are related to my changes
@judahrand please resolve the conflicts