scan_parquet is slow when scanning a parquet dataset with many rows
Checks
- [X] I have checked that this issue has not already been reported.
- [X] I have confirmed this bug exists on the latest version of Polars.
Reproducible example
- Generate test dataset
import datetime as dt
from pathlib import Path
import polars as pl
output = Path("data")
output.mkdir(exist_ok=True)
for d in pl.date_range(
    dt.date(2023, 1, 1), dt.date(2024, 1, 1), eager=True, interval="1d"
):
    if d.isoweekday() in (6, 7):
        continue
    st = dt.datetime.combine(d, dt.time(8, 0))
    et = dt.datetime.combine(d, dt.time(7, 0)) + dt.timedelta(days=1)
    lf = pl.LazyFrame().select(
        pl.datetime_range(st, et, dt.timedelta(seconds=1)).alias("ts")
    )
    lf2 = pl.LazyFrame().select(pl.int_range(1, 1000).alias("id"))
    df = lf.join(lf2, how="cross")
    df.collect().write_parquet(
        output / f"{d}.parquet", row_group_size=100000, use_pyarrow=True
    )
    print(d)
- Scan over that dataset
import datetime as dt
import polars as pl
df = pl.scan_parquet("data/*.parquet", glob=True)
res = (
    df.filter(pl.col("ts") > dt.datetime(2023, 2, 10), pl.col("id") == 10)
    .head(10)
    .collect()
)
print(res)
- Alternative scan using duckdb
import duckdb
con = duckdb.connect()
res = con.query("""
select * from
'data/*.parquet'
where id=10
and ts>'2023-02-10'
limit 10
""")
print(res)
- Alternatively using pyarrow
import datetime as dt
import pyarrow.compute as pc
import pyarrow.dataset as pds
ds = pds.dataset("data/", format="parquet")
ds = ds.filter((pc.field("ts") > dt.datetime(2023, 2, 10)) & (pc.field("id") == 10))
print(ds.head(10))
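For scale, the generation script in the first step produces a fairly large dataset. Assuming the defaults (date_range and datetime_range include both endpoints, int_range excludes its end), each file holds roughly 82.7 million rows and there are about 261 weekday files, so the whole dataset is on the order of 20+ billion rows:
per_file = (23 * 3600 + 1) * 999  # 82,801 timestamps x 999 ids = 82,718,199 rows per file
total = per_file * 261            # ~261 weekday files -> roughly 21.6 billion rows
print(per_file, total)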
Log output
Polars:
time python parquet_test_read.py
shape: (10, 2)
┌─────────────────────┬─────┐
│ ts ┆ id │
│ --- ┆ --- │
│ datetime[μs] ┆ i64 │
╞═════════════════════╪═════╡
│ 2023-02-10 00:00:01 ┆ 10 │
│ 2023-02-10 00:00:02 ┆ 10 │
│ 2023-02-10 00:00:03 ┆ 10 │
│ 2023-02-10 00:00:04 ┆ 10 │
│ 2023-02-10 00:00:05 ┆ 10 │
│ 2023-02-10 00:00:06 ┆ 10 │
│ 2023-02-10 00:00:07 ┆ 10 │
│ 2023-02-10 00:00:08 ┆ 10 │
│ 2023-02-10 00:00:09 ┆ 10 │
│ 2023-02-10 00:00:10 ┆ 10 │
└─────────────────────┴─────┘
real 3m51.228s
user 3m44.271s
sys 3m7.796s
Duckdb:
time python parquet_test_read2.py
┌─────────────────────┬───────┐
│ ts │ id │
│ timestamp │ int64 │
├─────────────────────┼───────┤
│ 2023-02-10 00:00:01 │ 10 │
│ 2023-02-10 00:00:02 │ 10 │
│ 2023-02-10 00:00:03 │ 10 │
│ 2023-02-10 00:00:04 │ 10 │
│ 2023-02-10 00:00:05 │ 10 │
│ 2023-02-10 00:00:06 │ 10 │
│ 2023-02-10 00:00:07 │ 10 │
│ 2023-02-10 00:00:08 │ 10 │
│ 2023-02-10 00:00:09 │ 10 │
│ 2023-02-10 00:00:10 │ 10 │
├─────────────────────┴───────┤
│ 10 rows 2 columns │
└─────────────────────────────┘
real 0m0.609s
user 0m0.543s
sys 0m0.042s
Pyarrow:
time python parquet_test_read3.py
pyarrow.Table
ts: timestamp[us]
id: int64
----
ts: [[],[],...,[],[2023-02-10 00:00:01.000000,2023-02-10 00:00:02.000000,2023-02-10 00:00:03.000000,2023-02-10 00:00:04.000000,2023-02-10 00:00:05.000000,2023-02-10 00:00:06.000000,2023-02-10 00:00:07.000000,2023-02-10 00:00:08.000000,2023-02-10 00:00:09.000000,2023-02-10 00:00:10.000000]]
id: [[],[],...,[],[10,10,10,10,10,10,10,10,10,10]]
real 0m3.054s
user 0m3.854s
sys 0m0.613s
Issue description
It feels to me like scan_parquet is not considering the Parquet statistics at all; it should be able to take the provided filter and (reasonably) quickly narrow down to the relevant chunk. Other operations like groupby/count are also very slow.
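For what it's worth, the generated files do carry per-row-group min/max statistics that a reader could prune on. A quick way to confirm this (a sketch using pyarrow.parquet; the file name is just one of the files produced by the generation script above):
import pyarrow.parquet as pq

# Print the min/max statistics of every row group in one generated file
# (2023-02-10 is a weekday, so this file exists if the generation script was run).
meta = pq.ParquetFile("data/2023-02-10.parquet").metadata
for i in range(meta.num_row_groups):
    rg = meta.row_group(i)
    for j in range(rg.num_columns):
        col = rg.column(j)
        if col.statistics is not None and col.statistics.has_min_max:
            print(i, col.path_in_schema, col.statistics.min, col.statistics.max)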
Expected behavior
Polars should be able to quickly scan/read the dataset based on the provided filter.
Installed versions
--------Version info---------
Polars: 1.9.0
Index type: UInt32
Platform: Linux-6.8.0-45-generic-x86_64-with-glibc2.39
Python: 3.12.3 (main, Sep 11 2024, 14:17:37) [GCC 13.2.0]
----Optional dependencies----
adbc_driver_manager <not installed>
altair <not installed>
cloudpickle <not installed>
connectorx <not installed>
deltalake <not installed>
fastexcel <not installed>
fsspec <not installed>
gevent <not installed>
great_tables <not installed>
matplotlib 3.9.2
nest_asyncio 1.6.0
numpy 2.1.1
openpyxl <not installed>
pandas 2.2.3
pyarrow 17.0.0
pydantic <not installed>
pyiceberg <not installed>
sqlalchemy <not installed>
torch <not installed>
xlsx2csv <not installed>
xlsxwriter <not installed>
Seems strange, I will have a look sometime soon.
I think this is a problem of us not doing both SLICE and PREDICATE PUSHDOWN.
FWIW, if I use scan_pyarrow_dataset instead, it just consumes a bucketload of RAM and eventually OOMs:
import datetime as dt
import polars as pl
import pyarrow.dataset as pds
ds = pds.dataset("data/", format="parquet")
df = pl.scan_pyarrow_dataset(ds)
res = (
    df.filter(pl.col("ts") > dt.datetime(2023, 2, 10), pl.col("id") == 10)
    .head(10)
    .collect()
)
print(res)
> I think this is a problem of us not doing both SLICE and PREDICATE PUSHDOWN.
Yes, we must accept a prefilter slice and a postfilter slice in the readers. Currently it's ambiguous and we don't push down both. This will automatically be resolved in streaming, but we should also have this distinction for the in-memory engine.
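To spell out the distinction for readers following along, a minimal plain-Python sketch (not Polars internals): a slice applied before the predicate and a slice applied after it select different rows, which is why the reader must know which of the two it has been handed. In this issue, .filter(...).head(10) is a post-filter slice.
# Minimal illustration of pre-filter vs. post-filter slices (plain Python, not Polars internals).
rows = list(range(100))

def pred(x):
    return x % 10 == 0

pre = [x for x in rows[:10] if pred(x)]    # slice first, then filter -> [0]
post = [x for x in rows if pred(x)][:10]   # filter first, then slice -> [0, 10, 20, ..., 90]

print(pre)
print(post)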
I ran into this as well; at least I suspect it's the same issue. I have an 8 GB file with about 325 million rows.
lf.select(pl.count()).collect()
is very fast.
lf.select(pl.col("timestamp").min()).collect()
is pretty fast too.
lf.filter(pl.col("timestamp") == pl.col("timestamp").max()).collect()
actually OOMs on my 32 GB machine.
Here's the explain output, if that helps:
lf.filter(pl.col("timestamp") == pl.col("timestamp").max()).explain()
'FILTER [(col("timestamp")) == (col("timestamp").max())] FROM\n Parquet SCAN [redacted.pl]\n PROJECT */7 COLUMNS'
@RmStorm I don't think that is related. We simply don't filter that while reading because we cannot know the maximum while reading (although admittedly we could do some smarter stuff there). I am pretty sure your computer OOMs because the DataFrame, before it gets filtered, is larger than 32 GB.
Yeah, but the point is that filtering is possible: the timestamp column has min/max ranges written for all the row groups, and there is just a single row group that contains that timestamp, so only that one should need to be loaded. My understanding of the predicate pushdown stuff is that it's supposed to leverage the Parquet statistics to load only the relevant row groups? Or am I misunderstanding it?
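To make that concrete, here is a manual sketch of that kind of pruning with pyarrow ("your_file.parquet" and the "timestamp" column name are placeholders): it reads only the footer metadata, then only the row groups whose max statistic matches the global max.
import pyarrow.parquet as pq

# Manual row-group pruning sketch: derive the column max from footer statistics alone,
# then read only the row groups that can contain it.
pf = pq.ParquetFile("your_file.parquet")  # placeholder path
meta = pf.metadata

# Locate the column chunk index for "timestamp" by its path in the schema.
first_rg = meta.row_group(0)
col_idx = next(
    j for j in range(first_rg.num_columns)
    if first_rg.column(j).path_in_schema == "timestamp"
)

stats = [meta.row_group(i).column(col_idx).statistics for i in range(meta.num_row_groups)]
global_max = max(s.max for s in stats if s is not None and s.has_min_max)

# Only these row groups can hold rows equal to the global max.
candidates = [i for i, s in enumerate(stats) if s is not None and s.max == global_max]
table = pf.read_row_groups(candidates)
print(candidates, table.num_rows)  # an exact filter on `table` is still needed afterwards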
Ah, I see what you mean. Theoretically, it is possible to do what you are proposing. I'm not sure about non-integer columns, since the min and max there might be different from how we define them. I can maybe look into it later.
Still, this is not really related to this issue; I will open a separate one.
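In the meantime, a possible two-pass workaround for the == max case discussed above, based on the observation that the min/max aggregation on its own is fast: compute the max first, then filter with the resulting literal, so the predicate is known before the scan starts. A sketch only; the path and column name follow the comment above.
import polars as pl

# Two-pass sketch of the == max filter (path and column name as in the comment above).
lf = pl.scan_parquet("redacted.pl")

# Pass 1: the aggregation alone is reported to be fast above.
ts_max = lf.select(pl.col("timestamp").max()).collect().item()

# Pass 2: filter on the now-known literal instead of a runtime aggregate.
res = lf.filter(pl.col("timestamp") == ts_max).collect()
print(res)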