
Can't `sink_parquet` on a sorted LazyFrame containing decimal columns

Open edthrn opened this issue 1 year ago • 2 comments

Checks

  • [X] I have checked that this issue has not already been reported.
  • [X] I have confirmed this bug exists on the latest version of Polars.

Reproducible example

Given a very large dataset (~1 billion rows) stored on S3:

This works fine:

# Works as expected
import polars as pl
from datetime import datetime, timedelta

pl.scan_parquet(
    "s3://.../my_dataset/**/*.parquet"
).filter(
    pl.col("date") < datetime.now() - timedelta(days=120),
).sink_parquet(
    "/tmp/data.parquet"
)

But this doesn't:

# Does not work
pl.scan_parquet(
    "s3://.../my_dataset/**/*.parquet"
).filter(
    pl.col("date") < datetime.now() - timedelta(days=120),
).sort(
    pl.col("value")
).sink_parquet(
    "/tmp/data.parquet"
)

I get the following error:

Log output

POLARS PREFETCH_SIZE: 64
RUN STREAMING PIPELINE
[parquet -> sort -> parquet_sink]
STREAMING CHUNK SIZE: 1388 rows
STREAMING CHUNK SIZE: 1388 rows
...
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
OOC sort started
Temporary directory path in use: /tmp
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
parquet file must be read, statistics not sufficient for predicate.
OOC sort started
OOC sort started
...
OOC sort started
thread '<unnamed>' panicked at crates/polars-core/src/series/series_trait.rs:234:9:
`shrink_to_fit` operation not supported for dtype `decimal[22,10]`
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'polars-9' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-27' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-1' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-16' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-5' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-14' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-10' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-8' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-24' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-25' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-15' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-23' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-29' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-3' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-7' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-30' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-12' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-26' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-4' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-21' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-18' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-11' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-13' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"

---------------------------------------------------------------------------
PanicException                            Traceback (most recent call last)
Cell In[7], line 9
      1 pl.scan_parquet(
      2     's3://.../*.parquet',
      3     storage_options=options
      4 ).filter(
      5     pl.col("date") < datetime.now() - timedelta(days=120)
      7 ).sort(
      8     pl.col("value")
----> 9 ).sink_parquet(
     10     '/tmp/data.parquet',
     11 )

File ~/venv/lib/python3.11/site-packages/polars/_utils/unstable.py:58, in unstable.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
     55 @wraps(function)
     56 def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
     57     issue_unstable_warning(f"`{function.__name__}` is considered unstable.")
---> 58     return function(*args, **kwargs)

File ~/venv/lib/python3.11/site-packages/polars/lazyframe/frame.py:2233, in LazyFrame.sink_parquet(self, path, compression, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, no_optimization)
   2225 elif statistics == "full":
   2226     statistics = {
   2227         "min": True,
   2228         "max": True,
   2229         "distinct_count": True,
   2230         "null_count": True,
   2231     }
-> 2233 return lf.sink_parquet(
   2234     path=normalize_filepath(path),
   2235     compression=compression,
   2236     compression_level=compression_level,
   2237     statistics=statistics,
   2238     row_group_size=row_group_size,
   2239     data_pagesize_limit=data_pagesize_limit,
   2240     maintain_order=maintain_order,
   2241 )

PanicException: called `Result::unwrap()` on an `Err` value: "SendError(..)"

Issue description

I stumbled upon #16603 and tried the POLARS_ACTIVATE_DECIMAL=1 hack.

It was necessary for the first (unsorted) code sample to work, but it is apparently not sufficient for the sorted one.
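
For reference, a minimal sketch of how the hack can be enabled (assuming the environment variable has to be set before polars is imported to take effect):

# Sketch of the workaround referenced in #16603 (assumption: the environment
# variable must be set before polars is imported for it to take effect).
import os
os.environ["POLARS_ACTIVATE_DECIMAL"] = "1"

import polars as pl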

I tested with both versions 0.20.31 and 1.0.0rc2: Same results.

Expected behavior

I expected the lazy scan/filter/sort/sink pipeline to work as well as scan/filter/sink.

Installed versions

--------Version info---------
Polars:               0.20.31
Index type:           UInt32
Platform:             Linux-4.19.0-27-cloud-amd64-x86_64-with-glibc2.28
Python:               3.11.0 (main, May 15 2024, 19:44:29) [GCC 8.3.0]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               2024.6.1
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           <not installed>
nest_asyncio:         1.6.0
numpy:                2.0.0
openpyxl:             <not installed>
pandas:               2.2.2
pyarrow:              16.1.0
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           2.0.31
torch:                <not installed>
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>

edthrn avatar Jun 29 '24 11:06 edthrn

I just tested with a smaller dataset, i.e. instead of scanning all ~17k files, I only scanned the first 50...

And it works :thinking:

Does this mean that the problem comes from the data itself (e.g. a null value or something similar)? In that case, it's still odd that the unsorted version works as expected...
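
One way to check that hypothesis would be to count nulls in the relevant columns, e.g. (a sketch, assuming the decimal column is the `value` column from the example above):

# Sketch: count nulls in the columns involved (assumes the same dataset and
# that "value" is the decimal column being sorted on).
import polars as pl

null_counts = (
    pl.scan_parquet("s3://.../my_dataset/**/*.parquet")
    .select(pl.col("date", "value").null_count())
    .collect()
)
print(null_counts)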

edthrn avatar Jun 29 '24 11:06 edthrn

I managed to scan/filter/sort/sink the whole dataset by processing it in batches of 500 source files.

# `batched` is a helper that splits the list of S3 URLs into chunks of 500
# (itertools.batched is only available from Python 3.12 onwards).
for i, batch in enumerate(batched(s3_urls, batch_size=500)):
    pl.scan_parquet(
        batch,
    ).filter(
        pl.col("date") < datetime.now() - timedelta(days=120)
    ).sort(
        pl.col("value")
    ).sink_parquet(
        f'/tmp/data_{i}.parquet',
    )

Hence, the supposition above can be ruled out: it's not a data value or data type problem.


Minor issue now: I end up with 34 Parquet files at the end of the process (from 17k source files in total), instead of a single large one.
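
The per-batch outputs could probably be merged by re-scanning and sinking them in a second pass (a sketch, untested; note the result would only be sorted within each batch, and a global re-sort would presumably hit the same panic):

# Sketch (untested): merge the per-batch outputs into a single Parquet file.
# The concatenated result is only sorted within each batch; a global re-sort
# would presumably trigger the same panic again.
import polars as pl

pl.scan_parquet("/tmp/data_*.parquet").sink_parquet("/tmp/data.parquet")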

edthrn avatar Jun 29 '24 13:06 edthrn

I'm hitting something similar after upgrading to Polars v1.0.0 (note: I am using polars-u64-idx)

thread '<unnamed>' panicked at crates/polars-core/src/series/series_trait.rs:234:9:
`shrink_to_fit` operation not supported for dtype `decimal[15,2]`
thread 'polars-5' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-4' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-3' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-6' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-8' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-9' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-10' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
---------------------------------------------------------------------------
PanicException                            Traceback (most recent call last)
Cell In[7], line 5
      1 df = pl.scan_parquet(data)
      2 (
      3     df.sort(pl.col("l_orderkey"), pl.col("l_partkey"), pl.col("l_suppkey"))
      4     .head(3)
----> 5     .collect(streaming=True)
      6 )

File ~/repos/ibis/venv/lib/python3.11/site-packages/polars/lazyframe/frame.py:1942, in LazyFrame.collect(self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, cluster_with_columns, no_optimization, streaming, background, _eager, **_kwargs)
   1939 # Only for testing purposes atm.
   1940 callback = _kwargs.get("post_opt_callback")
-> 1942 return wrap_df(ldf.collect(callback))

PanicException: called `Result::unwrap()` on an `Err` value: "SendError(..)"

from:

import polars as pl

df = pl.scan_parquet(data)
(
    df.sort(pl.col("l_orderkey"), pl.col("l_partkey"), pl.col("l_suppkey"))
    .head(3)
    .collect(streaming=True)
)

where data points to ~275GB of Parquet files

Interestingly, before upgrading I was hitting #17281 on this operation.
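
Since the panic comes from `shrink_to_fit` not being supported for the Decimal dtype, one possible workaround sketch (untested) is to cast the decimal columns to Float64 before the streaming sort, at the cost of exact decimal precision:

# Hypothetical workaround sketch (untested): cast Decimal columns to Float64
# before the streaming sort, trading exact decimal precision for a dtype the
# out-of-core sort can handle.
import polars as pl
import polars.selectors as cs

df = pl.scan_parquet(data)
(
    df.with_columns(cs.decimal().cast(pl.Float64))
    .sort(pl.col("l_orderkey"), pl.col("l_partkey"), pl.col("l_suppkey"))
    .head(3)
    .collect(streaming=True)
)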

lostmygithubaccount avatar Jul 01 '24 15:07 lostmygithubaccount

I confirm that I still get the issue after upgrading to v1.0.0

edthrn avatar Jul 03 '24 11:07 edthrn

Is this still a problem? I don't believe that panic can still occur.

orlp avatar Sep 19 '25 12:09 orlp