polars icon indicating copy to clipboard operation
polars copied to clipboard

Pyarrow filter not pushed to scan_ds if datatype is a string

Open dominikpeter opened this issue 2 years ago • 1 comments
trafficstars

Polars version checks

  • [X] I have checked that this issue has not already been reported.

  • [X] I have confirmed this bug exists on the latest version of Polars.

Issue description

I have quite some performance differences when filtering a pyarrow dataset with the scanner method compared to polars own filtering. After some testing, I found a strange behavior that could explain the performance difference. Could it be that predicates will not be pushed down to the pyarrow dataset if the datatype is a string?

In the example below the predicate is None in the first example compared to the other examples that filter on an integer:

predicate = None with_columns = ['animal']

predicate = (pa.dataset.field('n_legs') == 2) with_columns = ['animal']

predicate = (pa.dataset.field('year') == 2020) with_columns = ['animal']

Therefore, on a bigger dataset, ds.scanner(filter=ds.field("animal") == "Flamingo") is way faster.

Or do I miss something?

Reproducible example

import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

from typing import cast
from functools import partial
import pickle

table = pa.table(
    {
        "year": [2020, 2022, 2021, 2022, 2019, 2021],
        "n_legs": [2, 2, 4, 4, 5, 100],
        "animal": ["Flamingo", "Parrot", "Dog", "Horse", "Brittle stars", "Centipede"],
    }
)


pq.write_table(table, "file.parquet")

dataset = ds.dataset("file.parquet", format="parquet")


def _scan_ds_impl(ds: pa.dataset.dataset, with_columns: list[str] | None, predicate: str | None) -> pl.DataFrame:

    print(f"predicate = {predicate}")
    print(f"with_columns = {with_columns}")

    _filter = None
    if predicate:
        _filter = eval(predicate)
    return cast(pl.DataFrame, pl.from_arrow(ds.to_table(columns=with_columns, filter=_filter)))


def _scan_ds(ds: pa.dataset.dataset, allow_pyarrow_filter: bool = True) -> pl.LazyFrame:

    func = partial(_scan_ds_impl, ds)
    func_serialized = pickle.dumps(func)
    return pl.LazyFrame._scan_python_function(ds.schema, func_serialized, allow_pyarrow_filter)


df = (
    _scan_ds(dataset, allow_pyarrow_filter=True)
    .select(pl.col("animal"))
    .filter(pl.col("animal") == "Flamingo")
    .collect()
)
print("----------------")
df = _scan_ds(dataset, allow_pyarrow_filter=True).select(pl.col("animal")).filter(pl.col("n_legs") == 2).collect()


print("----------------")
df = _scan_ds(dataset, allow_pyarrow_filter=True).select(pl.col("animal")).filter(pl.col("year") == 2020).collect()

Expected behavior

Also push down the string predicate:

predicate = (pa.dataset.field('animal') == 'Flamingo') with_columns = ['animal']

predicate = (pa.dataset.field('n_legs') == 2) with_columns = ['animal']

predicate = (pa.dataset.field('year') == 2020) with_columns = ['animal']

Installed versions

---Version info--- Polars: 0.15.16 Index type: UInt32 Platform: macOS-13.1-arm64-arm-64bit Python: 3.11.1 (v3.11.1:a7a450f84a, Dec 6 2022, 15:24:06) [Clang 13.0.0 (clang-1300.0.29.30)] ---Optional dependencies--- pyarrow: 10.0.1 pandas: 1.5.2 numpy: 1.24.1 fsspec: connectorx: xlsx2csv: deltalake: matplotlib:

dominikpeter avatar Jan 23 '23 20:01 dominikpeter

@stinodego that would explain why partitioning must be set in scan_delta.

ritchie46 avatar Jan 23 '23 20:01 ritchie46

Could we help here somehow? I think the issue is not in Python, but in the Rust backend. But we don't really know where to start debugging. We cannot not really use Delta Lake with Polars at the moment because we have to put way too much data into memory with our data. We can filter the data first and then use from_arrow to load the data. With a z-ordered table, it works well even on large datasets. So, we can work around it. However, we would like to use Polars native filter method -because we love Polars :-).

dominikpeter avatar Feb 03 '23 15:02 dominikpeter

I have to look at this more closely - I admit it fell off my radar. Will come back to this later this weekend.

stinodego avatar Feb 03 '23 15:02 stinodego

@stinodego that would explain why partitioning must be set in scan_delta.

That is a different issue. scan_delta does the following:

  1. Map the file to a DeltaTable object (deltalake dependency takes care of this)
  2. Convert to a pyarrow dataset (deltalake dependency takes care of this)
  3. Call pl.scan_ds

Any subsequent filters are pushed to scan_ds just fine. The problem is that these filters are not pushed further to step 2, where the conversion happens from Delta to pyarrow. This is required to take advantage of on-disk partitioning that Delta uses (if you're unfamiliar with Delta, it's just Parquet files with a layer on top).

The way to handle this right now is to specify filters manually in step 2. I wouldn't know a good way to automate this.

On topic

