
feat(rust,python): Enable object store in scan_parquet python


This PR enables object_store downloads in Python when using scan_parquet. The feature is opt-in: "object_store": True needs to be passed in the storage_options parameter.

For example:

import polars
from time import time


aws_url = "s3://your-bucket/polars/datasets/foods1.parquet"
start = time()
df = polars.scan_parquet(
    aws_url,
    storage_options={
        "object_store": True,  # opt in to the native object_store downloads
        "region": "us-west-2",
        "aws_access_key_id": "xxxx",
        "aws_secret_access_key": "yyyy",
    },
)
# scan_parquet is lazy: df is a LazyFrame, so this prints the query plan;
# call .collect() to actually execute the scan.
print(df.head())
print(f"Time: {time() - start}")

winding-lines avatar Jan 25 '23 05:01 winding-lines

Thanks a lot for the PR @winding-lines. I hope to get to this one tomorrow. :muscle:

ritchie46 avatar Jan 26 '23 18:01 ritchie46

Sorry @ritchie46, can you wait until the weekend/next week?

The code is working, but the speedup is not significant: about 6 seconds faster on the 42-second test that I am using.

It may well be that the speedup will not be significant even when this is done, and that the only benefit is that we can make the planner more efficient, but I want to push a bit more.

winding-lines avatar Jan 26 '23 19:01 winding-lines

@ritchie46 here is an update on this PR:

  1. This PR is working as described in the Python snippet above. I think it is worthwhile for you to review it; based on your feedback we can then merge it.
  2. The current Python implementation of scan_parquet("s3://../*.parquet") uses fsspec.open(). In this case fsspec just opens the first file, so the subsequent planning phase in polars is also fast. Fast is good, but for most users opening just one file is unexpected behavior (see the sketch after this list).
  3. The proposed Rust implementation in this PR actually lists all the files and then fetches the file_info in the planning phase.
     a. In my first implementation the async code was more self-contained, but this led to serializing the metadata downloads.
     b. In my current implementation I bring the listing and downloading phases together, which cuts about 6 seconds from 42 seconds on a test with 80 parquet files.
  4. This PR enables use cases with low tens of parquet files. It is not usable with hundreds of files or more, which I think we should also enable in subsequent PRs.
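
For illustration, here is a rough sketch of what explicit glob expansion could look like on the Python side. This is not the PR's code: the bucket and paths are placeholders, and it assumes the s3fs backend for fsspec is installed.

import fsspec
import polars

# Expand the glob explicitly so that every matching file becomes part of the
# scan, rather than relying on a single fsspec.open() call.
fs = fsspec.filesystem("s3")
paths = ["s3://" + p for p in fs.glob("your-bucket/polars/datasets/*.parquet")]

# Scan each file lazily and concatenate the lazy plans into one.
lf = polars.concat([polars.scan_parquet(p) for p in paths])
print(lf.head().collect())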

This Dask thread https://github.com/dask/community/issues/234#issuecomment-1094887172 has some good pointers on what we could do next. The general idea is to load the metadata from a centralized system, and for a general-purpose library like polars it may make sense to support multiple mechanisms. The first one could be the Hive _common_metadata described in the link.

There is a secondary long-term design question: assuming that one year from now polars supports Hive, Iceberg, Delta Table, and Hudi, where should all of those implementations go? My personal preference is to have them supported natively, but I know that adding dependencies to Polars is also not desirable.

winding-lines avatar Jan 27 '23 13:01 winding-lines

Great work @winding-lines! Some unsolicited feedback on the function signature: what do you think of making it a little more generic, e.g.

df = polars.scan_parquet(
    aws_url,
    storage_options={
        "storage_handler": "object_store",
        "region": "us-west-2",
        "aws_access_key_id": "xxxx",
        "aws_secret_access_key": "yyyy",
    },
)

or

df = polars.scan_parquet(
    aws_url,
    storage_handler="object_store",
    storage_options={
        "region": "us-west-2",
        "aws_access_key_id": "xxxx",
        "aws_secret_access_key": "yyyy",
    },
)

If the storage_handler (or maybe storage_backend) param is missing, it defaults to fsspec.
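
To make the proposal concrete, here is a hypothetical sketch of the dispatch this would imply. Both helper names are illustrative only, not actual polars internals.

import fsspec


def _scan_with_object_store(source):
    # Hypothetical placeholder for the native Rust object_store path in this PR.
    raise NotImplementedError


def _resolve_storage_handler(source, storage_handler=None):
    # Default to fsspec when no storage_handler is given; otherwise route to
    # the requested backend.
    if storage_handler in (None, "fsspec"):
        return fsspec.open(source)
    if storage_handler == "object_store":
        return _scan_with_object_store(source)
    raise ValueError(f"unknown storage_handler: {storage_handler!r}")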

talawahtech avatar Jan 27 '23 20:01 talawahtech

The first one could be the Hive _common_metadata described in the link.

It's not part of the Parquet spec, but systems that write directories of Parquet files often produce both _common_metadata and _metadata files. From the pyarrow docs:

Those files include information about the schema of the full dataset (for _common_metadata) and potentially all row group metadata of all files in the partitioned dataset as well (for _metadata). The actual files are metadata-only Parquet files. Note this is not a Parquet standard, but a convention set in practice by those frameworks.

This is especially helpful because you can read the _metadata file and then, without any additional fetches, do predicate pushdown: you have the column statistics for every row group, as well as the exact byte ranges you want to fetch. The same page in the pyarrow docs shows how to read and write these files.
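
For concreteness, a short sketch of that pattern, following the example in the pyarrow docs (the dataset path is a placeholder):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# Collect per-file metadata while writing a partitioned dataset.
collector = []
pq.write_to_dataset(table, root_path="dataset/", metadata_collector=collector)

# _common_metadata holds only the schema; _metadata additionally holds the
# row-group metadata of every file in the dataset.
pq.write_metadata(table.schema, "dataset/_common_metadata")
pq.write_metadata(table.schema, "dataset/_metadata", metadata_collector=collector)

# A reader can fetch just _metadata and inspect per-row-group statistics
# before touching any data file.
meta = pq.read_metadata("dataset/_metadata")
print(meta.num_row_groups, meta.row_group(0).column(0).statistics)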

On the Rust side, arrow2/parquet2 also have support for these _metadata files, see https://github.com/jorgecarleitao/parquet2/issues/146 and https://github.com/jorgecarleitao/arrow2/pull/1063.

kylebarron avatar Jan 28 '23 18:01 kylebarron

Current status:

  • the scan_parquet code is DONE
  • when collect()-ing from Python, the physical_plan scan also needs to download the files: also DONE
  • MISSING: a Tokio runtime needs to be started on the collect path; working on it

winding-lines avatar Jan 29 '23 04:01 winding-lines

@ritchie46 this is ready for review. I am happy to change the API as per @talawahtech's great feedback, if you agree with it. I am not yet familiar with the Python API you want for Polars :)

winding-lines avatar Jan 29 '23 15:01 winding-lines

I'm closing this pull request due to inactivity. Feel free to rebase and reopen and continue your work!

For reference, this is now possible in Python through fsspec, though a native Rust implementation of cloud integration would be welcome.

stinodego avatar Aug 10 '23 22:08 stinodego