Can't `sink_parquet` on a sorted LazyFrame containing decimal columns
Checks
- [X] I have checked that this issue has not already been reported.
- [X] I have confirmed this bug exists on the latest version of Polars.
Reproducible example
Given a very large dataset (~1 billion rows) stored on S3:
This works as expected:
# Works as expected
import polars as pl
from datetime import datetime, timedelta

pl.scan_parquet(
    "s3://.../my_dataset/**/*.parquet"
).filter(
    pl.col("date") < datetime.now() - timedelta(days=120),
).sink_parquet(
    "/tmp/data.parquet"
)
But this doesn't:
# Does not work
pl.scan_parquet(
    "s3://.../my_dataset/**/*.parquet"
).filter(
    pl.col("date") < datetime.now() - timedelta(days=120),
).sort(
    pl.col("value")
).sink_parquet(
    "/tmp/data.parquet"
)
I get the following error:
Log output
POLARS PREFETCH_SIZE: 64
RUN STREAMING PIPELINE
[parquet -> sort -> parquet_sink]
STREAMING CHUNK SIZE: 1388 rows
STREAMING CHUNK SIZE: 1388 rows
...
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
OOC sort started
Temporary directory path in use: /tmp
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
parquet file must be read, statistics not sufficient for predicate.
OOC sort started
OOC sort started
...
OOC sort started
thread '<unnamed>' panicked at crates/polars-core/src/series/series_trait.rs:234:9:
`shrink_to_fit` operation not supported for dtype `decimal[22,10]`
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'polars-9' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-27' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-1' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-16' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-5' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-14' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-10' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-8' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-24' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-25' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-15' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-23' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-29' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-3' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-7' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-30' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-12' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-26' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-4' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-21' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-18' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-11' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-13' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
---------------------------------------------------------------------------
PanicException Traceback (most recent call last)
Cell In[7], line 9
1 pl.scan_parquet(
2 's3://.../*.parquet',
3 storage_options=options
4 ).filter(
5 pl.col("date") < datetime.now() - timedelta(days=120)
7 ).sort(
8 pl.col("value")
----> 9 ).sink_parquet(
10 '/tmp/data.parquet',
11 )
File ~/venv/lib/python3.11/site-packages/polars/_utils/unstable.py:58, in unstable.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
55 @wraps(function)
56 def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
57 issue_unstable_warning(f"`{function.__name__}` is considered unstable.")
---> 58 return function(*args, **kwargs)
File ~/venv/lib/python3.11/site-packages/polars/lazyframe/frame.py:2233, in LazyFrame.sink_parquet(self, path, compression, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, no_optimization)
2225 elif statistics == "full":
2226 statistics = {
2227 "min": True,
2228 "max": True,
2229 "distinct_count": True,
2230 "null_count": True,
2231 }
-> 2233 return lf.sink_parquet(
2234 path=normalize_filepath(path),
2235 compression=compression,
2236 compression_level=compression_level,
2237 statistics=statistics,
2238 row_group_size=row_group_size,
2239 data_pagesize_limit=data_pagesize_limit,
2240 maintain_order=maintain_order,
2241 )
PanicException: called `Result::unwrap()` on an `Err` value: "SendError(..)"
Issue description
I stumbled upon #16603 and tried the POLARS_ACTIVATE_DECIMAL=1 hack.
Setting it was necessary for the first (unsorted) example to work, but it is apparently not sufficient for the sorted example.
I tested with both versions 0.20.31 and 1.0.0rc2 and got the same results.
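For reference, a minimal sketch of how I applied the flag (assuming it simply has to be in the environment before the query is executed; the scan/filter/(sort)/sink pipeline itself is unchanged):
import os
os.environ["POLARS_ACTIVATE_DECIMAL"] = "1"  # the hack from #16603

import polars as pl  # then run the pipelines shown above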
Expected behavior
I expected the lazy scan/filter/sort/sink pipeline to work as well as scan/filter/sink does.
Installed versions
--------Version info---------
Polars: 0.20.31
Index type: UInt32
Platform: Linux-4.19.0-27-cloud-amd64-x86_64-with-glibc2.28
Python: 3.11.0 (main, May 15 2024, 19:44:29) [GCC 8.3.0]
----Optional dependencies----
adbc_driver_manager: <not installed>
cloudpickle: <not installed>
connectorx: <not installed>
deltalake: <not installed>
fastexcel: <not installed>
fsspec: 2024.6.1
gevent: <not installed>
hvplot: <not installed>
matplotlib: <not installed>
nest_asyncio: 1.6.0
numpy: 2.0.0
openpyxl: <not installed>
pandas: 2.2.2
pyarrow: 16.1.0
pydantic: <not installed>
pyiceberg: <not installed>
pyxlsb: <not installed>
sqlalchemy: 2.0.31
torch: <not installed>
xlsx2csv: <not installed>
xlsxwriter: <not installed>
I just tested with a smaller dataset: instead of scanning all ~17k files, I only scanned the first 50...
And it works :thinking:
Does that mean the problem comes from the data itself (e.g. a null value or something similar)? In that case, it's still odd that the unsorted version works as expected...
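For reference, the reduced run looked roughly like this (s3_urls is assumed to be the list of individual object URLs):
# Reduced reproduction sketch: scan only the first 50 of the ~17k source files
subset = s3_urls[:50]
pl.scan_parquet(subset).filter(
    pl.col("date") < datetime.now() - timedelta(days=120)
).sort(pl.col("value")).sink_parquet("/tmp/data_subset.parquet")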
I managed to scan/filter/sort/sink the whole dataset by processing it in batches of 500 source files:
# `batched` is assumed to be a helper that yields chunks of 500 URLs
# (e.g. itertools.batched in Python 3.12+, or an equivalent home-grown function)
for i, batch in enumerate(batched(s3_urls, batch_size=500)):
    pl.scan_parquet(
        batch,
    ).filter(
        pl.col("date") < datetime.now() - timedelta(days=120)
    ).sort(
        pl.col("value")
    ).sink_parquet(
        f'/tmp/data_{i}.parquet',
    )
Hence the supposition above can be ruled out: it is not a data value / data type problem.
Minor problem: I now end up with 34 Parquet files at the end of the process (from 17k source files in total) instead of a single large one.
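A possible cleanup step, sketched below, would be to scan the 34 intermediate files and sink them into a single output; since no sort is involved this should follow the working scan/filter/sink path, but note that simple concatenation does not restore a global sort order across batches.
# Sketch: merge the per-batch outputs into one file (no global sort is re-applied)
pl.scan_parquet("/tmp/data_*.parquet").sink_parquet("/tmp/data_merged.parquet")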
I'm hitting something similar after upgrading to Polars v1.0.0 (note: I am using polars-u64-idx):
thread '<unnamed>' panicked at crates/polars-core/src/series/series_trait.rs:234:9:
`shrink_to_fit` operation not supported for dtype `decimal[15,2]`
thread 'polars-5' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-4' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-3' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-6' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-8' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-9' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-10' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
---------------------------------------------------------------------------
PanicException Traceback (most recent call last)
Cell In[7], line 5
1 df = pl.scan_parquet(data)
2 (
3 df.sort(pl.col("l_orderkey"), pl.col("l_partkey"), pl.col("l_suppkey"))
4 .head(3)
----> 5 .collect(streaming=True)
6 )
File ~/repos/ibis/venv/lib/python3.11/site-packages/polars/lazyframe/frame.py:1942, in LazyFrame.collect(self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, cluster_with_columns, no_optimization, streaming, background, _eager, **_kwargs)
1939 # Only for testing purposes atm.
1940 callback = _kwargs.get("post_opt_callback")
-> 1942 return wrap_df(ldf.collect(callback))
PanicException: called `Result::unwrap()` on an `Err` value: "SendError(..)"
from:
df = pl.scan_parquet(data)
(
    df.sort(pl.col("l_orderkey"), pl.col("l_partkey"), pl.col("l_suppkey"))
    .head(3)
    .collect(streaming=True)
)
where data points to ~275 GB of Parquet files.
Interestingly, before upgrading I was hitting #17281 on this operation.
I confirm that I still get the issue after upgrading to v1.0.0
Is this still a problem? I don't believe that panic can still occur.
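A rough way to re-check on a current release is sketched below; the column name, the values, and the use of POLARS_FORCE_OOC to force the out-of-core sort path are assumptions on my part, since the original reports only hit the panic once the sort spilled to disk on large datasets.
# Re-check sketch: sort + sink a Decimal column through the streaming engine,
# forcing the out-of-core path (POLARS_FORCE_OOC is assumed to still be honoured).
import os
os.environ["POLARS_FORCE_OOC"] = "1"

from decimal import Decimal
import polars as pl

lf = pl.LazyFrame(
    {"value": [Decimal("1.50"), Decimal("0.25"), Decimal("2.75")]},
    schema={"value": pl.Decimal(22, 10)},
)
lf.sort("value").sink_parquet("/tmp/decimal_sorted.parquet")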