Fix constant window for evaluate stateful
Which issue does this PR close?
- Closes #16308.
Rationale for this change
I'm not very familiar with Comet and wasn't sure how to verify this issue there, so I investigated the code instead.
When aggregate_evaluate_stateful is used together with the optimization from https://github.com/apache/datafusion/pull/16234, it returns values that differ from the original behavior. (However, this doesn't seem to happen with DataFusion's default planning.)
Based on this finding, I wrote a test. On the main branch it fails with the same error as #16308:
attempt to subtract with overflow
stack backtrace:
0: __rustc::rust_begin_unwind
at /rustc/17067e9ac6d7ecb70e50f92c1944e545188d2359/library/std/src/panicking.rs:697:5
1: core::panicking::panic_fmt
at /rustc/17067e9ac6d7ecb70e50f92c1944e545188d2359/library/core/src/panicking.rs:75:14
2: core::panicking::panic_const::panic_const_sub_overflow
at /rustc/17067e9ac6d7ecb70e50f92c1944e545188d2359/library/core/src/panicking.rs:178:21
3: datafusion_expr::window_state::WindowAggState::update
at /home/suibianwanwan/code_repository/datafusion/datafusion/expr/src/window_state.rs:95:13
4: datafusion_physical_expr::window::window_expr::AggregateWindowExpr::aggregate_evaluate_stateful
at /home/suibianwanwan/code_repository/datafusion/datafusion/physical-expr/src/window/window_expr.rs:259:13
5: <datafusion_physical_expr::window::aggregate::PlainAggregateWindowExpr as datafusion_physical_expr::window::window_expr::WindowExpr>::evaluate_stateful
at /home/suibianwanwan/code_repository/datafusion/datafusion/physical-expr/src/window/aggregate.rs:149:9
6: datafusion_physical_plan::windows::bounded_window_agg_exec::BoundedWindowAggStream::compute_aggregates
at /home/suibianwanwan/code_repository/datafusion/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs:986:13
7: datafusion_physical_plan::windows::bounded_window_agg_exec::BoundedWindowAggStream::poll_next_inner
at /home/suibianwanwan/code_repository/datafusion/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs:1049:38
8: <datafusion_physical_plan::windows::bounded_window_agg_exec::BoundedWindowAggStream as futures_core::stream::Stream>::poll_next
at /home/suibianwanwan/code_repository/datafusion/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs:952:20
9: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
at /home/suibianwanwan/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-core-0.3.31/src/stream.rs:130:9
10: <S as futures_core::stream::TryStream>::try_poll_next
at /home/suibianwanwan/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-core-0.3.31/src/stream.rs:206:9
11: <futures_util::stream::try_stream::try_collect::TryCollect<St,C> as core::future::future::Future>::poll
at /home/suibianwanwan/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/stream/try_stream/try_collect.rs:46:26
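The panic comes from an unsigned (usize) subtraction in WindowAggState::update. The sketch below is a simplified, hypothetical model of that failure mode, not DataFusion's actual code: the struct ToyWindowState and its fields are our own names. It assumes the state keeps a running count of rows whose results were already produced and computes the remaining rows as num_rows minus that count. If an evaluator returns more output rows than were newly pending, the count overshoots the batch length and a plain subtraction underflows. With overflow checks enabled (the default for debug builds) that is exactly the "attempt to subtract with overflow" panic; using checked_sub turns it into an error instead.

```rust
/// Hypothetical, simplified model of per-partition window state.
/// This is NOT DataFusion's `WindowAggState`; names are illustrative only.
#[derive(Default)]
struct ToyWindowState {
    /// Rows whose window result has already been produced.
    last_calculated_index: usize,
    /// Rows in the buffered batch still waiting for a result.
    n_row_result_missing: usize,
}

impl ToyWindowState {
    /// Record `out_len` newly produced results for a buffered batch of
    /// `num_rows` rows. A plain `num_rows - done` would panic with
    /// "attempt to subtract with overflow" when `done > num_rows` and
    /// overflow checks are on; `checked_sub` reports an error instead.
    fn update(&mut self, out_len: usize, num_rows: usize) -> Result<(), String> {
        self.last_calculated_index += out_len;
        let done = self.last_calculated_index;
        self.n_row_result_missing = num_rows
            .checked_sub(done)
            .ok_or_else(|| format!("produced {done} results for only {num_rows} rows"))?;
        Ok(())
    }
}

fn main() {
    // Well-behaved evaluator: each call returns only the newly computed rows.
    let mut ok = ToyWindowState::default();
    ok.update(4, 4).unwrap(); // 4 rows buffered, 4 results
    ok.update(6, 10).unwrap(); // buffer grows to 10 rows, 6 new results
    assert_eq!(ok.n_row_result_missing, 0);

    // Hypothetical misbehaving evaluator: on the second call it returns a
    // value for every buffered row (10) instead of only the 6 new ones.
    let mut bad = ToyWindowState::default();
    bad.update(4, 4).unwrap();
    match bad.update(10, 10) {
        Ok(()) => println!("no overshoot"),
        Err(e) => println!("{e}"), // 4 + 10 = 14 results for 10 rows
    }
}
```

Whether the real bug has exactly this shape is an assumption on our part; the model only shows why a running index that overshoots the buffered row count ends in this particular panic.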
FYI, @alamb @andygrove.
I'm not very familiar with testing in DataFusion beyond the sqllogictests, so if there's a better way to handle this, feel free to let me know~ Thanks!
Thanks @suibianwanwank. I think it would be great if we could write a reproducer using .slt tests.
Here are the instructions: https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest
Ideally you should be able to extend one of the existing test files in https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest/test_files
If you can't find something suitable, I can try to find time to help over the next few days.
Thanks @alamb. As mentioned, the PhysicalPlan produced by default planning doesn't reproduce this issue, so I have to construct it manually.
I tried making a reproducer, but I could not reproduce the wrong results or the panic reported in @andygrove's comment https://github.com/apache/datafusion/issues/16308#issuecomment-2949516445.
Here is what I tried:
Data: tenk.csv
Repro
create external table tenk1
(
unique1 int,
unique2 int,
two int,
four int,
ten int,
twenty int,
hundred int,
thousand int,
twothousand int,
fivethous int,
tenthous int,
odd int,
even int,
stringu1 string,
stringu2 string,
string4 string
)
stored as CSV location 'tenk.csv'
OPTIONS('has_header' 'false','format.delimiter' 9);
SELECT * from tenk1 limit 10;
SELECT COUNT(*) OVER () FROM tenk1 WHERE unique2 < 10;
But that seems to work just fine:
(venv) andrewlamb@Andrews-MacBook-Pro-2:~/Downloads$ datafusion-cli -f repro.sql
DataFusion CLI v48.0.0
0 row(s) fetched.
Elapsed 0.001 seconds.
+---------+---------+-----+------+-----+--------+---------+----------+-------------+-----------+----------+-----+------+----------+----------+---------+
| unique1 | unique2 | two | four | ten | twenty | hundred | thousand | twothousand | fivethous | tenthous | odd | even | stringu1 | stringu2 | string4 |
+---------+---------+-----+------+-----+--------+---------+----------+-------------+-----------+----------+-----+------+----------+----------+---------+
| 8800 | 0 | 0 | 0 | 0 | 0 | 0 | 800 | 800 | 3800 | 8800 | 0 | 1 | MAAAAA | AAAAAA | AAAAxx |
| 1891 | 1 | 1 | 3 | 1 | 11 | 91 | 891 | 1891 | 1891 | 1891 | 182 | 183 | TUAAAA | BAAAAA | HHHHxx |
| 3420 | 2 | 0 | 0 | 0 | 0 | 20 | 420 | 1420 | 3420 | 3420 | 40 | 41 | OBAAAA | CAAAAA | OOOOxx |
| 9850 | 3 | 0 | 2 | 0 | 10 | 50 | 850 | 1850 | 4850 | 9850 | 100 | 101 | WOAAAA | DAAAAA | VVVVxx |
| 7164 | 4 | 0 | 0 | 4 | 4 | 64 | 164 | 1164 | 2164 | 7164 | 128 | 129 | OPAAAA | EAAAAA | AAAAxx |
| 8009 | 5 | 1 | 1 | 9 | 9 | 9 | 9 | 9 | 3009 | 8009 | 18 | 19 | BWAAAA | FAAAAA | HHHHxx |
| 5057 | 6 | 1 | 1 | 7 | 17 | 57 | 57 | 1057 | 57 | 5057 | 114 | 115 | NMAAAA | GAAAAA | OOOOxx |
| 6701 | 7 | 1 | 1 | 1 | 1 | 1 | 701 | 701 | 1701 | 6701 | 2 | 3 | TXAAAA | HAAAAA | VVVVxx |
| 4321 | 8 | 1 | 1 | 1 | 1 | 21 | 321 | 321 | 4321 | 4321 | 42 | 43 | FKAAAA | IAAAAA | AAAAxx |
| 3043 | 9 | 1 | 3 | 3 | 3 | 43 | 43 | 1043 | 3043 | 3043 | 86 | 87 | BNAAAA | JAAAAA | HHHHxx |
+---------+---------+-----+------+-----+--------+---------+----------+-------------+-----------+----------+-----+------+----------+----------+---------+
10 row(s) fetched.
Elapsed 0.007 seconds.
+-------------------------------------------------------------------+
| count(*) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING |
+-------------------------------------------------------------------+
| 10 |
| 10 |
| 10 |
| 10 |
| 10 |
| 10 |
| 10 |
| 10 |
| 10 |
| 10 |
+-------------------------------------------------------------------+
10 row(s) fetched.
Elapsed 0.004 seconds.
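The output header shows that OVER () is planned with a frame of ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, so every row's frame is the whole partition and every row gets the same value (10 here). A minimal Rust sketch of that property: evaluating the frame row by row gives the same column as computing the aggregate once and repeating it. The helper names are hypothetical, and reading "constant window" in the PR title as this case is our assumption.

```rust
use std::ops::Range;

/// Frame [start, end) for `row` in a partition of `n` rows under
/// ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
/// The result is always 0..n, independent of `row`.
fn unbounded_frame(_row: usize, n: usize) -> Range<usize> {
    0..n
}

/// Naive evaluation: COUNT(*) over each row's own frame.
fn count_per_row(n: usize) -> Vec<usize> {
    (0..n).map(|row| unbounded_frame(row, n).len()).collect()
}

/// Shortcut: evaluate the aggregate once for the whole partition and
/// repeat the value for every row.
fn count_constant(n: usize) -> Vec<usize> {
    vec![n; n]
}

fn main() {
    let n = 10; // rows that survive `WHERE unique2 < 10`
    assert_eq!(count_per_row(n), count_constant(n));
    println!("{:?}", count_constant(n)); // [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
}
```

The shortcut is only valid because the frame does not depend on the row; any stateful evaluator that uses it still has to report how many rows it actually produced.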
Notes for myself on where this came from:
https://github.com/apache/spark/blob/a38d1cef73eda8ab765dc168284b9c113c237a8e/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part1.sql#L50
SELECT COUNT(*) OVER () FROM tenk1 WHERE unique2 < 10
I did some digging and found the table definition here: https://github.com/apache/spark/blob/a38d1cef73eda8ab765dc168284b9c113c237a8e/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala#L536-L562
session
.read
.format("csv")
.options(Map("delimiter" -> "\t", "header" -> "false"))
.schema(
"""
|unique1 int,
|unique2 int,
|two int,
|four int,
|ten int,
|twenty int,
|hundred int,
|thousand int,
|twothousand int,
|fivethous int,
|tenthous int,
|odd int,
|even int,
|stringu1 string,
|stringu2 string,
|string4 string
""".stripMargin)
.load(testFile("test-data/postgresql/onek.data"))
The data is here: https://github.com/apache/spark/blob/a38d1cef73eda8ab765dc168284b9c113c237a8e/sql/core/src/test/resources/test-data/postgresql/tenk.data
@andygrove how can we test this with Comet? Can I just pin to a datafusion version?
Yes, assuming there are no breaking API changes in DataFusion since 48. I will take a look this evening.
Unfortunately, there are multiple breaking API changes (and some are non-trivial to resolve), so we can't easily test this PR from Comet.
I can base this commit on 48 and test it in Comet.
@andygrove, CI seems to have passed on https://github.com/apache/datafusion-comet/pull/1913. 🎉
Thank you very much @suibianwanwank, this looks great to me.
I took the liberty of merging up from main and adding a link to the GitHub issue in the test, but I think we'll be ready to merge after that.
Thanks, LGTM!
Yes, I confirmed that the test passes now:
2025-06-19T18:36:27.5009047Z [info] - postgreSQL/window_part1.sql (12 seconds, 530 milliseconds)
Thanks, @suibianwanwank, for backporting your PR to DF 48 so that we could verify the fix in Comet!
Thanks again for the diligence and follow-up here.