Fix constant window for evaluate stateful

suibianwanwank opened this pull request • 5 comments

Which issue does this PR close?

  • Closes #16308.

Rationale for this change

I'm not very familiar with Comet and I'm unsure how to verify this issue there, so I investigated by reading the code.

When the aggregate_evaluate_stateful path is combined with the optimization from https://github.com/apache/datafusion/pull/16234, the returned values are inconsistent with the original behavior. (However, this does not seem to occur with DataFusion's default planning.)

Based on this fix, I wrote a test. On the main branch it fails with the same error reported in #16308:

attempt to subtract with overflow
stack backtrace:
   0: __rustc::rust_begin_unwind
             at /rustc/17067e9ac6d7ecb70e50f92c1944e545188d2359/library/std/src/panicking.rs:697:5
   1: core::panicking::panic_fmt
             at /rustc/17067e9ac6d7ecb70e50f92c1944e545188d2359/library/core/src/panicking.rs:75:14
   2: core::panicking::panic_const::panic_const_sub_overflow
             at /rustc/17067e9ac6d7ecb70e50f92c1944e545188d2359/library/core/src/panicking.rs:178:21
   3: datafusion_expr::window_state::WindowAggState::update
             at /home/suibianwanwan/code_repository/datafusion/datafusion/expr/src/window_state.rs:95:13
   4: datafusion_physical_expr::window::window_expr::AggregateWindowExpr::aggregate_evaluate_stateful
             at /home/suibianwanwan/code_repository/datafusion/datafusion/physical-expr/src/window/window_expr.rs:259:13
   5: <datafusion_physical_expr::window::aggregate::PlainAggregateWindowExpr as datafusion_physical_expr::window::window_expr::WindowExpr>::evaluate_stateful
             at /home/suibianwanwan/code_repository/datafusion/datafusion/physical-expr/src/window/aggregate.rs:149:9
   6: datafusion_physical_plan::windows::bounded_window_agg_exec::BoundedWindowAggStream::compute_aggregates
             at /home/suibianwanwan/code_repository/datafusion/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs:986:13
   7: datafusion_physical_plan::windows::bounded_window_agg_exec::BoundedWindowAggStream::poll_next_inner
             at /home/suibianwanwan/code_repository/datafusion/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs:1049:38
   8: <datafusion_physical_plan::windows::bounded_window_agg_exec::BoundedWindowAggStream as futures_core::stream::Stream>::poll_next
             at /home/suibianwanwan/code_repository/datafusion/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs:952:20
   9: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at /home/suibianwanwan/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-core-0.3.31/src/stream.rs:130:9
  10: <S as futures_core::stream::TryStream>::try_poll_next
             at /home/suibianwanwan/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-core-0.3.31/src/stream.rs:206:9
  11: <futures_util::stream::try_stream::try_collect::TryCollect<St,C> as core::future::future::Future>::poll
             at /home/suibianwanwan/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/stream/try_stream/try_collect.rs:46:26
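For readers unfamiliar with this panic: "attempt to subtract with overflow" is what Rust reports in a debug build when an unsigned (`usize`) subtraction would go below zero, which here happens inside `WindowAggState::update`. The sketch below is a hypothetical illustration only, not DataFusion's code: `HypotheticalWindowState` and its `emitted` field are invented names. It shows how a row count that is smaller than expected trips this check, and how `checked_sub` would surface the same inconsistency as an error. With overflow checks off (the default release profile) the unchecked subtraction wraps instead of panicking, which may show up as wrong results rather than a crash.

```rust
/// Hypothetical per-partition bookkeeping for a stateful window evaluator.
/// The names are illustrative only; they are not DataFusion's actual fields.
struct HypotheticalWindowState {
    /// Number of output rows already emitted for this partition.
    emitted: usize,
}

impl HypotheticalWindowState {
    /// Unchecked subtraction: in a debug build this panics with
    /// "attempt to subtract with overflow" if `produced < self.emitted`;
    /// with overflow checks disabled it wraps around instead.
    fn new_rows_unchecked(&self, produced: usize) -> usize {
        produced - self.emitted
    }

    /// Checked subtraction: reports the inconsistency as an error instead.
    fn new_rows_checked(&self, produced: usize) -> Result<usize, String> {
        produced.checked_sub(self.emitted).ok_or_else(|| {
            format!(
                "evaluator reported {} rows but {} were already emitted",
                produced, self.emitted
            )
        })
    }
}

fn main() {
    let state = HypotheticalWindowState { emitted: 10 };

    // Consistent input: 12 rows produced, 10 already emitted -> 2 new rows.
    println!("{}", state.new_rows_unchecked(12));

    // Inconsistent input: only 1 row produced. The checked version returns
    // an Err here, where the unchecked version would panic in a debug build.
    match state.new_rows_checked(1) {
        Ok(n) => println!("new rows: {n}"),
        Err(e) => println!("error: {e}"),
    }
}
```

The PR title indicates the actual fix concerns how constant windows are handled on the stateful evaluation path; this sketch only illustrates the failure mode behind the message.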

suibianwanwank, Jun 17 '25 17:06

FYI, @alamb @andygrove.

I'm not very familiar with testing in DataFusion beyond the sqllogictests, so if there's a better way to handle this, please let me know. Thanks!

suibianwanwank, Jun 17 '25 17:06

Thanks @suibianwanwank, I think it would be great if we could use .slt tests to write a reproducer.

Here are the instructions: https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest

Ideally you should be able to extend one of the existing test files in https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest/test_files

If you can't find something suitable, I can try to find time to help over the next few days.

alamb, Jun 17 '25 22:06

Thanks @alamb. As mentioned, the physical plan generated by the default planner can't reproduce this issue, so I need to construct it manually.

suibianwanwank, Jun 18 '25 03:06

I tried making a reproducer, but I could not reproduce the wrong results or the panic reported in @andygrove's comment https://github.com/apache/datafusion/issues/16308#issuecomment-2949516445:

Here is what I tried:

Data: tenk.csv

Repro

create external table tenk1
(
unique1 int,
unique2 int,
two int,
four int,
ten int,
twenty int,
hundred int,
thousand int,
twothousand int,
fivethous int,
tenthous int,
odd int,
even int,
stringu1 string,
stringu2 string,
string4 string
)
stored as CSV location 'tenk.csv'
OPTIONS('has_header' 'false','format.delimiter' 9);

SELECT * from tenk1 limit 10;

SELECT COUNT(*) OVER () FROM tenk1 WHERE unique2 < 10

But that seems to work just fine:

(venv) andrewlamb@Andrews-MacBook-Pro-2:~/Downloads$ datafusion-cli -f repro.sql
DataFusion CLI v48.0.0
0 row(s) fetched.
Elapsed 0.001 seconds.

+---------+---------+-----+------+-----+--------+---------+----------+-------------+-----------+----------+-----+------+----------+----------+---------+
| unique1 | unique2 | two | four | ten | twenty | hundred | thousand | twothousand | fivethous | tenthous | odd | even | stringu1 | stringu2 | string4 |
+---------+---------+-----+------+-----+--------+---------+----------+-------------+-----------+----------+-----+------+----------+----------+---------+
| 8800    | 0       | 0   | 0    | 0   | 0      | 0       | 800      | 800         | 3800      | 8800     | 0   | 1    | MAAAAA   | AAAAAA   | AAAAxx  |
| 1891    | 1       | 1   | 3    | 1   | 11     | 91      | 891      | 1891        | 1891      | 1891     | 182 | 183  | TUAAAA   | BAAAAA   | HHHHxx  |
| 3420    | 2       | 0   | 0    | 0   | 0      | 20      | 420      | 1420        | 3420      | 3420     | 40  | 41   | OBAAAA   | CAAAAA   | OOOOxx  |
| 9850    | 3       | 0   | 2    | 0   | 10     | 50      | 850      | 1850        | 4850      | 9850     | 100 | 101  | WOAAAA   | DAAAAA   | VVVVxx  |
| 7164    | 4       | 0   | 0    | 4   | 4      | 64      | 164      | 1164        | 2164      | 7164     | 128 | 129  | OPAAAA   | EAAAAA   | AAAAxx  |
| 8009    | 5       | 1   | 1    | 9   | 9      | 9       | 9        | 9           | 3009      | 8009     | 18  | 19   | BWAAAA   | FAAAAA   | HHHHxx  |
| 5057    | 6       | 1   | 1    | 7   | 17     | 57      | 57       | 1057        | 57        | 5057     | 114 | 115  | NMAAAA   | GAAAAA   | OOOOxx  |
| 6701    | 7       | 1   | 1    | 1   | 1      | 1       | 701      | 701         | 1701      | 6701     | 2   | 3    | TXAAAA   | HAAAAA   | VVVVxx  |
| 4321    | 8       | 1   | 1    | 1   | 1      | 21      | 321      | 321         | 4321      | 4321     | 42  | 43   | FKAAAA   | IAAAAA   | AAAAxx  |
| 3043    | 9       | 1   | 3    | 3   | 3      | 43      | 43       | 1043        | 3043      | 3043     | 86  | 87   | BNAAAA   | JAAAAA   | HHHHxx  |
+---------+---------+-----+------+-----+--------+---------+----------+-------------+-----------+----------+-----+------+----------+----------+---------+
10 row(s) fetched.
Elapsed 0.007 seconds.

+-------------------------------------------------------------------+
| count(*) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING |
+-------------------------------------------------------------------+
| 10                                                                |
| 10                                                                |
| 10                                                                |
| 10                                                                |
| 10                                                                |
| 10                                                                |
| 10                                                                |
| 10                                                                |
| 10                                                                |
| 10                                                                |
+-------------------------------------------------------------------+
10 row(s) fetched.
Elapsed 0.004 seconds.

Notes for myself on where this came from:

https://github.com/apache/spark/blob/a38d1cef73eda8ab765dc168284b9c113c237a8e/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part1.sql#L50

SELECT COUNT(*) OVER () FROM tenk1 WHERE unique2 < 10

I did some digging and found the table definition at https://github.com/apache/spark/blob/a38d1cef73eda8ab765dc168284b9c113c237a8e/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala#L536-L562

    session
      .read
      .format("csv")
      .options(Map("delimiter" -> "\t", "header" -> "false"))
      .schema(
        """
          |unique1 int,
          |unique2 int,
          |two int,
          |four int,
          |ten int,
          |twenty int,
          |hundred int,
          |thousand int,
          |twothousand int,
          |fivethous int,
          |tenthous int,
          |odd int,
          |even int,
          |stringu1 string,
          |stringu2 string,
          |string4 string
        """.stripMargin)
      .load(testFile("test-data/postgresql/onek.data"))

The data is here: https://github.com/apache/spark/blob/a38d1cef73eda8ab765dc168284b9c113c237a8e/sql/core/src/test/resources/test-data/postgresql/tenk.data

alamb, Jun 18 '25 20:06

@andygrove, how can we test this with Comet? Can I just pin to a DataFusion version?

alamb, Jun 18 '25 20:06

Yes, assuming that there are no breaking API changes in DataFusion since 48. I will take a look this evening.

andygrove, Jun 19 '25 00:06

Unfortunately, there are multiple breaking API changes (and some are non-trivial to resolve), so we can't easily test this PR from Comet.

andygrove, Jun 19 '25 01:06

I can base this commit on 48 and test it in Comet.

suibianwanwank, Jun 19 '25 04:06

@andygrove, CI seems to have passed on https://github.com/apache/datafusion-comet/pull/1913 🎉

suibianwanwank, Jun 20 '25 02:06

Thank you very much @suibianwanwank -- this looks great to me

I took the liberty of merging up from main and adding a link to the GitHub issue in the test, but I think we'll be ready to merge after that.

Thanks, LGTM!

suibianwanwank, Jun 20 '25 13:06

apache/datafusion-comet#1913 @andygrove CI seems to have passed.🎉

Yes, I confirmed that the test passes now:

2025-06-19T18:36:27.5009047Z [info] - postgreSQL/window_part1.sql (12 seconds, 530 milliseconds)

Thanks, @suibianwanwank, for backporting your PR to DF 48 so that we could verify the fix in Comet!

andygrove, Jun 20 '25 14:06

Thanks again for the diligence and follow-up here.

alamb, Jun 20 '25 19:06