feat(rust,python): Enable object store in scan_parquet python
This PR enables object_store downloads in Python when using scan_parquet. This is opt-in: "object_store": True needs to be passed in the storage_options parameter.
For example:
import polars
from time import time

aws_url = "s3://your-bucket/polars/datasets/foods1.parquet"

start = time()
df = polars.scan_parquet(
    aws_url,
    storage_options={
        "object_store": True,
        "region": "us-west-2",
        "aws_access_key_id": "xxxx",
        "aws_secret_access_key": "yyyy",
    },
)
print(df.head())
print(f"Time: {time() - start}")
Thanks a lot for the PR @winding-lines. I hope to get to this one tomorrow. :muscle:
Sorry @ritchie46, can you wait until the weekend/next week?
The code is working, but the speedup is not significant: about 6 seconds faster on the 42-second test I am using.
It may well be that the speedup will not be significant when done and that the only benefit is that we can make the planner more efficient, but I want to push a bit more.
@ritchie46 here is an update on this PR:
- this PR is working as described in the Python snippet above. I think it is worthwhile for you to review it, and based on your feedback we can then merge it.
- the current Python implementation of scan_parquet("s3://../*.parquet") uses fsspec.open(). In this case fsspec just opens the first file, and the subsequent planning phase in polars is also fast. Fast is good, but for most users opening just one file is unexpected behavior.
- the proposed Rust implementation in this PR actually lists all the files and then fetches the file_info in the planning phase.
  a. In my first implementation the async code was more self-contained, but this leads to serializing the downloads for the metadata.
  b. In my current implementation of this PR I bring together the listing and downloading phases (see the sketch after this list), which cuts about 6 seconds from 42 seconds on a test with 80 parquet files.
- This PR enables use cases with low tens of parquet files. It is not usable with hundreds of files or more, which I think we should also enable in subsequent PRs.
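As a side note, here is a minimal Python/asyncio sketch of the idea in sub-point b; list_files and fetch_footer are hypothetical stand-ins for the object_store listing and ranged-GET calls that the Rust code in this PR actually makes:

import asyncio
from typing import AsyncIterator

async def list_files(prefix: str) -> AsyncIterator[str]:
    # Hypothetical stand-in for an object-store LIST; yields paths as they arrive.
    for i in range(3):
        yield f"{prefix}/part-{i}.parquet"

async def fetch_footer(path: str) -> bytes:
    # Hypothetical stand-in for a ranged GET of one file's parquet footer.
    await asyncio.sleep(0.1)
    return b""

async def plan(prefix: str) -> list[bytes]:
    tasks = []
    async for path in list_files(prefix):
        # Start each download as soon as its path is listed, instead of
        # waiting for the full listing and then fetching footers serially.
        tasks.append(asyncio.create_task(fetch_footer(path)))
    return await asyncio.gather(*tasks)

footers = asyncio.run(plan("s3://your-bucket/polars/datasets"))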
This Dask thread https://github.com/dask/community/issues/234#issuecomment-1094887172 has some good pointers on what we could do next. The general idea is to load the metadata from a centralized system, and for a general library like polars it may make sense to support multiple mechanisms. The first one could be the Hive _common_metadata described in the link.
There is a secondary long-term design question: assuming that one year from now polars supports Hive, Iceberg, Delta Table, and Hudi, where should all of those implementations go? My personal preference is to have them supported natively, but I know that adding dependencies to Polars is also not desirable.
Great work @winding-lines! Some unsolicited feedback on the function signature: what do you think of making it a little more generic, e.g.
df = polars.scan_parquet(
    aws_url,
    storage_options={
        "storage_handler": "object_store",
        "region": "us-west-2",
        "aws_access_key_id": "xxxx",
        "aws_secret_access_key": "yyyy",
    },
)
or
df = polars.scan_parquet(
    aws_url,
    storage_handler="object_store",
    storage_options={
        "region": "us-west-2",
        "aws_access_key_id": "xxxx",
        "aws_secret_access_key": "yyyy",
    },
)
If the storage_handler (or maybe storage_backend) param is missing, it defaults to fsspec.
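For what it's worth, a tiny sketch of how that default could look on the Python side (purely illustrative; _resolve_handler is not an existing polars function):

def _resolve_handler(storage_options: dict | None) -> str:
    # Fall back to fsspec when no explicit handler is requested.
    return (storage_options or {}).get("storage_handler", "fsspec")

assert _resolve_handler(None) == "fsspec"
assert _resolve_handler({"storage_handler": "object_store"}) == "object_store"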
> The first one could be the Hive _common_metadata described in the link.
It's not a part of the parquet spec, but systems that write directories of Parquet files often produce both _common_metadata and _metadata files. From the pyarrow docs:
> Those files include information about the schema of the full dataset (for _common_metadata) and potentially all row group metadata of all files in the partitioned dataset as well (for _metadata). The actual files are metadata-only Parquet files. Note this is not a Parquet standard, but a convention set in practice by those frameworks.
This is especially helpful: after reading the _metadata file, and without any additional fetches, you can do predicate pushdown because you have the column statistics for every row group, and you also know the exact byte ranges you want to fetch. That same page in the pyarrow docs shows how to read and write these files.
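For illustration, a sketch along the lines of that pyarrow docs page (paths and the toy table are placeholders): write a dataset plus the _common_metadata / _metadata sidecars, then read the aggregated row-group metadata back with a single fetch.

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"foo": [1, 2, 3], "bar": ["a", "b", "c"]})
root_path = "dataset_root"

# Collect the per-file metadata while writing the dataset.
metadata_collector = []
pq.write_to_dataset(table, root_path, metadata_collector=metadata_collector)

# _common_metadata: schema of the full dataset, no row-group statistics.
pq.write_metadata(table.schema, f"{root_path}/_common_metadata")

# _metadata: schema plus the row-group metadata of every written file.
pq.write_metadata(
    table.schema, f"{root_path}/_metadata", metadata_collector=metadata_collector
)

# One read of _metadata exposes every row group and its column statistics,
# which is what enables predicate pushdown without touching the data files.
print(pq.read_metadata(f"{root_path}/_metadata").num_row_groups)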
On the Rust side, arrow2/parquet2 also have support for these _metadata files, see https://github.com/jorgecarleitao/parquet2/issues/146 and https://github.com/jorgecarleitao/arrow2/pull/1063.
Current status:
- the scan_parquet code is DONE
- when collect-ing from Python the physical_plan scan also needs to download the files, also DONE
- MISSING: a Tokio runtime needs to be started on the collect path, working on it
@ritchie46 this is ready for review. I am happy to change the API as per @talawahtech's great feedback, if you agree with it. I am not yet familiar with the Python API you desire for Polars :)
I'm closing this pull request due to inactivity. Feel free to rebase and reopen and continue your work!
For reference, this is now possible in Python through fsspec, though a native Rust implementation of cloud integration would be welcome.
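For anyone landing here later, a minimal sketch of the fsspec route (the bucket path is a placeholder; credentials can also come from the environment):

import fsspec
import polars as pl

# Open the remote file through fsspec and hand the file object to polars.
with fsspec.open("s3://your-bucket/polars/datasets/foods1.parquet") as f:
    df = pl.read_parquet(f)

print(df.head())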