
Support streaming from non-local files.

Open elephantum opened this issue 1 year ago • 8 comments

Problem description

When I try to process a huge file from S3 in a streaming way, I get:

sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'

Minimal reproducible example is:

import polars as pl
df = pl.scan_parquet("s3://rimrep-data-public/091-aims-sst/test-50-64-spatialpart/part.0.parquet")
df.sink_parquet("part.0.parquet")

I wish streaming processing worked with remote files, because that is exactly the situation where you have to deal with huge data.
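For reference, the workaround the error message points at looks roughly like this; it materializes the whole result in memory, which is exactly what streaming is meant to avoid:

import polars as pl

df = pl.scan_parquet("s3://rimrep-data-public/091-aims-sst/test-50-64-spatialpart/part.0.parquet")
# Eagerly collect everything into memory, then write it back out.
df.collect().write_parquet("part.0.parquet")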

elephantum avatar Jun 12 '23 14:06 elephantum

Yes, we have that on the radar. The first thing we want to do is add streaming support for scan_pyarrow_dataset.

ritchie46 avatar Jun 12 '23 17:06 ritchie46

Also relevant for this issue: https://github.com/fsspec/filesystem_spec/blob/master/fsspec/parquet.py#L17, specifically the function fsspec.parquet.open_parquet_file.

It seems that Nvidia introduced this functionality some time ago: https://developer.nvidia.com/blog/optimizing-access-to-parquet-data-with-fsspec/

On the surface it seems compatible with the functionality required for LazyParquetReader. I imagine it might be possible to do similar magic on the Python side and populate the LogicalPlan with reads of data chunks through fsspec.
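A rough sketch of that stop-gap, combining fsspec.parquet.open_parquet_file with the eager reader (the column name and anonymous-access option are assumptions, and this still bypasses the streaming engine):

import fsspec.parquet
import polars as pl

# Fetch only the byte ranges needed for the requested columns/row groups,
# then hand the resulting file-like object to the eager reader.
with fsspec.parquet.open_parquet_file(
    "s3://rimrep-data-public/091-aims-sst/test-50-64-spatialpart/part.0.parquet",
    columns=["sst"],                  # hypothetical column name
    storage_options={"anon": True},   # assumes the bucket allows anonymous access
) as f:
    df = pl.read_parquet(f)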

elephantum avatar Jun 13 '23 06:06 elephantum

@elephantum perhaps this article can also do the job until @ritchie46 completes the work.

Btw @ritchie46 such a great library! Thanks a lot! Cheers, B

bcambel avatar Jun 20 '23 15:06 bcambel

Maybe a bit of an add-on, but it would help if the feature wasn't limited to string paths and we had the ability to pass in an opened buffer to a file or an in-memory data source, like:

stream = io.BytesIO()
pl.scan_parquet(stream)

# or

with open('file.parquet', 'rb') as f:
    pl.scan_parquet(f)

# or because s3fs is awesome...

with s3.open('s3://bucket/etc/etc/', 'rb') as f:
    pl.scan_parquet(f)

# or finally, like with a Flask request stream

pl.read_parquet(request.stream)

Talk about the IO and Memory savings... that would be impressive.

I mean yes we have tools like s3fs-fuse where I can read straight from S3 using a scan, but still.

jackmead515 avatar Sep 05 '23 17:09 jackmead515

@jackmead515: this is already possible for scanning. As long as it is a Python byte-stream, you can do that.

Please do correct me if I'm wrong, but I'm using that in production already ;-). I would very much like the sink_* methods to do the same, but that, alas, isn't possible yet.
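For what it's worth, a minimal sketch of that scanning path, assuming the installed polars version accepts a file-like source in scan_parquet:

import polars as pl

# Lazily scan from an already-open binary file handle and only
# materialize the result when collect() is called.
with open("file.parquet", "rb") as f:
    df = pl.scan_parquet(f).collect()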

svaningelgem avatar Sep 05 '23 18:09 svaningelgem

@svaningelgem No kidding? I just reread the documentation. I guess read_parquet() actually does accept:

string, Path, BinaryIO, BytesIO, bytes

as the source... But since this isn't lazy, I suspect it's still going to read all the data during that function call. I was assuming that the scan_* calls are purely lazy and will significantly reduce memory because they take the data in chunks.
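A quick sketch of that eager path with an in-memory buffer (file name assumed): it works because read_parquet accepts BytesIO, but the whole dataset is read at once.

import io
import polars as pl

# Load the raw parquet bytes into memory, then parse them eagerly;
# nothing here is lazy or chunked.
with open("file.parquet", "rb") as fh:
    buf = io.BytesIO(fh.read())

df = pl.read_parquet(buf)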

jackmead515 avatar Sep 05 '23 18:09 jackmead515

FWIW, the original issue reported by @elephantum seems to no longer be a problem:

import polars as pl
df = pl.scan_parquet('s3://<my bucket>/<my key that is a parquet file>')
df.sink_parquet('test.parquet')
pl.read_parquet('test.parquet')

shows me the dataframe I expect.

This is using polars 0.20.5

cdmoye avatar Jan 24 '24 18:01 cdmoye

@cdmoye it's a bit vague, but there's still no support for sinking to remote object stores, only for reading from them.
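Until remote sinks land, a common stop-gap (bucket and key names here are placeholders, and using s3fs for the upload is an assumption) is to sink locally and upload afterwards:

import polars as pl
import s3fs

# Stream the remote scan into a local parquet file...
pl.scan_parquet("s3://<my bucket>/<my key>").sink_parquet("local.parquet")

# ...then upload the result, since sinking directly to s3:// is not supported.
fs = s3fs.S3FileSystem()
fs.put("local.parquet", "<my output bucket>/<my output key>")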

deanm0000 avatar Jan 26 '24 20:01 deanm0000