
scan_parquet is slow to scan over parquet dataset with many rows

jtanx opened this issue 1 year ago • 8 comments

Checks

  • [X] I have checked that this issue has not already been reported.
  • [X] I have confirmed this bug exists on the latest version of Polars.

Reproducible example

  1. Generate test dataset
import datetime as dt
from pathlib import Path

import polars as pl

output = Path("data")
output.mkdir(exist_ok=True)


for d in pl.date_range(
    dt.date(2023, 1, 1), dt.date(2024, 1, 1), eager=True, interval="1d"
):
    if d.isoweekday() in (6, 7):
        continue
    st = dt.datetime.combine(d, dt.time(8, 0))
    et = dt.datetime.combine(d, dt.time(7, 0)) + dt.timedelta(days=1)
    lf = pl.LazyFrame().select(
        pl.datetime_range(st, et, dt.timedelta(seconds=1)).alias("ts")
    )
    lf2 = pl.LazyFrame().select(pl.int_range(1, 1000).alias("id"))
    df = lf.join(lf2, how="cross")
    df.collect().write_parquet(
        output / f"{d}.parquet", row_group_size=100000, use_pyarrow=True
    )
    print(d)
  2. Scan over that dataset
import datetime as dt

import polars as pl

df = pl.scan_parquet("data/*.parquet", glob=True)
res = (
    df.filter(pl.col("ts") > dt.datetime(2023, 2, 10), pl.col("id") == 10)
    .head(10)
    .collect()
)
print(res)
  3. Alternative scan using duckdb
import duckdb

con = duckdb.connect()
res = con.query("""
select * from
'data/*.parquet'
where id=10
and ts>'2023-02-10'
limit 10
""")

print(res)
  4. Alternatively using pyarrow
import datetime as dt

import pyarrow.compute as pc
import pyarrow.dataset as pds

ds = pds.dataset("data/", format="parquet")
ds = ds.filter((pc.field("ts") > dt.datetime(2023, 2, 10)) & (pc.field("id") == 10))
print(ds.head(10))

Log output

Polars:


time python parquet_test_read.py 
shape: (10, 2)
┌─────────────────────┬─────┐
│ ts                  ┆ id  │
│ ---                 ┆ --- │
│ datetime[μs]        ┆ i64 │
╞═════════════════════╪═════╡
│ 2023-02-10 00:00:01 ┆ 10  │
│ 2023-02-10 00:00:02 ┆ 10  │
│ 2023-02-10 00:00:03 ┆ 10  │
│ 2023-02-10 00:00:04 ┆ 10  │
│ 2023-02-10 00:00:05 ┆ 10  │
│ 2023-02-10 00:00:06 ┆ 10  │
│ 2023-02-10 00:00:07 ┆ 10  │
│ 2023-02-10 00:00:08 ┆ 10  │
│ 2023-02-10 00:00:09 ┆ 10  │
│ 2023-02-10 00:00:10 ┆ 10  │
└─────────────────────┴─────┘

real    3m51.228s
user    3m44.271s
sys     3m7.796s

Duckdb:

time python parquet_test_read2.py 
┌─────────────────────┬───────┐
│         ts          │  id   │
│      timestamp      │ int64 │
├─────────────────────┼───────┤
│ 2023-02-10 00:00:01 │    10 │
│ 2023-02-10 00:00:02 │    10 │
│ 2023-02-10 00:00:03 │    10 │
│ 2023-02-10 00:00:04 │    10 │
│ 2023-02-10 00:00:05 │    10 │
│ 2023-02-10 00:00:06 │    10 │
│ 2023-02-10 00:00:07 │    10 │
│ 2023-02-10 00:00:08 │    10 │
│ 2023-02-10 00:00:09 │    10 │
│ 2023-02-10 00:00:10 │    10 │
├─────────────────────┴───────┤
│ 10 rows           2 columns │
└─────────────────────────────┘


real    0m0.609s
user    0m0.543s
sys     0m0.042s

Pyarrow:

 time python parquet_test_read3.py 
pyarrow.Table
ts: timestamp[us]
id: int64
----
ts: [[],[],...,[],[2023-02-10 00:00:01.000000,2023-02-10 00:00:02.000000,2023-02-10 00:00:03.000000,2023-02-10 00:00:04.000000,2023-02-10 00:00:05.000000,2023-02-10 00:00:06.000000,2023-02-10 00:00:07.000000,2023-02-10 00:00:08.000000,2023-02-10 00:00:09.000000,2023-02-10 00:00:10.000000]]
id: [[],[],...,[],[10,10,10,10,10,10,10,10,10,10]]

real    0m3.054s
user    0m3.854s
sys     0m0.613s

Issue description

It feels to me like scan_parquet is not considering the Parquet statistics at all; given the provided filter, it should be able to (reasonably) quickly narrow the scan down to the relevant row groups. Other operations like groupby/count are also very slow.
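
As a sanity check (editorial note, not part of the original report): the pyarrow writer used in the generation script writes per-row-group min/max statistics by default, and they can be inspected directly; the file name below is just the first weekday file the script produces.

import pyarrow.parquet as pq

# One of the files generated by the script above (2023-01-02 is the first weekday).
pf = pq.ParquetFile("data/2023-01-02.parquet")
meta = pf.metadata

# Every row group stores min/max statistics per column in the file footer, so a
# reader can decide to skip a row group without touching its data pages.
for i in range(meta.num_row_groups):
    rg = meta.row_group(i)
    for j in range(rg.num_columns):
        col = rg.column(j)
        if col.statistics is not None and col.statistics.has_min_max:
            print(i, col.path_in_schema, col.statistics.min, col.statistics.max)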

Expected behavior

Polars should be able to quickly scan/read the dataset based on the provided filter.

Installed versions

--------Version info---------
Polars:              1.9.0
Index type:          UInt32
Platform:            Linux-6.8.0-45-generic-x86_64-with-glibc2.39
Python:              3.12.3 (main, Sep 11 2024, 14:17:37) [GCC 13.2.0]

----Optional dependencies----
adbc_driver_manager  <not installed>
altair               <not installed>
cloudpickle          <not installed>
connectorx           <not installed>
deltalake            <not installed>
fastexcel            <not installed>
fsspec               <not installed>
gevent               <not installed>
great_tables         <not installed>
matplotlib           3.9.2
nest_asyncio         1.6.0
numpy                2.1.1
openpyxl             <not installed>
pandas               2.2.3
pyarrow              17.0.0
pydantic             <not installed>
pyiceberg            <not installed>
sqlalchemy           <not installed>
torch                <not installed>
xlsx2csv             <not installed>
xlsxwriter           <not installed>

jtanx avatar Oct 12 '24 06:10 jtanx

Seems strange, I will have a look somewhere soon.

coastalwhite avatar Oct 12 '24 07:10 coastalwhite

I think this is a problem of us not doing both SLICE and PREDICATE PUSHDOWN.

coastalwhite avatar Oct 12 '24 18:10 coastalwhite
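
(Editorial note, not from the thread.) A quick way to check what actually made it into the optimized plan is LazyFrame.explain(): a predicate that was pushed into the reader shows up inside the Parquet SCAN node, while a filter or slice that was not pushed down appears as a separate node above it. The exact wording differs between Polars versions, so treat this as a sketch:

import datetime as dt

import polars as pl

lf = pl.scan_parquet("data/*.parquet")
query = lf.filter(
    pl.col("ts") > dt.datetime(2023, 2, 10), pl.col("id") == 10
).head(10)

# Print the optimized plan and look at where the predicate and the slice ended up.
print(query.explain())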

FWIW, if I use scan_pyarrow_dataset instead, it just consumes a bucketload of RAM and eventually OOMs:

import datetime as dt

import polars as pl
import pyarrow.dataset as pds

ds = pds.dataset("data/", format="parquet")
df = pl.scan_pyarrow_dataset(ds)
res = (
    df.filter(pl.col("ts") > dt.datetime(2023, 2, 10), pl.col("id") == 10)
    .head(10)
    .collect()
)
print(res)

jtanx avatar Oct 12 '24 22:10 jtanx
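
(Editorial sketch, not from the thread.) A possible stopgap while the pushdown issue is open is to let pyarrow evaluate the filter itself, as in the reproducible example above, and hand Polars only the already-reduced result:

import datetime as dt

import polars as pl
import pyarrow.compute as pc
import pyarrow.dataset as pds

ds = pds.dataset("data/", format="parquet")
filtered = ds.filter(
    (pc.field("ts") > dt.datetime(2023, 2, 10)) & (pc.field("id") == 10)
)

# head() materializes only the first matching rows; the small table is then
# converted to a Polars DataFrame.
res = pl.from_arrow(filtered.head(10))
print(res)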

I think this is a problem of us not doing both SLICE and PREDICATE PUSHDOWN.

Yes, we must accept a prefilter slice and a postfilter slice in the readers. Currently it's ambiguous and we don't push down both. This will automatically be resolved in streaming, but we should also have this distinction for the in-memory engine.

ritchie46 avatar Oct 13 '24 11:10 ritchie46
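
(Editorial sketch, not from the thread.) The two orders are not interchangeable, which is why the reader has to be told whether a pushed-down slice applies before or after the pushed-down predicate:

import datetime as dt

import polars as pl

lf = pl.scan_parquet("data/*.parquet")
pred = (pl.col("ts") > dt.datetime(2023, 2, 10)) & (pl.col("id") == 10)

# Slice after the predicate: the first 10 rows that satisfy the filter.
post_slice = lf.filter(pred).head(10)

# Slice before the predicate: the filter applied to the first 10 rows of the
# scan, which is a different (here almost certainly empty) result.
pre_slice = lf.head(10).filter(pred)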

I ran into this as well, at least I suspect it's the same issue. I have an 8GB file with about 325 million rows.

lf.select(pl.count()).collect()

is very fast.

lf.select(pl.col("timestamp").min()).collect()

is pretty fast too..

lf.filter(pl.col("timestamp") == pl.col("timestamp").max()).collect()

actually OOMs on my 32GB machine

Here's the explain if that helps?

lf.filter(pl.col("timestamp") == pl.col("timestamp").max()).explain()
FILTER [(col("timestamp")) == (col("timestamp").max())] FROM
  Parquet SCAN [redacted.pl]
  PROJECT */7 COLUMNS

RmStorm avatar Oct 15 '24 10:10 RmStorm

@RmStorm I don't think that is related. We simply don't filter on that while reading because we cannot know the maximum while reading (although admittedly we could do some smarter stuff there). I am pretty sure your computer OOMs because the DataFrame, before it gets filtered, is larger than 32 GB.

coastalwhite avatar Oct 15 '24 10:10 coastalwhite
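
(Editorial sketch, not from the thread; the file name is hypothetical.) A two-pass workaround for the max-value query: compute the maximum first, then filter against the resulting literal, so that only matching rows are materialized and the reader gets a concrete value it could, in principle, compare against row-group statistics:

import polars as pl

lf = pl.scan_parquet("your_file.parquet")  # hypothetical path

# Pass 1: reduce to a single scalar; only the timestamp column needs to be read.
ts_max = lf.select(pl.col("timestamp").max()).collect().item()

# Pass 2: filter against the literal so only matching rows are kept in memory.
res = lf.filter(pl.col("timestamp") == ts_max).collect()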

Yeah, but the point is that filtering is possible: the timestamp column has min/max ranges written for all the row groups, and there is just a single row group that contains that timestamp, so only that one should need to be loaded. My understanding of the predicate pushdown stuff is that it's supposed to leverage the Parquet statistics to only load the relevant row groups? Or am I misunderstanding it?

RmStorm avatar Oct 15 '24 10:10 RmStorm

Ah, I see what you mean. Theoretically, it is possible to do what you are proposing. I'm not sure about non-integer columns, since the min and max there might be different from how we define them. I can maybe look into it later.

Still, it's not really related to this issue; I will open a separate one.

coastalwhite avatar Oct 15 '24 11:10 coastalwhite
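
(Editorial sketch, not from the thread; a flat schema and present statistics are assumed, and the file name is hypothetical.) The manual version of what is being discussed here, using pyarrow to read only the row groups whose footer max can contain the global maximum:

import polars as pl
import pyarrow.parquet as pq

pf = pq.ParquetFile("your_file.parquet")  # hypothetical path
col_idx = pf.schema_arrow.get_field_index("timestamp")

# Per-row-group max values come from the footer metadata; no data pages are read.
maxes = [
    pf.metadata.row_group(i).column(col_idx).statistics.max
    for i in range(pf.metadata.num_row_groups)
]
global_max = max(maxes)

# Read only the row groups that can contain the global maximum, then filter.
candidates = [i for i, m in enumerate(maxes) if m == global_max]
res = pl.from_arrow(pf.read_row_groups(candidates)).filter(
    pl.col("timestamp") == global_max
)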