
Support hive style partitioning of parquet file scans

Open OneRaynyDay opened this issue 1 year ago • 3 comments

Describe your feature request

pl.scan_parquet does a great job of reading the data directly, but oftentimes parquet files are organized in a hierarchical way. Typically these are called partitions of the data, and each partition has a constant-expression column assigned to it (which doesn't exist in the parquet file itself). I don't think this needs to live 100% in polars, but to get this to work efficiently some primitives need to be added (or not! maybe they already exist).

Copied from the apache.org resource:

Partitions: Each Table can have one or more partition Keys which determines how the data is stored. Partitions—apart from being storage units—also allow the user to efficiently identify the rows that satisfy a specified criteria; for example, a date_partition of type STRING and country_partition of type STRING. Each unique value of the partition keys defines a partition of the Table. For example, all "US" data from "2009-12-23" is a partition of the page_views table. Therefore, if you run analysis on only the "US" data for 2009-12-23, you can run that query only on the relevant partition of the table, thereby speeding up the analysis significantly. Note however, that just because a partition is named 2009-12-23 does not mean that it contains all or only data from that date; partitions are named after dates for convenience; it is the user's job to guarantee the relationship between partition name and data content! Partition columns are virtual columns, they are not part of the data itself but are derived on load.

What I think this means is either:

  1. We need to annotate each PARQUET SCAN data/date=.../data.parquet with a constant expression column. Something like PARQUET SCAN data/date=.../data.parquet; PROJECT N+1 COLUMNS; SELECT: None, CONSTANT_EXPRESSION: date=2009-12-23 perhaps?

Suppose you're looking at data/date=2009-12-23/data.parquet: the actual parquet file doesn't contain a date column, but we know from the partition that the date value is 2009-12-23 for all rows. When someone adds a filter pl.Expr like pl.col("date") >= datetime(2009, 12, 23) on the pl.LazyFrame, a lot of the lazily scanned parquet files should be skipped because their date is before 2009-12-23, saving I/O and speeding up compute (a hand-rolled sketch of this is at the end of this comment).

  2. Alternatively, the user of this library can create a pyarrow.Dataset, which also scans lazily, supports partitioning, and has a partition_expression attribute equal to the pl.Expr example above. The way we currently transform a pyarrow.Dataset into a pl.LazyFrame doesn't allow us to push the pl.Expr predicates down into pyarrow space, AND it requires the dataset to be converted to an in-memory pyarrow.Table:
def _scan_ds_impl(
    ds: pa.dataset.dataset, with_columns: list[str] | None
) -> pli.DataFrame:
    ...
    if not _PYARROW_AVAILABLE:
        raise ImportError(  # pragma: no cover
            "'pyarrow' is required for scanning from pyarrow datasets."
        )
    return pl.from_arrow(ds.to_table(columns=with_columns))  # type: ignore

This unfortunately means we can't push down those date-inequality expressions, and the dataset, which could be huge, may end up OOM'ing the machine. To get partitioning to work by building on pyarrow, we would need to fix both of those issues.
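
For what it's worth, option 1 can be emulated by hand today by parsing the partition value out of each file path and only scanning the files that can satisfy the filter. A rough sketch (the data/date=... layout and the cutoff are just the example from above):

from datetime import date
from pathlib import Path

import polars as pl

cutoff = date(2009, 12, 23)

kept = []
for path in Path("data").glob("date=*/data.parquet"):
    # the partition value lives in the directory name, e.g. "date=2009-12-23"
    part_date = date.fromisoformat(path.parent.name.split("=", 1)[1])
    if part_date >= cutoff:
        # the decision is made from the path alone, without touching the parquet file
        kept.append(pl.scan_parquet(path))

lf = pl.concat(kept)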

OneRaynyDay avatar Aug 09 '22 15:08 OneRaynyDay

To add a bit more color, hive partitioning is just one example of partitioning. A partition can be formally thought of as a function that takes a key (in this case a string path) and returns a constant expression (which can be an empty expression) that is true for all the data associated with that key. There are tons of partitioning schemes out there, and where I work we have our own custom partition function (which can take any string and output some constant expression). I think having Hive partitioning would cater to the largest population of users, but a generic partitioning function that lets us decorate the query plan with constant expressions in the scans (or any nodes along the query plan, really) would let everyone define their own (and would be immediately useful for me in particular).
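
As a minimal sketch of that abstraction for the hive case (hive_partition_expr is just a name I made up):

import polars as pl

def hive_partition_expr(path: str) -> pl.Expr:
    # turns "data/date=2009-12-23/country=US/data.parquet" into a constant
    # expression that is true for every row stored under that path
    expr = pl.lit(True)
    for segment in path.split("/"):
        if "=" in segment:
            key, value = segment.split("=", 1)
            expr = expr & (pl.col(key) == pl.lit(value))
    return expr

# a custom partitioning scheme would simply plug in a different path -> pl.Expr function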

OneRaynyDay avatar Aug 09 '22 16:08 OneRaynyDay

Yes.. This is definitely something we want to add. Polars should be able to write partitioned datasets as well.

ritchie46 avatar Aug 09 '22 17:08 ritchie46

Stumbled over this when pl.scan_parquet() failed reading a dataset written with PySpark. Having this would be highly appreciated! :)

Hoeze avatar Sep 15 '22 16:09 Hoeze

+1 for this feature

mrendi29 avatar Oct 25 '22 13:10 mrendi29

@ritchie46

Polars should be able to write partitioned datasets as well.

Is there a way Polars could support pyarrow.parquet.write_to_dataset? It supports partitioned writing. Right now only pyarrow.parquet.write_table is supported (which doesn't do partitioning).
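
Until then, partitioned writing can be routed through pyarrow by hand; a small sketch (the column names are made up):

import polars as pl
import pyarrow.parquet as pq

df = pl.DataFrame(
    {
        "date": ["2009-12-23", "2009-12-23", "2009-12-24"],
        "country": ["US", "DE", "US"],
        "views": [10, 3, 7],
    }
)

# to_arrow() is mostly zero-copy; write_to_dataset creates date=.../country=.../ directories
pq.write_to_dataset(df.to_arrow(), "page_views", partition_cols=["date", "country"])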

danielgafni avatar Apr 18 '23 12:04 danielgafni

For the people that stumbled here looking for a reading solution, the following works for now:

import os
import polars as pl

def get_recursive_filepaths(directory):
    filepaths = []
    for root, _, files in os.walk(directory):
        for file in files:
            filepaths.append(os.path.join(root, file))
    return filepaths

directory = "/path/to/your/directory"
recursive_filepaths = get_recursive_filepaths(directory)

total_lazy_df = pl.concat([pl.scan_parquet(path) for path in recursive_filepaths if path.endswith(".parquet")])  # the endswith check makes sure only actual parquet files are scanned
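
Note that this just concatenates the leaf files, so the hive partition columns (which only exist in the directory names) are lost. To keep them, the partition values can be attached as constant columns per file; a sketch building on the snippet above (the values come back as strings, cast as needed):

def scan_with_partition_columns(filepath):
    # attach every key=value directory component as a constant column
    lf = pl.scan_parquet(filepath)
    for segment in filepath.split(os.sep):
        if "=" in segment:
            key, value = segment.split("=", 1)
            lf = lf.with_columns(pl.lit(value).alias(key))
    return lf

total_lazy_df = pl.concat(
    [scan_with_partition_columns(p) for p in recursive_filepaths if p.endswith(".parquet")]
)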

I did a bit of digging into what PyArrow supports in this department. On the topic of reading partitioned files: The class ParquetDataset supports reading a directory of partitioned parquet files as follows:

import pyarrow.parquet as pq

dataset = pq.ParquetDataset('dataset_name/')
table = dataset.read()

On the topic of writing partitioned files: the ParquetWriter (which is currently used by polars) is not capable of writing partitioned files. The functionality to write partitioned files seems to live in pyarrow.parquet.write_to_dataset(). Maybe polars.DataFrame.write_parquet() could get a partition_cols keyword and, when it is given values, use the write_to_dataset() functionality? (A rough sketch of that split is at the end of this comment.)

Lastly, on the topic of sink_parquet(): this currently uses the RecordBatch functionality in the ParquetWriter that allows a parquet file to be written incrementally. Unfortunately, the only way to add a batch of data to an existing partitioned parquet dataset seems to be to read the existing data, concat the new data to the frame, and overwrite the partitioned files with the result. This means it's impossible to "append" freshly streamed data to existing partitioned parquet files. That is unfortunate, as parquet datasets are often partitioned precisely because they are too large to fit in memory as a single file; not being able to sink into them seems to defeat that purpose...

I'm completely new to Rust, so I'm not sure I could create a PR to suggest the new write_to_dataset() functionality. I hope this bit of research into the PyArrow API can help someone to get it implemented.

PS: In pandas the implementation of writing has the following split:

        try:
            if partition_cols is not None:
                # writes to multiple files under the given path
                self.api.parquet.write_to_dataset(
                    table,
                    path_or_handle,
                    compression=compression,
                    partition_cols=partition_cols,
                    **kwargs,
                )
            else:
                # write to single output file
                self.api.parquet.write_table(
                    table, path_or_handle, compression=compression, **kwargs
                )
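
A similar split on the polars side could look roughly like this (write_parquet_partitioned is a hypothetical helper, not an existing API):

from __future__ import annotations

import polars as pl
import pyarrow.parquet as pq

def write_parquet_partitioned(
    df: pl.DataFrame,
    path: str,
    partition_cols: list[str] | None = None,
    **kwargs,
) -> None:
    table = df.to_arrow()
    if partition_cols is not None:
        # writes a directory tree: path/col=value/...
        pq.write_to_dataset(table, path, partition_cols=partition_cols, **kwargs)
    else:
        # single output file, comparable to write_parquet(use_pyarrow=True) today
        pq.write_table(table, path, **kwargs)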

TNieuwdorp avatar Apr 19 '23 07:04 TNieuwdorp

  • Scanning a dataset is now possible via:

    import polars as pl
    import pyarrow.dataset as ds

    df = pl.scan_pyarrow_dataset(
        ds.dataset("my_dataset_folder")
    )
    

    The cool thing is that it works well with remote filesystems like HDFS!

  • Writing a dataset is also possible (but streaming is not, AFAIK) using slices:

    from tqdm import tqdm

    n_rows = df.select(pl.count()).collect().item()
    chunk_size = 100_000
    n_chunks = n_rows // chunk_size

    for idx in tqdm(range(n_chunks)):
        slice_df = df.slice(offset=idx * chunk_size, length=chunk_size)
        slice_df.collect().write_parquet(f"my_new_dataset/n{idx}.parquet")
    

Vincent-Maladiere avatar Jun 08 '23 16:06 Vincent-Maladiere

How do you use slice with a specific column you want to partition by? 🤔

Berenger-Wooclap avatar Jun 15 '23 16:06 Berenger-Wooclap

How do you use slice with a specific column you want to partition by? :thinking:

You could either loop over the partition values and filter on each, then use .sink_parquet() to stream each partition to parquet:

import os
import polars as pl

for value in df.select(pl.col("part_col")).unique().collect()["part_col"]:
    part_df = df.filter(pl.col("part_col") == value)
    part_dir = f"dataset/part_col={value}"
    os.makedirs(part_dir, exist_ok=True)
    part_df.sink_parquet(f"{part_dir}/data.parquet")

Alternatively, you could use pyarrow to do the partitioning for you, reusing the chunk-based code from above:

from tqdm import tqdm
import pyarrow.parquet as pq
 
n_rows = df.select(pl.count()).collect().item()
chunk_size = 100_000
n_chunks = n_rows // chunk_size
 
for idx in tqdm(range(n_chunks)): 
    slice_df = df.slice(offset=idx * chunk_size, length=chunk_size)
    pq.write_to_dataset(slice_df.collect().to_arrow(), "dataset", partition_cols=["col1", "col2", "col3"])

This won't be streaming within chunks, but at least the to_arrow call is mostly zero-copy. Also, you might end up with weird file sizes depending on how your partition values are distributed among your chunks, but it does make it very convenient when partitioning by many columns.

qiemem avatar Jun 23 '23 14:06 qiemem

Thanks for the suggestion, @qiemem! Is the dataset readable/scannable in one shot when using your option 2?

Vincent-Maladiere avatar Jun 23 '23 14:06 Vincent-Maladiere

Yup! It would actually be the same with both methods:

df = pl.scan_pyarrow_dataset(ds.dataset("dataset", format="parquet", partitioning="hive"))

pyarrow will automatically pick up on the partitioning schema, and the partitioning columns will be available to polars just like any other column. From what I can tell, partition-friendly operations (e.g. anything where you're grouping by partition columns) seem pretty fast and memory friendly after that.
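
For example, a filter on one of the partition columns (col1 as in the write example above) should only touch the matching directories, since the predicate gets pushed into the pyarrow scanner:

import polars as pl
import pyarrow.dataset as ds

lazy = pl.scan_pyarrow_dataset(ds.dataset("dataset", format="parquet", partitioning="hive"))

# the predicate on the (virtual) partition column is pushed down to pyarrow,
# so only the matching partition directories should actually be read
out = lazy.filter(pl.col("col1") == "some_value").collect()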

qiemem avatar Jun 23 '23 15:06 qiemem

Hey,

I still have issues reading directories with partitioning on S3. I get the following error: GetFileInfo() yielded path '[s3bucket uri]/__watermark=2022-01-01/00633d6d4f4648cd82a953694eb4683b.parquet', which is outside base dir '[s3bucket uri]'

Is there something I'm missing?

Local reads work but not S3 (using s3fs). For directory reading I used s3.glob as a workaround; sadly this won't work if the read should pick up the hive partitioning as a column.

potzenhotz avatar Jul 04 '23 12:07 potzenhotz

closing in favor of https://github.com/pola-rs/polars/issues/10276

universalmind303 avatar Sep 08 '23 17:09 universalmind303