
Partition aware parquet scanning

Open wseaton opened this issue 3 years ago • 21 comments

For larger parquet datasets it is extremely common for them to be partitioned by file or object path, e.g.:

s3://bucket/my-data/year=2019/month=10/datafile-01.parquet

It doesn't seem like polars is currently partition-aware when reading in files, since you can only read a single file at a time. Letting the user define the partition mapping when scanning the dataset, and having it leveraged by predicate and projection pushdown, should enable a pretty massive performance improvement. I think doing so would require the API to be changed to support reading in multiple files at once, similar to how pyarrow's reader takes a path (accepting globs) or a list of paths [1].

Currently, though (and correct me if I'm wrong), I think you can just convert from an Arrow table [2] to get these benefits; a rough sketch follows the references below. Thanks for the awesome library!

  1. https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html#pyarrow-parquet-parquetdataset
  2. https://ritchie46.github.io/polars/pypolars/functions.html#pypolars.functions.from_arrow_table
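
For example, a rough sketch of that route (the bucket path, column names, and filter values below are hypothetical):

import polars as pl
import pyarrow.parquet as pq

# Let pyarrow handle partition discovery plus predicate/projection pushdown,
# then hand the resulting Arrow table to polars (pl.from_arrow here;
# from_arrow_table in older pypolars).
table = pq.read_table(
    "s3://bucket/my-data/",                             # hypothetical partitioned dataset
    columns=["datavalue"],                              # projection pushdown
    filters=[("year", "=", 2019), ("month", "=", 10)],  # partition pruning
)
df = pl.from_arrow(table)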

wseaton avatar Mar 14 '21 15:03 wseaton

This is definitely something I want to support. If this is already supported by PyArrow this should be trivial indeed.

For the lazy scan it may be a bit harder, as that uses the Rust native parquet reader, which is still in development.

ritchie46 avatar Mar 14 '21 16:03 ritchie46

Thanks for the awesome library!

You're welcome, I hope we can make it more awesome. :smile:

ritchie46 avatar Mar 14 '21 16:03 ritchie46

@ritchie46 good to know that for the lazy frame implementation we are blocked a little bit by the arrow Rust bindings. I'll hack around with the eager frames to see what is going on under the hood and whether a shim is required.

wseaton avatar Mar 14 '21 16:03 wseaton

I was able to read a partitioned dataset without directly calling pyarrow.parquet.read_table by using the following keyword arguments which are passed to pyarrow.parquet.read_table (using polars==0.10.27 and pyarrow==6.0.1):

from pyarrow import fs

df = pl.read_parquet(
    'crypto-exchange-pds/ftx',
    filesystem=fs.S3FileSystem(),
    columns=['market', 'time', 'id', 'price', 'size'],
    filters=[('ds', '>=', '2021-01-01')],
)

P.S. This is an awesome library, my dataset is too large for Pandas but I didn't want to use PySpark, so this has worked great.

mhconradt avatar Dec 06 '21 23:12 mhconradt

@mhconradt Can you clarify: is that pl and not pyarrow? Just trying to understand whether reading parquet partitions is somewhat supported in some version. It doesn't seem to be in what I've pip installed, but I haven't checked the bleeding edge yet.

cottrell avatar Dec 21 '21 23:12 cottrell

@cottrell it is pl. Note it only works if you have pyarrow installed, in which case it calls pyarrow.parquet.read_table with the arguments and creates a pl.DataFrame from the pa.Table. This does support partition-aware scanning, predicate / projection pushdown, etc. Also note I got fs by running from pyarrow import fs. Otherwise read_parquet falls back on the Rust Parquet reader which does not support partition-aware scanning. Yet.

mhconradt avatar Dec 23 '21 03:12 mhconradt

pl.read_parquet doesn't seem to work with just a directory name for me, even if pyarrow is installed.

In [5]: pl.read_parquet('../suzieq/play/donna/new-parquet/routes/', columns=['namespace', 'hostname', 'vrf', 'prefix'])
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-5-5fec8a614bd1> in <module>
----> 1 pl.read_parquet('../suzieq/play/donna/new-parquet/routes/', columns=['namespace', 'hostname', 'vrf', 'prefix'])

~/work/dsengines/.venv/lib/python3.7/site-packages/polars/io.py in read_parquet(source, columns, n_rows, use_pyarrow, memory_map, storage_options, parallel, **kwargs)
    782 
    783         return DataFrame._read_parquet(
--> 784             source_prep, columns=columns, n_rows=n_rows, parallel=parallel
    785         )
    786 

~/work/dsengines/.venv/lib/python3.7/site-packages/polars/internals/frame.py in _read_parquet(file, columns, n_rows, parallel)
    590         projection, columns = handle_projection_columns(columns)
    591         self = DataFrame.__new__(DataFrame)
--> 592         self._df = PyDataFrame.read_parquet(file, columns, projection, n_rows, parallel)
    593         return self
    594 

RuntimeError: Any(ArrowError(ExternalFormat("underlying IO error: Is a directory (os error 21)")))

This is polars version 0.12.14

ddutt avatar Jan 22 '22 15:01 ddutt

Polars allows globbing patterns.

pl.read_parquet('/routes/**/*.parquet', columns=['namespace', 'hostname', 'vrf', 'prefix'])

ritchie46 avatar Jan 22 '22 17:01 ritchie46

Thanks @ritchie46. Though that works, I can't stitch together key information that is present in the directory partition.

ddutt avatar Jan 22 '22 17:01 ddutt

Thanks @ritchie46. Though that works, I can't stitch together key information that is present in the directory partition.

Sorry, what do you mean?

ritchie46 avatar Jan 22 '22 17:01 ritchie46

Say I have a parquet folder structured in hive format as follows: /parquet/namespace=foo/. Under this namespace folder there are lots of folders (maybe a thousand) with the name format hostname=..., and the parquet files are under each hostname folder. When I read with globs, I don't get the namespace and hostname columns in the dataframe. Am I making myself clear?

ddutt avatar Jan 22 '22 17:01 ddutt

One clunky way to make this work is to manually walk the namespace and hostname folders and, for each hostname folder, manually add the namespace and hostname columns, something like the sketch below.
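
A rough sketch of that walk (assuming the /parquet/namespace=<ns>/hostname=<host>/*.parquet layout described above, and that only partition directories live under each level):

import os

import polars as pl

frames = []
root = "/parquet"
for ns_dir in os.scandir(root):                    # e.g. namespace=foo
    namespace = ns_dir.name.split("=", 1)[1]
    for host_dir in os.scandir(ns_dir.path):       # e.g. hostname=bar
        hostname = host_dir.name.split("=", 1)[1]
        # read all parquet files for this (namespace, hostname) pair and
        # add the partition values back as literal columns
        df = pl.read_parquet(os.path.join(host_dir.path, "*.parquet"))
        frames.append(
            df.with_columns(
                [
                    pl.lit(namespace).alias("namespace"),
                    pl.lit(hostname).alias("hostname"),
                ]
            )
        )

result = pl.concat(frames)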

ddutt avatar Jan 22 '22 17:01 ddutt

And when I don't specify any columns (I want them all), I get a different error:

Any(ArrowError(NotYetImplemented("Decoding \"RleDictionary\"-encoded, dictionary-encoded optional V1 pages is not yet implemented for primitive")))

ddutt avatar Jan 22 '22 17:01 ddutt

Partition reading should be possible through pl.scan_ds. Using the example here, you can pass

dset = ds.dataset("s3://my-partitioned-folder/", format="parquet", partitioning="hive")

as described here
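
A minimal usage sketch (the bucket path and the "year" partition column are hypothetical; whether the filter is actually pushed down to pyarrow depends on the polars version, as the follow-up comments discuss):

import polars as pl
import pyarrow.dataset as ds

dset = ds.dataset("s3://my-partitioned-folder/", format="parquet", partitioning="hive")

# scan_ds wraps the pyarrow dataset in a LazyFrame; once predicate pushdown
# reaches pyarrow, a filter on a partition column lets it skip whole partitions.
df = pl.scan_ds(dset).filter(pl.col("year") == 2021).collect()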

matteosantama avatar Aug 02 '22 02:08 matteosantama

It's possible, but apparently not partition-aware yet. @joscani was doing some experiments and, even though the read operation succeeded, it did not leverage the partitions and started using all the available RAM. The experiment was with the NYC Taxi dataset partitioned by year and month.

astrojuanlu avatar Dec 12 '22 21:12 astrojuanlu

In the latest release, predicate pushdown is sent to pyarrow, so scan_ds should be partition-aware if pyarrow is.

ritchie46 avatar Dec 13 '22 06:12 ritchie46

Hi. I'm a newbie in Python and polars.

My code looks like this:

import polars as pl
import pyarrow.dataset as ds

nyc_dir =  "/media/hd1/nyc-taxi"

myds = ds.dataset(nyc_dir, partitioning=["year", "month"] )


df = pl.scan_ds(myds)

(
    df
    .lazy()
    .filter(pl.col("total_amount") > 100)
    .select(["tip_amount", "total_amount", "year"])
    .groupby(["year"])
    .agg(
        [
            pl.col("tip_amount").sum().alias("v1_sum")
        ]
    )
    .collect()
)

The process starts, uses all available RAM, and crashes without succeeding.

I tried to do the same using arrow in R and it works, using little RAM.

suppressPackageStartupMessages({
    library(tidyverse)
    library(arrow)
    library(tictoc)
})
nyc_dir <-  "/media/hd1/nyc-taxi"

tic("open dataset arrow")
ds <- open_dataset(nyc_dir, partitioning = c("year", "month"))
    
tic("filter y group by arrow")
ds %>%
    filter(total_amount > 100) %>%
    select(tip_amount, total_amount, year) %>%
    group_by(year) %>%
    summarise(
        sum_amount = sum(total_amount, na.rm=TRUE)
    ) %>%
    collect() %>%
    print()

toc()
toc()
# A tibble: 14 × 2
    year sum_amount
   <int>      <dbl>
 1  2009   5870919.
 2  2010   8235264.
 3  2011   9885387.
 4  2012  14042164.
 5  2013  22018804.
 6  2014  24048179.
 7  2015  38511729.
 8  2016  38844611.
 9  2017  35896429.
10  2018  31102855.
11  2019  30377572.
12  2020   9539477.
13  2021  10887082.
14  2022   1856850.
> 
> toc()
filter y group by arrow: 9.245 sec elapsed
> toc()
open dataset arrow: 9.408 sec elapsed

Any idea?

joscani avatar Dec 13 '22 09:12 joscani

Which polars version are you running?

ritchie46 avatar Dec 13 '22 09:12 ritchie46

Hi @ritchie46 .

pip show polars

Name: polars
Version: 0.14.9
Summary: Blazingly fast DataFrame library
Home-page: https://github.com/pola-rs/polars
Author: ritchie46 <[email protected]>

joscani avatar Dec 13 '22 09:12 joscani

I just installed version 0.15.3 and everything runs OK. Thanksss.

joscani avatar Dec 13 '22 09:12 joscani

(Is #4347 a duplicate of this?)

I'm also facing issues with getting this to work, though it is a bit hard to tell exactly what the issue is. I have the following code (exact S3 URI omitted):

import pyarrow.dataset as ds
import polars as pl

dset = ds.dataset("s3://my-dataset/at-prefix/", format="parquet", partitioning=["track_date"], exclude_invalid_files=False)
pl.scan_ds(dset).filter(
    pl.col("track_date") == "2022-07-01"
).collect()

This dataset has about 1200 partitions (one for each track date) and a few hundred thousand records per track_date. I would expect that if the predicate pushdown was working correctly and everything was going smoothly, pyarrow would know to look straight at the partition for track_date=2022-07-01, and loading the data should be a pretty quick affair.

When I try to run the above code, though, it hangs. The execution plan reveals (as far as I can tell reading the diagram) that the predicate is not in fact pushed down:

[execution plan diagram: output SVG omitted]

Any thoughts on why this might be?

maxdumas avatar Dec 17 '22 05:12 maxdumas

I am looking forward to a polars-native solution.

My current workaround is the following:

import glob

import polars as pl
import s3fs


def read_parquet_dataset(path, partitioning=None, filter_=None, with_columns=None, storage_options=None):
    if storage_options is not None:
        fs = s3fs.S3FileSystem(**storage_options)
        files = ["s3://" + f for f in fs.glob(path)]
    else:
        files = glob.glob(path)

    scans = list()
    for f in files:
        scan = pl.scan_parquet(f, storage_options=storage_options)
        if partitioning is not None:
            # recover the partition values from the trailing path components
            # (one per partition name) and add them back as literal columns
            values = f.split("/")[-len(partitioning) - 1:-1]
            partitions = [pl.lit(v).alias(n) for v, n in zip(values, partitioning)]
            scan = scan.with_columns(partitions)
        if filter_ is not None:
            scan = scan.filter(filter_)
        if with_columns is not None:
            scan = scan.with_columns(with_columns)
        scans.append(scan)

    return pl.concat(pl.collect_all(scans))
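
For reference, a hypothetical call (the bucket, partition names, filter, and credentials are placeholders):

# assumed directory layout: s3://my-bucket/my-data/<year>/<month>/*.parquet
df = read_parquet_dataset(
    "my-bucket/my-data/*/*/*.parquet",
    partitioning=["year", "month"],
    filter_=pl.col("total_amount") > 100,
    storage_options={"key": "...", "secret": "..."},
)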

legout avatar Mar 10 '23 13:03 legout

closing in favor of https://github.com/pola-rs/polars/issues/10276

universalmind303 avatar Sep 08 '23 17:09 universalmind303