Partition-aware parquet scanning
It is extremely common for larger parquet datasets to be partitioned by file or object path, e.g.:
s3://bucket/my-data/year=2019/month=10/datafile-01.parquet
It doesn't seem like polars is currently partition-aware when reading in files, since you can only read a single file at once. Letting the user define the partition mapping when scanning the dataset, and having it leveraged by predicate and projection pushdown, should enable a pretty massive performance improvement. I think doing so would require the API to be changed to support reading in multiple files at once, similar to how pyarrow's reader takes a path (accepting globs) or a list of paths [1].
Currently though, and correct me if I'm wrong, I think you can just convert from an arrow table [2] to get these benefits (a sketch follows the references below). Thanks for the awesome library!
[1] https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html#pyarrow-parquet-parquetdataset
[2] https://ritchie46.github.io/polars/pypolars/functions.html#pypolars.functions.from_arrow_table
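A minimal sketch of that eager workaround, assuming a hypothetical local copy of such a dataset and using pl.from_arrow (the current spelling of from_arrow_table); partition pruning and column selection happen on the pyarrow side:
import polars as pl
import pyarrow.parquet as pq

# Hypothetical dataset root; the year=/month= directories are discovered as hive partitions.
dataset = pq.ParquetDataset(
    "my-data/",
    filters=[("year", "=", 2019), ("month", "=", 10)],  # prunes partitions before reading any data
)
table = dataset.read(columns=["some_column"])  # "some_column" is illustrative; projection happens in pyarrow
df = pl.from_arrow(table)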
This is definitely something I want to support. If this is already supported by PyArrow, this should indeed be trivial.
For the lazy scan it may be a bit harder, as that uses Rust's native parquet reader, which is still in development.
Thanks for the awesome library!
You're welcome, I hope we can make it more awesome. :smile:
@ritchie46 good to know that for the lazy frame implementation we are blocked a little bit by the arrow Rust bindings. I'll hack around a bit with the eager frames to see what is going on under the hood and whether a shim is required.
I was able to read a partitioned dataset without directly calling pyarrow.parquet.read_table, by using the following keyword arguments, which are passed to pyarrow.parquet.read_table (using polars==0.10.27 and pyarrow==6.0.1):
import polars as pl
from pyarrow import fs

df = pl.read_parquet(
    'crypto-exchange-pds/ftx',
    filesystem=fs.S3FileSystem(),
    columns=['market', 'time', 'id', 'price', 'size'],
    filters=[('ds', '>=', '2021-01-01')],
)
P.S. This is an awesome library, my dataset is too large for Pandas but I didn't want to use PySpark, so this has worked great.
@mhconradt Can you clarify: is that pl and not pyarrow? Just trying to understand whether reading parquet partitions is somewhat supported in some version. It doesn't seem to be in what I've pip installed, but I haven't checked the bleeding edge yet.
@cottrell it is pl. Note it only works if you have pyarrow installed, in which case it calls pyarrow.parquet.read_table with the arguments and creates a pl.DataFrame from the pa.Table. This does support partition-aware scanning, predicate / projection pushdown, etc. Also note I got fs by running from pyarrow import fs.
Otherwise read_parquet falls back on the Rust parquet reader, which does not support partition-aware scanning. Yet.
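For reference, a sketch of the roughly equivalent explicit call, i.e. doing the pyarrow read yourself and converting the resulting table (same hypothetical bucket, columns, and filter as the snippet above):
import polars as pl
import pyarrow.parquet as pq
from pyarrow import fs

table = pq.read_table(
    'crypto-exchange-pds/ftx',
    filesystem=fs.S3FileSystem(),
    columns=['market', 'time', 'id', 'price', 'size'],
    filters=[('ds', '>=', '2021-01-01')],  # only matching ds= partitions are read
)
df = pl.from_arrow(table)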
pl.read_parquet doesn't seem to work with just a directory name for me, even if pyarrow is installed.
In [5]: pl.read_parquet('../suzieq/play/donna/new-parquet/routes/', columns=['namespace', 'hostname', 'vrf', 'prefix'])
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-5-5fec8a614bd1> in <module>
----> 1 pl.read_parquet('../suzieq/play/donna/new-parquet/routes/', columns=['namespace', 'hostname', 'vrf', 'prefix'])
~/work/dsengines/.venv/lib/python3.7/site-packages/polars/io.py in read_parquet(source, columns, n_rows, use_pyarrow, memory_map, storage_options, parallel, **kwargs)
782
783 return DataFrame._read_parquet(
--> 784 source_prep, columns=columns, n_rows=n_rows, parallel=parallel
785 )
786
~/work/dsengines/.venv/lib/python3.7/site-packages/polars/internals/frame.py in _read_parquet(file, columns, n_rows, parallel)
590 projection, columns = handle_projection_columns(columns)
591 self = DataFrame.__new__(DataFrame)
--> 592 self._df = PyDataFrame.read_parquet(file, columns, projection, n_rows, parallel)
593 return self
594
RuntimeError: Any(ArrowError(ExternalFormat("underlying IO error: Is a directory (os error 21)")))
This is polars version 0.12.14
Polars allows globbing patterns.
pl.read_parquet('/routes/**/*.parquet', columns=['namespace', 'hostname', 'vrf', 'prefix'])
Thanks @ritchie46. Though that works, I can't stitch together key information that is present in the directory partition.
Sorry, what do you mean?
Say I have a parquet folder structured in hive format as follows: /parquet/namespace=foo/. Under this namespace folder, there are lots of folders with the name format hostname=<hostname>.
One clunky way to make this work is to manually walk the namespace and hostname folders and, for each hostname folder, manually add the namespace and hostname columns (a sketch follows below).
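A minimal sketch of that manual walk, assuming a local hive-style layout /parquet/namespace=<ns>/hostname=<host>/*.parquet (paths are illustrative):
from pathlib import Path

import polars as pl

frames = []
for ns_dir in Path("/parquet").glob("namespace=*"):
    namespace = ns_dir.name.split("=", 1)[1]
    for host_dir in ns_dir.glob("hostname=*"):
        hostname = host_dir.name.split("=", 1)[1]
        for f in host_dir.glob("*.parquet"):
            # Re-attach the partition keys that only exist in the directory names.
            frames.append(
                pl.read_parquet(str(f)).with_columns(
                    [pl.lit(namespace).alias("namespace"), pl.lit(hostname).alias("hostname")]
                )
            )
df = pl.concat(frames)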
And when I don't specify any columns (I want them all), I get a different error:
Any(ArrowError(NotYetImplemented("Decoding \"RleDictionary\"-encoded, dictionary-encoded optional V1 pages is not yet implemented for primitive")))
Partition reading should be possible through pl.scan_ds. Using the example here, you can pass
dset = ds.dataset("s3://my-partitioned-folder/", format="parquet", partitioning="hive")
as described here
It's possible, but apparently not partition-aware yet. @joscani was doing some experiments and, even though the read operation succeeded, it did not leverage the partitions and started using all the available RAM. The experiment was with the NYC Taxi dataset partitioned by year and month.
In the latest release, predicate pushdown is sent to pyarrow, so scan_ds should be partition-aware if pyarrow is.
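Roughly, a sketch of what that pushdown amounts to on the pyarrow side, assuming the polars predicate is translated into a pyarrow dataset expression (path and columns are illustrative):
import pyarrow.dataset as ds

dset = ds.dataset("s3://my-partitioned-folder/", format="parquet", partitioning="hive")
# A filter on a partition column lets pyarrow prune to the matching directories
# and skip the other files entirely.
table = dset.to_table(columns=["tip_amount", "year"], filter=ds.field("year") == 2019)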
Hi. I'm a newbie in Python and polars.
My code looks like this:
import polars as pl
import pyarrow.dataset as ds
nyc_dir = "/media/hd1/nyc-taxi"
myds = ds.dataset(nyc_dir, partitioning=["year", "month"] )
df = pl.scan_ds(myds)
(
    df
    .lazy()
    .filter(pl.col("total_amount") > 100)
    .select(["tip_amount", "total_amount", "year"])
    .groupby(['year'])
    .agg(
        [
            pl.col("tip_amount").sum().alias('v1_sum')
        ]
    )
    .collect()
)
And the process starts, uses all available RAM, and crashes without succeeding.
I tried to do the same using arrow in R and it works, using little RAM.
suppressPackageStartupMessages({
  library(tidyverse)
  library(arrow)
  library(tictoc)
})
nyc_dir <- "/media/hd1/nyc-taxi"
tic("open dataset arrow")
ds <- open_dataset(nyc_dir, partitioning = c("year", "month"))
tic("filter y group by arrow")
ds %>%
  filter(total_amount > 100) %>%
  select(tip_amount, total_amount, year) %>%
  group_by(year) %>%
  summarise(
    sum_amount = sum(total_amount, na.rm = TRUE)
  ) %>%
  collect() %>%
  print()
toc()
toc()
# A tibble: 14 × 2
year sum_amount
<int> <dbl>
1 2009 5870919.
2 2010 8235264.
3 2011 9885387.
4 2012 14042164.
5 2013 22018804.
6 2014 24048179.
7 2015 38511729.
8 2016 38844611.
9 2017 35896429.
10 2018 31102855.
11 2019 30377572.
12 2020 9539477.
13 2021 10887082.
14 2022 1856850.
>
> toc()
filter y group by arrow: 9.245 sec elapsed
> toc()
open dataset arrow: 9.408 sec elapsed
Any idea?
Which polars version are you running?
Hi @ritchie46 .
pip show polars
Name: polars
Version: 0.14.9
Summary: Blazingly fast DataFrame library
Home-page: https://github.com/pola-rs/polars
Author: ritchie46 <[email protected]>
I just installed version 0.15.3 and all runs ok. Thanksss.
(Is #4347 a duplicate of this?)
I'm also facing issues with getting this to work, though it is a bit hard to tell exactly what the issue is.
I have the following code (exact S3 URI omitted):
import pyarrow.dataset as ds
import polars as pl
dset = ds.dataset("s3://my-dataset/at-prefix/", format="parquet", partitioning=["track_date"], exclude_invalid_files=False)
pl.scan_ds(dset).filter(
    pl.col("track_date") == "2022-07-01"
).collect()
This dataset has about 1200 partitions (one for each track date) and a few hundred thousand records per track_date. I would expect that if the predicate pushdown was working correctly and everything was going smoothly, pyarrow would know to look straight at the partition for track_date=2022-07-01, and loading the data should be a pretty quick affair.
When I try to run the above code, though, it hangs. The execution plan reveals (as far as I can tell from reading the diagram) that the predicate is not in fact pushed down.
Any thoughts on why this might be?
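One way to narrow this down (a hypothetical check, not from the thread) is to run the same filter through pyarrow directly; if this returns quickly, partition pruning works on the pyarrow side and the problem is in how polars pushes the predicate down:
import pyarrow.dataset as ds

dset = ds.dataset("s3://my-dataset/at-prefix/", format="parquet", partitioning=["track_date"])
table = dset.to_table(filter=ds.field("track_date") == "2022-07-01")
print(table.num_rows)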
I am looking forward to a polars-native solution.
My current workaround is the following:
import glob

import polars as pl
import s3fs


def read_parquet_dataset(path, partitioning=None, filter_=None, with_columns=None, storage_options=None):
    if storage_options is not None:
        fs = s3fs.S3FileSystem(**storage_options)
        files = ["s3://" + f for f in fs.glob(path)]
    else:
        files = glob.glob(path)
    scans = list()
    for f in files:
        scan = pl.scan_parquet(f, storage_options=storage_options)
        if partitioning is not None:
            # Recover the partition values from the trailing directory names and
            # attach them as literal columns, since they are not stored in the files.
            values = f.split("/")[-len(partitioning) - 1:-1]
            scan = scan.with_columns([pl.lit(v).alias(n) for v, n in zip(values, partitioning)])
        if filter_ is not None:
            scan = scan.filter(filter_)
        if with_columns is not None:
            scan = scan.with_columns(with_columns)
        scans.append(scan)
    return pl.concat(pl.collect_all(scans))
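For example, a hypothetical call against a year/month directory layout (path, partition names, and filter are illustrative):
df = read_parquet_dataset(
    "my-bucket/my-data/*/*/*.parquet",
    partitioning=["year", "month"],
    filter_=pl.col("total_amount") > 100,
    storage_options={"anon": False},
)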
Closing in favor of https://github.com/pola-rs/polars/issues/10276