Support streaming from non-local files.
Problem description
When I try to process a huge file from S3 in a streaming way, I get:
sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
A minimal reproducible example is:
import polars as pl
df = pl.scan_parquet("s3://rimrep-data-public/091-aims-sst/test-50-64-spatialpart/part.0.parquet")
df.sink_parquet("part.0.parquet")
I wish streaming processing worked with remote files, because that is exactly the situation in which you have to deal with huge data.
Yes, we have that on the radar. The first thing we want to do is add streaming support for scan_pyarrow_dataset.
Also relevant for this issue: https://github.com/fsspec/filesystem_spec/blob/master/fsspec/parquet.py#L17
Specifically, the function fsspec.parquet.open_parquet_file.
It seems that some time ago NVIDIA introduced this functionality: https://developer.nvidia.com/blog/optimizing-access-to-parquet-data-with-fsspec/
On the surface it seems to be compatible with the functionality required for LazyParquetReader. I imagine it might be possible to do similar magic on the Python side and populate the LogicalPlan with reads of data chunks through fsspec.
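In the meantime, here is a minimal sketch of how fsspec.parquet.open_parquet_file could be used to pre-fetch only the byte ranges covering selected columns before handing the file-like object to a reader; the bucket, key, and column names below are placeholders:

import fsspec.parquet
import polars as pl

# open_parquet_file pre-fetches the byte ranges needed for the requested
# columns, then behaves like an ordinary seekable file object.
with fsspec.parquet.open_parquet_file(
    "s3://my-bucket/part.0.parquet",   # placeholder path
    columns=["time", "temperature"],   # placeholder column names
    storage_options={"anon": True},    # forwarded to the fsspec filesystem
) as f:
    df = pl.read_parquet(f, columns=["time", "temperature"])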
@elephantum perhaps this article can also do the job until @ritchie46 completes the work.
Btw @ritchie46, such a great library! Thanks a lot! Greetings, B
Maybe a bit of an ask, but it would help if the feature weren't limited to string paths and we had the ability to pass in an opened buffer to a file or an in-memory data source, like:
import io
import polars as pl
import s3fs
from flask import request

stream = io.BytesIO()
pl.scan_parquet(stream)

# or
with open('file.parquet', 'rb') as f:
    pl.scan_parquet(f)

# or because s3fs is awesome...
s3 = s3fs.S3FileSystem()
with s3.open('s3://bucket/etc/etc/', 'rb') as f:
    pl.scan_parquet(f)

# or finally, like with a Flask request stream
pl.read_parquet(request.stream)
Talk about the I/O and memory savings... that would be impressive.
I mean, yes, we have tools like s3fs-fuse where I can read straight from S3 using a scan, but still.
@jackmead515: this is already possible for scanning. As long as it is a Python byte stream, you can do that.
Please do correct me if I'm wrong, but I'm using that in production already ;-). I would very much like the sink_* methods to do the same, but that, alas, isn't possible yet.
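For reference, a rough sketch of what this could look like; the bucket and key are placeholders, and whether scan_parquet accepts an open file object depends on your polars version:

import polars as pl
import s3fs

fs = s3fs.S3FileSystem()
with fs.open("s3://my-bucket/part.0.parquet", "rb") as f:  # placeholder path
    lf = pl.scan_parquet(f)     # builds a lazy query; nothing is read yet
    df = lf.head(10).collect()  # data is only pulled at collect()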
@svaningelgem No kidding? I just reread the documentation. I guess read_parquet() actually does accept:
string, Path, BinaryIO, BytesIO, bytes
in its signature... But since this isn't lazy, I suspect it's still going to read all the data during that function call. I was assuming that the scan_* functions are purely lazy and will significantly reduce memory usage because they take the data in chunks.
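(For context: the scan_* functions build a lazy plan, and projection and predicate pushdown mean only the required columns and row groups are read at collect() time. A small illustration with placeholder file and column names:)

import polars as pl

lf = pl.scan_parquet("file.parquet")         # only metadata is touched here
out = (
    lf.filter(pl.col("temperature") > 20.0)  # predicate pushed into the reader
      .select(["time", "temperature"])       # projection: only these columns are read
      .collect()                             # the actual read happens here
)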
FWIW, the original issue reported by @elephantum seems to no longer be a problem:
import polars as pl
df = pl.scan_parquet('s3://<my bucket>/<my key that is a parquet file>')
df.sink_parquet('test.parquet')
pl.read_parquet('test.parquet')
shows me the dataframe I expect.
This is using polars 0.20.5.
@cdmoye it's a bit subtle, but there's still no support for sinking to remote object stores, only for reading from them.
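Until that lands, one workaround is to sink to a local file with the streaming engine and then upload the result, e.g. with s3fs; the bucket and key names below are placeholders:

import polars as pl
import s3fs

# Stream the remote file through the engine to local disk...
pl.scan_parquet("s3://my-bucket/in.parquet").sink_parquet("out.parquet")

# ...then upload the finished file to the object store.
fs = s3fs.S3FileSystem()
fs.put("out.parquet", "s3://my-bucket/out.parquet")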