polars
Support hive style partitioning of parquet file scans
Describe your feature request
pl.scan_parquet does a great job reading the data directly, but parquet files are often organized hierarchically. Typically the levels of that hierarchy are called partitions of the data, and each one has a constant expression column assigned to it (which doesn't exist in the parquet file itself). I don't think this needs to live 100% in polars, but to get this to work efficiently some primitives need to be added (or not! maybe they already exist).
Copied from the apache.org resource:
Partitions: Each Table can have one or more partition Keys which determines how the data is stored. Partitions—apart from being storage units—also allow the user to efficiently identify the rows that satisfy a specified criteria; for example, a date_partition of type STRING and country_partition of type STRING. Each unique value of the partition keys defines a partition of the Table. For example, all "US" data from "2009-12-23" is a partition of the page_views table. Therefore, if you run analysis on only the "US" data for 2009-12-23, you can run that query only on the relevant partition of the table, thereby speeding up the analysis significantly. Note however, that just because a partition is named 2009-12-23 does not mean that it contains all or only data from that date; partitions are named after dates for convenience; it is the user's job to guarantee the relationship between partition name and data content! Partition columns are virtual columns, they are not part of the data itself but are derived on load.
What I think this means is either:
- We need to annotate each PARQUET SCAN data/date=.../data.parquet with a constant expression column. Something like PARQUET SCAN data/date=.../data.parquet; PROJECT N+1 COLUMNS; SELECT: None, CONSTANT_EXPRESSION: date=2009-12-23 perhaps?
Suppose you're looking at data/date=2009-12-23/data.parquet. The actual parquet file doesn't have the date column inside, but we know from the partition that the date value is 2009-12-23 for all rows. When someone adds a filter pl.Expr like pl.col("date") >= datetime(2009, 12, 23) on the pl.LazyFrame, a lot of the parquet files we have lazily scanned should be skipped because their date is before 2009-12-23, saving I/O and speeding up compute.
- Alternatively, the user of this library can create a pyarrow.Dataset, which also scans lazily and supports partitioning, and has a partition_expression attribute equal to the pl.Expr example above. The way we currently transform a pyarrow.Dataset to a pl.LazyFrame doesn't allow us to push down the pl.Expr predicates into pyarrow space, AND requires the dataset to be converted to an in-memory pyarrow.Table:
def _scan_ds_impl(
    ds: pa.dataset.dataset, with_columns: list[str] | None
) -> pli.DataFrame:
    ...
    if not _PYARROW_AVAILABLE:
        raise ImportError(  # pragma: no cover
            "'pyarrow' is required for scanning from pyarrow datasets."
        )
    return pl.from_arrow(ds.to_table(columns=with_columns))  # type: ignore
This unfortunately means we can't push down those date inequality expressions, and the dataset, which could be huge, may end up OOM'ing the machine. To get partitioning to work by relying on pyarrow, we would need to fix both of those issues.
To add a bit more color, hive partitioning is just one example of a partitioning scheme, where a partition can be formally thought of as a function that takes in a key (in this case a string path) and returns a constant expression (which can be an empty expression) that is true for all the data associated with that key. There are tons of partitioning schemes out there, and where I work we have our own custom partition function (which can take in any string and output some constant expression). I think having Hive partitioning would cater to the largest population of users, but having a generic partitioning function that allows us to decorate the query plan with constant expressions in the scans (or any nodes along the query plan, really) would allow everyone to define their own (and would be immediately useful for me in particular).
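To make the "partition as a function" idea concrete, here is a minimal pure-Python sketch. It is not polars code: the names PartitionFn, hive_partition and prune are made up for illustration, and the predicate is evaluated by hand on the partition constants rather than through a pl.Expr.

```python
from datetime import date
from typing import Callable, Dict, Iterable, List

# A partition function maps a storage key (here: a path) to the constant
# column values that hold for every row stored under that key.
# (Hypothetical type alias, not part of polars or pyarrow.)
PartitionFn = Callable[[str], Dict[str, str]]


def hive_partition(path: str) -> Dict[str, str]:
    """Hive-style scheme: every `key=value` directory segment is a constant column."""
    dirs = path.replace("\\", "/").split("/")[:-1]  # drop the file name
    return dict(seg.split("=", 1) for seg in dirs if "=" in seg)


def prune(
    paths: Iterable[str],
    partition_fn: PartitionFn,
    keep: Callable[[Dict[str, str]], bool],
) -> List[str]:
    """Keep only the paths whose partition constants satisfy the predicate."""
    return [p for p in paths if keep(partition_fn(p))]


paths = [
    "data/date=2009-12-22/data.parquet",
    "data/date=2009-12-23/data.parquet",
    "data/date=2009-12-24/data.parquet",
]
# Stand-in for the filter `date >= 2009-12-23`, checked on partition values only,
# so the first file would never need to be opened.
selected = prune(
    paths,
    hive_partition,
    lambda consts: date.fromisoformat(consts["date"]) >= date(2009, 12, 23),
)
print(selected)
```

A custom scheme would only need to swap hive_partition for another function with the same signature; returning an empty dict corresponds to the "empty expression" case.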
Yes, this is definitely something we want to add. Polars should be able to write partitioned datasets as well.
Stumbled over this when pl.scan_parquet() failed reading a dataset written with PySpark. Having this would be highly appreciated! :)
+1 for this feature
@ritchie46
Polars should be able to write partitioned datasets as well.
Is there a way Polars could support pyarrow.parquet.write_to_dataset? It has partitioned writing. Right now only pyarrow.parquet.write_table is supported (which doesn't have partitioning).
For the people that stumbled here looking for a reading solution, the following works for now:
import os
import polars as pl

def get_recursive_filepaths(directory):
    filepaths = []
    for root, _, files in os.walk(directory):
        for file in files:
            filepaths.append(os.path.join(root, file))
    return filepaths

directory = "/path/to/your/directory"
recursive_filepaths = get_recursive_filepaths(directory)
# you could add a check that ".parquet" is in the filepath to make sure it's a parquet file
total_lazy_df = pl.concat([pl.scan_parquet(path) for path in recursive_filepaths])
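Note that the workaround above reads the files but loses the partition values, since they only exist in the directory names. A rough sketch of how the walk could also collect them, using only the standard library (iter_partitioned_files is a made-up helper name, and it assumes hive-style key=value directories):

```python
from pathlib import Path
from typing import Dict, Iterator, Tuple


def iter_partitioned_files(root: str) -> Iterator[Tuple[Path, Dict[str, str]]]:
    """Yield (file, partition values) for every .parquet file below `root`."""
    base = Path(root)
    for file in sorted(base.rglob("*.parquet")):
        # Only directories between `root` and the file count as partitions.
        dirs = file.relative_to(base).parts[:-1]
        values = dict(d.split("=", 1) for d in dirs if "=" in d)
        yield file, values
```

Each pair could then be turned into a lazy frame with something like pl.scan_parquet(file).with_columns([pl.lit(v).alias(k) for k, v in values.items()]) before the pl.concat. The values stay strings here, so casting them (e.g. to a date) is left to the caller.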
I did a bit of digging into what PyArrow supports in this department. On the topic of reading partitioned files:
The class ParquetDataset supports reading a directory of partitioned parquet files as follows:
import pyarrow.parquet as pq
dataset = pq.ParquetDataset('dataset_name/')
table = dataset.read()
On the topic of writing partitioned files:
The ParquetWriter (which is currently used by polars) is not capable of writing partitioned files. The functionality to write partitioned files seems to be in pyarrow.parquet.write_to_dataset(). Maybe for polars.DataFrame.write_parquet() it might be a consideration to add the keyword partition_cols, and if the keyword contains values, to use the write_to_dataset() functionality?
Lastly, on the topic of sink_parquet(): This currently uses the RecordBatch functionality in the ParquetWriter that allows for incrementally writing a parquet file. Unfortunately, the only way of adding a batch of data to an existing partitioned parquet file seems to be to read the existing partitioned parquet file, concat the new data to the frame, and overwrite the partitioned parquet file with the new data. This means it's impossible to "append" freshly streamed data to existing partitioned parquet files. This is unfortunate, as parquet datasets are often partitioned because they are too large to fit in memory as a single file. Not being able to sink into them seems to defeat that purpose...
I'm completely new to Rust, so I'm not sure I could create a PR to suggest the new write_to_dataset() functionality. I hope this bit of research into the PyArrow API can help someone to get it implemented.
PS: In pandas the implementation of writing has the following split:
try:
    if partition_cols is not None:
        # writes to multiple files under the given path
        self.api.parquet.write_to_dataset(
            table,
            path_or_handle,
            compression=compression,
            partition_cols=partition_cols,
            **kwargs,
        )
    else:
        # write to single output file
        self.api.parquet.write_table(
            table, path_or_handle, compression=compression, **kwargs
        )
- Scanning a dataset is now possible via:
import polars as pl
import pyarrow.dataset as ds
df = pl.scan_pyarrow_dataset(
    ds.dataset("my_dataset_folder")
)
The cool thing is that it works well with remote filesystems like HDFS!
- Writing a dataset is also possible (but streaming is not, AFAIK) using slices:
from tqdm import tqdm
n_rows = df.select(pl.count()).collect().item()
chunk_size = 100_000
# ceiling division, so a trailing partial chunk is written too
n_chunks = -(-n_rows // chunk_size)
for idx in tqdm(range(n_chunks)):
    slice_df = df.slice(offset=idx * chunk_size, length=chunk_size)
    slice_df.collect().write_parquet(f"my_new_dataset/n{idx}.parquet")
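One thing to watch in chunked writes like this is the chunk arithmetic: plain floor division (n_rows // chunk_size) skips the trailing partial chunk whenever the row count is not a multiple of the chunk size. A small standalone sketch of the offsets, with chunk_bounds being a hypothetical helper name:

```python
from typing import Iterator, Tuple


def chunk_bounds(n_rows: int, chunk_size: int) -> Iterator[Tuple[int, int]]:
    """Yield (offset, length) pairs covering all n_rows, including a short last chunk."""
    for offset in range(0, n_rows, chunk_size):
        yield offset, min(chunk_size, n_rows - offset)


print(list(chunk_bounds(250_000, 100_000)))
# [(0, 100000), (100000, 100000), (200000, 50000)]
```

Each pair could be passed straight to df.slice(offset=offset, length=length).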
How do you use slice with specific column you want to partition with ? 🤔
You could either loop over the partition values and filter before the slice, then use .sink_parquet() to stream to parquet:
import os
for value in df.select(pl.col("part_col")).unique().collect()["part_col"]:
    part_df = df.filter(pl.col("part_col") == value)
    part_dir = f"dataset/part_col={value}"
    # exist_ok avoids an error when the partition directory already exists
    os.makedirs(part_dir, exist_ok=True)
    part_df.sink_parquet(f"{part_dir}/data.parquet")
Alternatively, you could use pyarrow to do the partitioning for you. Using the above chunk-based code:
from tqdm import tqdm
import pyarrow.parquet as pq
n_rows = df.select(pl.count()).collect().item()
chunk_size = 100_000
# ceiling division, so a trailing partial chunk is written too
n_chunks = -(-n_rows // chunk_size)
for idx in tqdm(range(n_chunks)):
    slice_df = df.slice(offset=idx * chunk_size, length=chunk_size)
    pq.write_to_dataset(slice_df.collect().to_arrow(), "dataset", partition_cols=["col1", "col2", "col3"])
This won't be streaming within chunks, but at least the to_arrow call is mostly zero-copy. Also, you might end up with weird file sizes depending on how your partition values are distributed among your chunks, but it does make it very convenient when partitioning by many columns.
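To illustrate the file-size point: each chunk gets split into one group per combination of partition values it happens to contain, so a chunk holding only a few rows for some combination produces a tiny file there. A simplified pure-Python sketch of that grouping (group_by_partition is a made-up name, and real writers also handle escaping and null values, which this ignores):

```python
from collections import defaultdict
from typing import Any, Dict, List, Sequence


def group_by_partition(
    rows: Sequence[Dict[str, Any]], partition_cols: Sequence[str]
) -> Dict[str, List[Dict[str, Any]]]:
    """Group rows by their hive-style directory, e.g. 'col1=a/col2=1'."""
    groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for row in rows:
        directory = "/".join(f"{col}={row[col]}" for col in partition_cols)
        # Partition columns live in the path, so they are dropped from the stored row.
        stored = {k: v for k, v in row.items() if k not in partition_cols}
        groups[directory].append(stored)
    return dict(groups)


chunk = [
    {"col1": "a", "col2": 1, "x": 10},
    {"col1": "a", "col2": 2, "x": 11},
    {"col1": "a", "col2": 1, "x": 12},
]
groups = group_by_partition(chunk, ["col1", "col2"])
for directory, stored_rows in groups.items():
    print(directory, len(stored_rows))
```

Here the single chunk would yield two files, one with two rows and one with a single row.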
Thanks for the suggestion, @qiemem! Is the dataset readable/scannable in one shot when using your option 2?
Yup! Actually would be the same with both methods:
df = pl.scan_pyarrow_dataset(ds.dataset("dataset", format="parquet", partitioning="hive"))
pyarrow will automatically pick up on the partitioning schema, and the partitioning columns will be available to polars the same as any other column. From what I can tell, partition-friendly operations (e.g. anything where you're grouping by partition columns) seem pretty fast and memory friendly after that.
Hey,
I still have issues reading directories with partitioning on S3. I get the following error: GetFileInfo() yielded path '[s3bucket uri]/__watermark=2022-01-01/00633d6d4f4648cd82a953694eb4683b.parquet', which is outside base dir '[s3bucket uri]'
Is there something I'm missing?
Local reads work, but S3 reads don't. I'm using s3fs. For directory reading I used s3.glob as a workaround. Sadly this won't work if the read should pick up the hive partitioning as a column.
closing in favor of https://github.com/pola-rs/polars/issues/10276