Add streaming converters
Now that we can memory-map larger-than-RAM IPC/Feather files, it would be great if we could convert datasets in a streaming fashion.
I'd want to start with:
- parquet: This is the easiest, as we can iterate over (row_offset, row_len) chunks, read each one, and write/append it (see the sketch after this list).
- csv: A naive chunked algorithm would have O(n^2) complexity because we would need to count the rows on every chunk. Instead, we can determine the byte offsets up front (we already do that to split work across threads) and then give every chunk its corresponding file bytes.
For IPC we need to figure out how to append. Ideally this would be in a single chunk per column, but I don't think that's trivial.
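As a rough illustration of the parquet bullet, here is a minimal Python sketch, with hypothetical file names, that memory-maps an IPC file and appends fixed-size slices to a parquet file as row groups via pyarrow's `ParquetWriter` (the lazy writer creation is my assumption, not an existing polars converter):

```python
import polars as pl
import pyarrow.parquet as pq

# Hypothetical file names; read_ipc memory-maps, so nothing is loaded eagerly.
df = pl.read_ipc("data.ipc", rechunk=False)
slice_size = 10_000_000

writer = None
for offset in range(0, df.height, slice_size):
    # Each slice is a view into the memory-mapped data.
    table = df.slice(offset, slice_size).to_arrow()
    if writer is None:
        # Create the writer lazily so the arrow schema comes from the first slice.
        writer = pq.ParquetWriter("data.parquet", table.schema)
    writer.write_table(table)  # one row group per slice
if writer is not None:
    writer.close()
```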
FYI, I was just doing the first one, to finally transfer a nearly 6-billion-row dataset currently stored in an IPC file to a parquet file.
```python
import polars as pl

file_name = "transaction_records.ipc"
slice_size = 100_000_000

# Memory-mapped read; the data is not materialized in RAM.
df = pl.read_ipc(file_name, rechunk=False)
nbr_rows = df.height

# Re-slice the memory-mapped frame into 100M-row chunks without rechunking,
# so the write proceeds chunk by chunk instead of loading everything.
subdf = pl.concat(
    [
        df.slice(next_offset, slice_size)
        for next_offset in range(0, nbr_rows, slice_size)
    ],
    rechunk=False,
)
subdf.write_parquet("transaction.parquet")
```

```
>>> subdf.estimated_size() / (2**30)
291.15183783043176
```
My RAM usage never went above 9 GB during the entire process, even though the dataset is 291 GB when fully loaded into RAM.
```
>>> import pyarrow.parquet as pq
>>> pq.read_metadata('transaction.parquet')
<pyarrow._parquet.FileMetaData object at 0x7f4528d53dd0>
  created_by: Arrow2 - Native Rust implementation of Arrow
  num_columns: 6
  num_rows: 5969118070
  num_row_groups: 139
  format_version: 2.6
  serialized_size: 55909
```
The batched CSV reader was implemented in https://github.com/pola-rs/polars/pull/5212.
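For reference, a minimal sketch of how those batches could be consumed to convert a CSV to parquet, with hypothetical file names; the `ParquetWriter` append is my assumption about usage, not polars' own converter:

```python
import polars as pl
import pyarrow.parquet as pq

# Hypothetical file name; the CSV is read in chunks instead of all at once.
reader = pl.read_csv_batched("transactions.csv", batch_size=1_000_000)

writer = None
while (batches := reader.next_batches(10)) is not None:
    for df in batches:
        table = df.to_arrow()
        if writer is None:
            # Take the arrow schema from the first batch.
            writer = pq.ParquetWriter("transactions.parquet", table.schema)
        writer.write_table(table)  # one row group per batch
if writer is not None:
    writer.close()
```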
@ritchie46 I am interested in contributing to this. I have been playing with reading parquet asynchronously by leveraging the `object_store` crate. I have figured out the low-level async/tokio code and am now wrapping my head around integrating it at the executor layer.
https://github.com/pola-rs/polars/pull/5827
I think that a good way forward is:
- Rename the `async` from that PR to `streaming`.
- Contain all the `async` code in `parquet_streaming` and let the rest of the code be single-threaded.
- Implement `ArrowReader`.
What do you think?