`sink_parquet(...)` much slower than `.collect().write_parquet(...)`
Checks
- [X] I have checked that this issue has not already been reported.
- [X] I have confirmed this bug exists on the latest version of Polars.
Reproducible example
Using the same data as in https://github.com/pola-rs/polars/issues/11698, I'm observing at least a 2x performance difference in the following:
pl.scan_parquet(f).sink_parquet("/tmp/out")
# 2x faster:
pl.scan_parquet(f).collect().write_parquet("/tmp/out")
Interestingly, when looking at htop, the version with .collect() seems to use all cores some of the time (during reading?), while the other version never uses more than ~1 core.
This is a reduced example; the original table has many more columns and the performance difference is much larger.
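For anyone who wants to reproduce the comparison outside of a notebook, here is a minimal timing sketch (the input file name is just a placeholder for the data linked above):

```python
import time

import polars as pl

f = "pl-perf-medium.parquet"  # placeholder input file


def timed(label, fn, repeat=3):
    # Best-of-N wall-clock timing; crude, but enough to see a 2x gap.
    best = float("inf")
    for _ in range(repeat):
        t0 = time.perf_counter()
        fn()
        best = min(best, time.perf_counter() - t0)
    print(f"{label}: {best:.2f}s")


timed("sink_parquet", lambda: pl.scan_parquet(f).sink_parquet("/tmp/out_sink.parquet"))
timed("collect + write_parquet", lambda: pl.scan_parquet(f).collect().write_parquet("/tmp/out_collect.parquet"))
```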
Log output
No response
Issue description
See example
Expected behavior
Should have similar performance.
Installed versions
--------Version info---------
Polars: 0.19.8
Index type: UInt32
Platform: macOS-13.5.2-arm64-arm-64bit
Python: 3.11.4 | packaged by conda-forge | (main, Jun 10 2023, 18:08:41) [Clang 15.0.7 ]
----Optional dependencies----
adbc_driver_sqlite: <not installed>
cloudpickle: <not installed>
connectorx: <not installed>
deltalake: <not installed>
fsspec: <not installed>
gevent: <not installed>
matplotlib: <not installed>
numpy: 1.25.0
openpyxl: <not installed>
pandas: 2.0.3
pyarrow: 13.0.0
pydantic: <not installed>
pyiceberg: <not installed>
pyxlsb: <not installed>
sqlalchemy: <not installed>
xlsx2csv: <not installed>
xlsxwriter: <not installed>
Might be related to https://github.com/pola-rs/polars/issues/9756, although I couldn't reproduce that one.
I cannot reproduce?
Weird...
In [27]: %timeit -n1 -r3 q.sink_parquet("/tmp/t")
1.8 s ± 13.5 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
In [29]: %timeit -n1 -r3 q.collect().write_parquet("/tmp/t")
1.55 s ± 10.8 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
Although on this machine (macOS M2) it's only a 1.2x slowdown, on the other machine (Linux, Xeon with AVX 512) it was more like 2x.
2 core GitHub codespace:
In [1]: import polars as pl
In [2]: q = pl.scan_parquet("pl-perf-medium.parquet")
In [3]: %timeit -n1 -r3 q.sink_parquet("/tmp/t")
3.59 s ± 113 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
In [4]: %timeit -n1 -r3 q.collect().write_parquet("/tmp/t")
4 s ± 100 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
16 core GitHub codespace:
In [3]: %timeit -n1 -r3 q.sink_parquet("/tmp/t")
2.82 s ± 8.88 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
In [4]: %timeit -n1 -r3 q.collect().write_parquet("/tmp/t")
2.39 s ± 24.7 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
So it seems to be related to the number of threads.
Sure, it is fastest with more threads. But you still see roughly the same performance within the same thread count.
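For a like-for-like comparison at a fixed thread count, the pool size can be pinned via the POLARS_MAX_THREADS environment variable before Polars is imported. A rough sketch (the file path is a placeholder):

```python
import os

# Must be set before importing polars, since the thread pool is sized at startup.
os.environ["POLARS_MAX_THREADS"] = "2"

import polars as pl

# pl.thread_pool_size() on recent versions; older releases call it pl.threadpool_size().
print(pl.thread_pool_size())  # should report 2

q = pl.scan_parquet("pl-perf-medium.parquet")
q.sink_parquet("/tmp/t_sink.parquet")
q.collect().write_parquet("/tmp/t_collect.parquet")
```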
This is on the production machine; it's a larger file with 2x the rows.
Similar behavior with a different Parquet file and this code:
pl.scan_parquet(...).filter(...).sink_parquet(...)
uses only 2 cores, while
pl.scan_parquet(...).filter(...).collect()
uses all cores. But df.write_parquet(...) seems bottlenecked to 1 core. I wasn't aware of that being the case; I thought I'd observed multiple cores being used in write_parquet() at some point?!
Is write_parquet meant to be multicore, as it only creates a single file?
Yes, it is.
I've also found sink_parquet to be very slow. This query takes about 1 s to evaluate, but if I run sink_parquet at the end I have to kill the kernel after a few minutes, by which point the file size has only reached 0.5 MB.
from datetime import datetime

import polars as pl

start_datetime = datetime(2020, 1, 1)
end_datetime = datetime(2024, 1, 1)
N_ids = 10
interval = "1m"

df = pl.LazyFrame(
    {
        "time": pl.datetime_range(
            start_datetime, end_datetime, interval=interval, eager=True
        )
    }
).join(
    pl.LazyFrame(
        {
            "id": [f"id{i:05d}" for i in range(N_ids)],
            "value": list(range(N_ids)),
        }
    ),
    how="cross",
).select("time", "id", "value")
If I do df.collect(), with or without streaming, it takes about a second to run. If I do df.sink_parquet it sinks my kernel.
Update: this alternative approach has been running for about 10 minutes and has only written 10 MB of data so far:
df.collect(streaming=True).write_parquet("large.parquet")
@braaannigan I am running into a similar issue. The idea was to read many files from cloud storage and merge them without using too much memory. Once I figured out that the problem had nothing to do with anything cloud-related, it boiled down to a repro like this:
Also observe the different sizes of the Parquet files.
Oh, one more thing: my data is nicely compressible. Using use_pyarrow=True brings the Parquet file size down to 5 MB.
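For reference, that is just a keyword argument on write_parquet; a quick sketch to compare the two writers and the resulting file sizes (file names here are made up):

```python
import os

import polars as pl

df = pl.read_parquet("merged.parquet")  # placeholder for the merged data

df.write_parquet("out_default.parquet")                    # default Rust writer
df.write_parquet("out_pyarrow.parquet", use_pyarrow=True)  # pyarrow-backed writer

for p in ("out_default.parquet", "out_pyarrow.parquet"):
    print(p, os.path.getsize(p) / 1e6, "MB")
```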
Going to try to figure out a fix for this.
@itamarst thank you for looking into this! Were you able to reproduce the fact that the sink_parquet() route uses fewer cores? I think that explains most of the performance difference.
@itamarst
In [10]: %timeit -n1 -r1 pl.scan_parquet("tmp/pl-perf-medium.parquet").sink_parquet("/tmp/xx", compression="uncompressed")
2.29 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
In [11]: %timeit -n1 -r1 pl.read_parquet("tmp/pl-perf-medium.parquet").write_parquet("/tmp/xx", compression="uncompressed")
1.2 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
In [13]: %timeit -n1 -r1 pl.scan_parquet("tmp/pl-perf-medium.parquet").sink_parquet("/tmp/xx", compression="zstd")
2.97 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
In [12]: %timeit -n1 -r1 pl.read_parquet("tmp/pl-perf-medium.parquet").write_parquet("/tmp/xx", compression="zstd")
1.28 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
What's interesting is that the faster version uses 13 threads (I ran the benchmarks 3x here), while the slower one uses 21 threads (all of the threads from the faster run plus some more). My machine has 12 cores.
Oh huh, I didn't notice that part. No, but I found some other (perhaps related) issue where the batch sizes of the writes seem tiny.
That would explain the very sparse-looking thread utilization graphs for the sink_parquet() variant.
So I measured the batch size: it's 2.5K rows written at a time with sink_parquet() versus 256K rows at a time with write_parquet() from a DataFrame. The result is also visible in the file sizes, with the former being 30 MB and the latter 27 MB.
I imagine the impact is also pretty bad when writing over the network, because network latency becomes a bottleneck...
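One way to see those small batches from the outside is to inspect the row-group layout of the two output files with pyarrow; tiny write batches should show up as many small row groups (the paths below are placeholders):

```python
import pyarrow.parquet as pq

for path in ("/tmp/out_sink.parquet", "/tmp/out_collect.parquet"):
    md = pq.ParquetFile(path).metadata
    rows_per_group = [md.row_group(i).num_rows for i in range(md.num_row_groups)]
    print(
        path,
        "row groups:", md.num_row_groups,
        "min/max rows per group:", min(rows_per_group), max(rows_per_group),
    )
```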
As I see it, there are two sides to the problem:
- Generated batch sizes: Something somewhere is producing too few rows per batch. While this should be fixed, my guess is that this isn't solvable in the general case, due to e.g. filtering. I could be wrong and this is solvable in general.
- Written batch sizes: The writing code happily writes overly small batches. Instead, it could merge batches that are too small before writing them (see the sketch below).
Solving one would make solving the other less important. However, given the first part is more complex, I will see if the second part is something I can do.
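To illustrate the second idea, here is a rough sketch of merging undersized batches before writing, using pyarrow directly. This is not how the Polars sink is implemented internally, just the general shape of the fix; the 64,000-row threshold is an arbitrary illustrative value.

```python
import pyarrow as pa
import pyarrow.parquet as pq


def write_merged(batches, path, min_rows=64_000):
    # batches: an iterable of pyarrow.RecordBatch coming out of a pipeline.
    # Buffer small batches and only flush once enough rows have accumulated,
    # so the Parquet writer sees reasonably sized chunks.
    writer = None
    buffered, buffered_rows = [], 0
    for batch in batches:
        if writer is None:
            writer = pq.ParquetWriter(path, batch.schema)
        buffered.append(batch)
        buffered_rows += batch.num_rows
        if buffered_rows >= min_rows:
            writer.write_table(pa.Table.from_batches(buffered))
            buffered, buffered_rows = [], 0
    if writer is not None:
        if buffered:
            writer.write_table(pa.Table.from_batches(buffered))
        writer.close()
```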
Not directly related, but is this a VS Code plugin? How did you get these thread-usage metrics?
@hugopendlebury it is macOS Instruments (it comes with Xcode). It is pretty unknown outside of iOS app development, but it is actually a fantastic performance monitoring tool. Probably the best GUI-based performance monitoring tool out there, honestly.