
`sink_parquet(...)` much slower than `.collect().write_parquet(...)`

Open jonashaag opened this issue 1 year ago • 13 comments

Checks

  • [X] I have checked that this issue has not already been reported.

  • [X] I have confirmed this bug exists on the latest version of Polars.

Reproducible example

Using the same data as in https://github.com/pola-rs/polars/issues/11698, I'm observing at least a 2x performance difference in the following:

import polars as pl

# f = path to the Parquet file from #11698
pl.scan_parquet(f).sink_parquet("/tmp/out")

# 2x faster:
pl.scan_parquet(f).collect().write_parquet("/tmp/out")

Interestingly, when looking at htop, the version with .collect() seems to use all cores at times (during reading?), while the other version never uses more than ~1 core.

This is a reduced example; the original table has many more columns and the performance difference is much larger.
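
For reference, a minimal self-contained timing harness along these lines (the file name is taken from the later comments, the output paths are placeholders) shows the same comparison:

import time

import polars as pl

f = "pl-perf-medium.parquet"  # the file from the comments below

t0 = time.perf_counter()
pl.scan_parquet(f).sink_parquet("/tmp/out_sink.parquet")
print("sink_parquet:           ", time.perf_counter() - t0)

t0 = time.perf_counter()
pl.scan_parquet(f).collect().write_parquet("/tmp/out_write.parquet")
print("collect + write_parquet:", time.perf_counter() - t0)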

Log output

No response

Issue description

See example

Expected behavior

Both variants should have similar performance.

Installed versions

--------Version info---------
Polars:              0.19.8
Index type:          UInt32
Platform:            macOS-13.5.2-arm64-arm-64bit
Python:              3.11.4 | packaged by conda-forge | (main, Jun 10 2023, 18:08:41) [Clang 15.0.7 ]

----Optional dependencies----
adbc_driver_sqlite:  <not installed>
cloudpickle:         <not installed>
connectorx:          <not installed>
deltalake:           <not installed>
fsspec:              <not installed>
gevent:              <not installed>
matplotlib:          <not installed>
numpy:               1.25.0
openpyxl:            <not installed>
pandas:              2.0.3
pyarrow:             13.0.0
pydantic:            <not installed>
pyiceberg:           <not installed>
pyxlsb:              <not installed>
sqlalchemy:          <not installed>
xlsx2csv:            <not installed>
xlsxwriter:          <not installed>

jonashaag avatar Oct 12 '23 17:10 jonashaag

Might be related to https://github.com/pola-rs/polars/issues/9756, although I couldn't reproduce it.

nameexhaustion avatar Oct 13 '23 01:10 nameexhaustion

I cannot reproduce? [screenshot]

ritchie46 avatar Oct 13 '23 12:10 ritchie46

Weird...

In [27]: %timeit -n1 -r3 q.sink_parquet("/tmp/t")
1.8 s ± 13.5 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

In [29]: %timeit -n1 -r3 q.collect().write_parquet("/tmp/t")
1.55 s ± 10.8 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

Although on this machine (macOS M2) it's only a 1.2x slowdown, on the other machine (Linux, Xeon with AVX 512) it was more like 2x.

jonashaag avatar Oct 14 '23 16:10 jonashaag

2 core GitHub codespace:

In [1]: import polars as pl

In [2]: q = pl.scan_parquet("pl-perf-medium.parquet")

In [3]: %timeit -n1 -r3 q.sink_parquet("/tmp/t")
3.59 s ± 113 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

In [4]: %timeit -n1 -r3 q.collect().write_parquet("/tmp/t")
4 s ± 100 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

16 core GitHub codespace:

In [3]: %timeit -n1 -r3 q.sink_parquet("/tmp/t")
2.82 s ± 8.88 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

In [4]: %timeit -n1 -r3 q.collect().write_parquet("/tmp/t")
2.39 s ± 24.7 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

So this seems to be related to the number of threads.
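
For reference, one way to pin both runs to the same thread count is the POLARS_MAX_THREADS environment variable (it has to be set before polars is imported; pl.threadpool_size() was the name of the pool-size check around 0.19.x). A minimal sketch:

import os

# Must be set before polars is imported; "2" is just an example value.
os.environ["POLARS_MAX_THREADS"] = "2"

import polars as pl

print(pl.threadpool_size())  # confirm the pool size polars will use

q = pl.scan_parquet("pl-perf-medium.parquet")
q.sink_parquet("/tmp/t_sink.parquet")
q.collect().write_parquet("/tmp/t_write.parquet")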

jonashaag avatar Oct 14 '23 16:10 jonashaag

Sure, it is fastest with more threads. But you still see same-ish performance within the same thread count.

ritchie46 avatar Oct 18 '23 08:10 ritchie46

This is on the production machine; it's a larger file with 2x the rows:

Screenshot 2023-10-20 at 15 43 52

jonashaag avatar Oct 20 '23 12:10 jonashaag

Similar behavior with a different Parquet and this code:

pl.scan_parquet(...).filter(...).sink_parquet(...)

Uses only 2 cores.

pl.scan_parquet(...).filter(...).collect()

Uses all cores. But df.write_parquet(...) seems bottlenecked to 1 core. I wasn't aware of that being the case, and I thought I'd observed multiple cores being used in write_parquet() at some point?!
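
A quick way to check how much of such a plan actually runs in the streaming engine is to print it with explain(streaming=True) and look for the STREAMING section. A sketch (the path and filter expression are placeholders):

import polars as pl

lf = pl.scan_parquet("in.parquet").filter(pl.col("value") > 0)
print(lf.explain(streaming=True))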

jonashaag avatar Nov 09 '23 21:11 jonashaag

Is write_parquet meant to be multicore, as it only creates a single file?

thetkoseek avatar Dec 19 '23 06:12 thetkoseek

Yes, it is.

jonashaag avatar Dec 19 '23 06:12 jonashaag

I've also found sink_parquet to be very slow. This query takes about 1 s to evaluate, but if I run sink_parquet at the end I have to kill the kernel after a few minutes, when the file size has only reached 0.5 MB.

from datetime import datetime

import polars as pl

start_datetime = datetime(2020, 1, 1)
end_datetime = datetime(2024, 1, 1)
N_ids = 10
interval = "1m"

df = pl.LazyFrame(
    {
        "time": pl.datetime_range(
            start_datetime, end_datetime, interval=interval, eager=True
        )
    }
).join(
    pl.LazyFrame(
        {
            "id": [f"id{i:05d}" for i in range(N_ids)],
            "value": list(range(N_ids)),
        }
    ),
    how="cross",
).select("time", "id", "value")

If I do df.collect(), with or without streaming, it takes about a second to run. If I do df.sink_parquet, it sinks my kernel.
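
Concretely, the calls being compared (continuing with the df LazyFrame defined above; the output path is a placeholder):

# Finishes in about a second either way:
df.collect()
df.collect(streaming=True)

# Effectively hangs in this report:
df.sink_parquet("large.parquet")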

braaannigan avatar Jan 11 '24 11:01 braaannigan

Update: this alternative approach has been running for about 10 minutes and has only written 10 MB of data so far:

df.collect(streaming=True).write_parquet("large.parquet")

braaannigan avatar Jan 11 '24 12:01 braaannigan

@braaannigan I am running into a similar issue. The idea was to read many files from cloud storage and merge them without using too much memory. Once I figured out that the problem had nothing to do with anything cloud-related, it boiled down to a repro like this:

[Screenshot: CleanShot 2024-01-11 at 17 10 56@2x]

Also observe the different sizes of the Parquet files.

aberres avatar Jan 11 '24 16:01 aberres

Oh, one more thing: my data is nicely compressible. Using use_pyarrow=True brings the Parquet size down to 5 MB.
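
For completeness, the comparison being made is roughly the following (the input path is a placeholder; use_pyarrow routes the write through pyarrow's writer):

import polars as pl

df = pl.read_parquet("merged.parquet")  # placeholder for the merged data

# Default (Rust-native) writer:
df.write_parquet("out_default.parquet")

# pyarrow-backed writer; in this report it brought the same data down to ~5 MB:
df.write_parquet("out_pyarrow.parquet", use_pyarrow=True)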

aberres avatar Jan 11 '24 16:01 aberres

Going to try to figure out a fix for this.

itamarst avatar Feb 01 '24 16:02 itamarst

@itamarst thank you for looking into this! Were you able to reproduce the fact that the sink_parquet() route uses fewer cores? I think that explains most of the performance difference.

jonashaag avatar Feb 03 '24 18:02 jonashaag

@itamarst

In [10]: %timeit -n1 -r1 pl.scan_parquet("tmp/pl-perf-medium.parquet").sink_parquet("/tmp/xx", compression="uncompressed")
2.29 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

In [11]: %timeit -n1 -r1 pl.read_parquet("tmp/pl-perf-medium.parquet").write_parquet("/tmp/xx", compression="uncompressed")
1.2 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

In [13]: %timeit -n1 -r1 pl.scan_parquet("tmp/pl-perf-medium.parquet").sink_parquet("/tmp/xx", compression="zstd")
2.97 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

In [12]: %timeit -n1 -r1 pl.read_parquet("tmp/pl-perf-medium.parquet").write_parquet("/tmp/xx", compression="zstd")
1.28 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

What's interesting is that the faster version uses 13 threads (I ran the benchmarks 3x here):

Screenshot 2024-02-03 at 20 00 49

While the slower one uses 21 threads (all of the threads from the image above plus some more)

Screenshot 2024-02-03 at 20 00 54 Screenshot 2024-02-03 at 20 01 01

My machine has 12 cores.

jonashaag avatar Feb 03 '24 19:02 jonashaag

Oh huh, I didn't notice that part. No, I found some other (perhaps related) issue where the batch sizes of writes seem tiny.

itamarst avatar Feb 03 '24 19:02 itamarst

That would explain the very sparse-looking thread-utilization graphs for the sink_parquet() variant.

jonashaag avatar Feb 03 '24 21:02 jonashaag

So I measured the batch size: it's 2.5K rows written at a time with sink_parquet() vs. 256K rows at a time with write_parquet() from a DataFrame. The result is also visible in the file sizes, with the former being 30MB and the latter 27MB.
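
One way to see this from the outside is to inspect the row-group layout of the two outputs with pyarrow (paths are placeholders; row groups only approximate the internal write batch size):

import pyarrow.parquet as pq

for path in ["/tmp/sinked.parquet", "/tmp/written.parquet"]:
    md = pq.ParquetFile(path).metadata
    rows = [md.row_group(i).num_rows for i in range(md.num_row_groups)]
    print(path, md.num_row_groups, "row groups, first sizes:", rows[:5])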

I imagine the impact is also pretty bad when writing over the network, because network latency becomes a bottleneck...

As I see it, there are two sides to the problem:

  1. Generated batch sizes: Something somewhere is producing too few rows per batch. While this should be fixed, my guess is that this isn't solvable in the general case, due to e.g. filtering. I could be wrong and this is solvable in general.
  2. Written batch sizes: The writing code is happily writing overly small batches. Instead, it could merge batches that are too small before writing them.

Solving one would make solving the other less important. However, given the first part is more complex, I will see if the second part is something I can do.
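
As an illustration of the idea only (not the actual internal fix), coalescing undersized batches before they reach the writer could look something like this:

import polars as pl

def coalesce_batches(batches, min_rows=64_000):
    # `batches` is an iterable of pl.DataFrame chunks; yield them merged into
    # chunks of at least `min_rows` rows so the writer sees fewer, larger writes.
    buffer, buffered_rows = [], 0
    for batch in batches:
        buffer.append(batch)
        buffered_rows += batch.height
        if buffered_rows >= min_rows:
            yield pl.concat(buffer, rechunk=True)
            buffer, buffered_rows = [], 0
    if buffer:
        yield pl.concat(buffer, rechunk=True)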

itamarst avatar Feb 05 '24 14:02 itamarst

(quoting jonashaag's benchmark comment and Instruments screenshots above)

Not directly related, but is this a VS Code plugin? How did you get these metrics?

hugopendlebury avatar Feb 06 '24 06:02 hugopendlebury

@hugopendlebury It is macOS Instruments (which comes with Xcode). It is pretty unknown outside of iOS app development, but it is actually a fantastic performance monitoring tool. Probably the best GUI-based performance monitoring tool out there, honestly.

jonashaag avatar Feb 06 '24 07:02 jonashaag