About the original issue from this post: I can confirm that string predicates are not pushed up correctly. Thanks for the clear example. I'm really not very familiar with this scanning code - @ritchie46 could you take a closer look?

stinodego avatar Feb 05 '23 19:02 stinodego

@stinodego that would explain why partitioning must be set in scan_delta.

That is a different issue. scan_delta does the following:

  1. Map the file to a DeltaTable object (deltalake dependency takes care of this)
  2. Convert to a pyarrow dataset (deltalake dependency takes care of this)
  3. Call pl.scan_ds

Any subsequent filters are pushed to scan_ds just fine. The problem is that these filters are not pushed further to step 2, where the conversion happens from Delta to pyarrow. This is required to take advantage of on-disk partitioning that Delta uses (if you're unfamiliar with Delta, it's just Parquet files with a layer on top).

The way to handle this right now is to specify filters manually in step 2. I wouldn't know a good way to automate this.

On topic

About the original issue from this post: I can confirm that string predicates are not pushed up correctly. Thanks for the clear example. I'm really not very familiar with this scanning code - @ritchie46 could you take a closer look?

I need to look at this more closely, but I think if the filters are properly pushed into the scan_ds. The partition is also filtered and file skipping is activated. At least this is described here: reading partitioned data

Regarding the scan_ds issue, I can also confirm that none of the predicated are pushed through when a string is used. In the example both filters are not included: df = ( _scan_ds(dataset, allow_pyarrow_filter=True) .select([pl.col("animal"), pl.col("year")]) .filter((pl.col("year") == 2020) & (pl.col("animal") == "Flamingo")) .collect() )

predicate = None with_columns = ['animal', 'year']

dominikpeter avatar Feb 05 '23 21:02 dominikpeter

Something to consider: https://github.com/delta-io/delta-rs/issues/1128 ADBC Support would most likely supersed current implementation?

aersam avatar Feb 06 '23 14:02 aersam

Any subsequent filters are pushed to scan_ds just fine. The problem is that these filters are not pushed further to step 2, where the conversion happens from Delta to pyarrow. This is required to take advantage of on-disk partitioning that Delta uses (if you're unfamiliar with Delta, it's just Parquet files with a layer on top).

@stinodego your above explanation is correct, but wont this affect any thing that relies on scan_ds and not just scan_delta?

Let me know if you need some help regarding this, because I saw this as a potential bottleneck while adding the functionality. I considered using the deltalake lib only to get the paths of the parquet files based on table version and then call scan_parquet internally in scan_delta but that did not seem like the right way to go either.

chitralverma avatar Feb 07 '23 15:02 chitralverma

Any subsequent filters are pushed to scan_ds just fine. The problem is that these filters are not pushed further to step 2, where the conversion happens from Delta to pyarrow. This is required to take advantage of on-disk partitioning that Delta uses (if you're unfamiliar with Delta, it's just Parquet files with a layer on top).

@stinodego your above explanation is correct, but wont this affect any thing that relies on scan_ds and not just scan_delta?

Let me know if you need some help regarding this, because I saw this as a potential bottleneck while adding the functionality. I considered using the deltalake lib only to get the paths of the parquet files based on table version and then call scan_parquet internally in scan_delta but that did not seem like the right way to go either.

I think there are two problems with the scan_parquet approach. First, scan_parquet only allows glob pattern, so you would need to merge them somehow. Second, you would lose the benefits of the statistics. They are stored in the pyarrow dataset fragments. Therefore, file skipping would not be possible.

I believe if scan_dspasses all predicates to the underlying pyarrow datasets, the problem with the partitions are solved too.

@chitralverma by the way, I saw that delta-rs 0.7.0 was released, that would allow the DeltaStorageHandler to be pickable delta-rs issue 1016 -wouldn't solve this issue thought.

dominikpeter avatar Feb 07 '23 16:02 dominikpeter

First, scan_parquet only allows glob pattern, so you would need to merge them somehow.

for lazy dfs, concat is a zero copy op, so that is easy.

Second, you would lose the benefits of the statistics

actually i confirmed it with some of the delta guys for another usecase on how this works, If i use the deltalake lib to just list the parquet files for a specified version, the delta log is looked up and the file skipping actually happens. the main question is this - does the polars parquet read use parquet statistics for optimizing IO ? @ritchie46 maybe you can clarify this one.

chitralverma avatar Feb 07 '23 20:02 chitralverma

I did some tests on my side on the dataset we are struggling with:

file_url = "data/delta/item"

@memory_usage
def scan():
    df = pl.scan_delta(file_url)
    print(df.filter(pl.col("item") == "00009501").fetch(5))

@memory_usage
def concat():
    dt = DeltaTable(file_url)
    dfs = pl.concat([pl.scan_parquet(i) for i in dt.file_uris()])
    print(dfs.filter(pl.col("item") == "00009501").fetch(5))

Function Name : scan Current memory usage: 0.178867MB Peak : 3.096506MB Timing Took : 11.0809 seconds

Function Name : concat Current memory usage: 0.001219MB Peak : 0.021559MB Timing Took : 0.0959 seconds

The parquet concat method is significantly faster in my example.

@chitralverma thanks a lot! We can already use this internally for our use case.

dominikpeter avatar Feb 07 '23 20:02 dominikpeter

@dominikpeter try setting rechunk to False in pl.concat and see if it gets better

chitralverma avatar Feb 07 '23 21:02 chitralverma

times = []

for i in range(10):
    start = datetime.now()
    dt = DeltaTable(file_url)
    dfs = pl.concat([pl.scan_parquet(i) for i in dt.file_uris()], rechunk=False)
    dfs.filter((pl.col("item") == "01523048")).collect(streaming=True)
    end = datetime.now()
    delta = (end - start).total_seconds()
    times.append(delta)

print("rechunk=False")
print(np.array(times).mean())
print(np.array(times).std())

times = []

for i in range(10):
    start = datetime.now()
    dt = DeltaTable(file_url)
    dfs = pl.concat([pl.scan_parquet(i) for i in dt.file_uris()], rechunk=True)
    dfs.filter((pl.col("item") == "01523048")).collect(streaming=True)
    end = datetime.now()
    delta = (end - start).total_seconds()
    times.append(delta)

print("rechunk=True")
print(np.array(times).mean())
print(np.array(times).std())

rechunk=False 2.7260541 0.08526206672776585

rechunk=True 2.8425098 0.21936449264126584

@chitralverma it slightly improved the performance in my example

dominikpeter avatar Feb 07 '23 21:02 dominikpeter

ok, i expected that. so this can technically work, here are my suggestions for further development of this connector in Polars.

  • We change change the implementation internally to not rely on pyarrow and scan_ds and instead rely on scan_parquet like you just tried, OR,
  • We move all this to rust side and then do the same. - Gain of this is that the for loop you just wrote is sequential and can be sped up if run in parallel with async or something.

What I am looking for is an argument against this approach. I don't think we are missing out on any capabilities but I am also not 100% sure.

BTW sorry for using your time to test these things but if you have a delta table in an object storage (any one) can you try the scan_delta vs scan_parquet test on that. my guess is that in that case the results are not going to be very different.

chitralverma avatar Feb 07 '23 21:02 chitralverma

I don't think there is a real downside. This approach is also described here: https://delta-io.github.io/delta-rs/python/usage.html for dask dataframes.

I think this could live inside rust and benefit from the things you mentioned. I would prefer this approach.

No problem. I have to thank here :-). I will test it tomorrow and give feedback.

dominikpeter avatar Feb 07 '23 22:02 dominikpeter

I was not able to make it work with an object store (azure storage account).

file_url = f"abfss://[email protected]/delta/item"
file_url = f"abfss://mycontainer/delta/item"

# tried both

storage_options = {
    "account_name": "mystorage",
    "account_key": "mykey",
}

@memory_usage
def concat():
    dt = DeltaTable(file_url, storage_options=storage_options)
    print(dt.file_uris())
    dfs = pl.concat([pl.scan_parquet(i, storage_options=storage_options) for i in dt.file_uris()])
    print(dfs.filter(pl.col("item") == "8043152").collect(streaming=True))

delta-rs gives me the correct parquet files with the correct uris back.

But polars results in an error: FileNotFoundError: No such file or directory: abfss://xxxxxxx

We discussed it a little bit internally. For the moment, we can push down the filters to the pyarrow dataset or use the concat approach. This will work for our use case.

I guess, it makes sense to wait for ADBC before putting too much effort in this. One downside of the concat parquet approach will be for sure for future releases of delta-rs, when they want to support column mapping, computed columns and so on.

Still believe it would make sense to fix the scan_ds filter pushdown bug, thought :-).

dominikpeter avatar Feb 08 '23 09:02 dominikpeter

the file not found issue is because abfss scheme is not supported by fsspec

see supported schemes in the azure example here

https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.scan_delta.html#polars.scan_delta

but this can be changed now that the pickling issue is fixed on their side, however then in polars we will have to pin deltalake dependency to 0.7+

ADBC will require a conversion of polars plan to SQL for pushdown which does not exist at the moment in polars.

chitralverma avatar Feb 08 '23 10:02 chitralverma

I did some extensive test with the application we are running. I compared the scan_delta vs concat_parquet approach.

We are using for example offset this is way slower with the concat_parquet approach. However, filtering on string column of course is faster. But still believe there it could be faster when skipping parquet files that don't include record I am filtering for.

It was a really mixed result. I think for the moment I would keep it as it is. Maybe bump to 0.7 to take advantage of the new pickling.

What we end up doing probably is something like this:

    if filter:
        dt = DeltaTable(file_url)
        dataset = dt.to_pyarrow_dataset().scanner(filter=ds.field("item") == "8043152").to_table()
        df = pl.from_arrow(dataset)
    else:
        df = pl.scan_delta(file_url)

dominikpeter avatar Feb 08 '23 15:02 dominikpeter

The problem seems to be that we don't push the predicate down to _scan_ds. I will take a look.

ritchie46 avatar Feb 08 '23 15:02 ritchie46