datafusion icon indicating copy to clipboard operation
datafusion copied to clipboard

Avoid inlining non deterministic CTE

Open tgujar opened this issue 1 year ago • 4 comments

Describe the bug

Currently Datafusion will inline all CTE, a non-deterministic expression can be executed multiple times producing different results

To Reproduce

Consider the following query which uses the aggregate_test_100 data from datafusion-examples. Here, column c11 is a Float64

WITH cte as (
    SELECT sum(c4 * c11) as total 
    FROM aggregate_test_100 
    GROUP BY c1) 
SELECT total 
FROM cte 
WHERE total = (select max(total) from cte)

The optimized plan generated will inline the CTE and thus execute it twice

Projection: cte.total
  Inner Join: cte.total = __scalar_sq_1.MAX(cte.total)
    SubqueryAlias: cte
      Projection: SUM(aggregate_test_100.c4 * aggregate_test_100.c11) AS total
        Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[SUM(CAST(aggregate_test_100.c4 AS Float64) * aggregate_test_100.c11)]]
          TableScan: aggregate_test_100 projection=[c1, c4, c11]
    SubqueryAlias: __scalar_sq_1
      Aggregate: groupBy=[[]], aggr=[[MAX(cte.total)]]
        SubqueryAlias: cte
          Projection: SUM(aggregate_test_100.c4 * aggregate_test_100.c11) AS total
            Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[SUM(CAST(aggregate_test_100.c4 AS Float64) * aggregate_test_100.c11)]]
              TableScan: aggregate_test_100 projection=[c1, c4, c11]

Expected behavior

Since summation here is dependent on ordering, I believe it is incorrect to inline the CTE here and execute it more than once.

Additional context

Related issue, which talks about possible advantages on not inlining CTE in some cases: https://github.com/apache/datafusion/issues/8777

tgujar avatar May 01 '24 22:05 tgujar

I can work on this if we can confirm in this is indeed a correctly reported bug. Let me know what you think, thanks!

tgujar avatar May 03 '24 13:05 tgujar

I worry that judging whether a query is non-deterministic may be not easy. Perhaps we can first leave this judgment to the user, only do this when the user specifies Materialized and there are multiple references.

DuckDB may have taken a similar approach.

v0.10.3-dev779 d26007417b

D with cte as (select random()) select * from cte union select * from cte;
┌────────────────────┐
│      random()      │
│       double       │
├────────────────────┤
│ 0.9430218460038304 │
│ 0.3114725165069103 │
└────────────────────┘
D with cte as materialized (select random()) select * from cte union select * from cte;
┌─────────────────────┐
│      random()       │
│       double        │
├─────────────────────┤
│ 0.13616445031948388 │
└─────────────────────┘

jonahgao avatar May 06 '24 15:05 jonahgao

I think this would push the responsibility to the user to figure out what may be non-deterministic. I am not sure if this would be a good approach

tgujar avatar May 07 '24 15:05 tgujar

Due to parallel execution, non-deterministic behavior occurs more frequently than in traditional databases. I'm not sure if it's appropriate to disable inline CTE for all non-deterministic queries, as inline has advantages in some scenarios.

Additionally, I believe that detecting non-deterministic behavior is more difficult than in traditional databases, as it is affected by whether the input is multi-partitioned, the configuration of target_partitions, whether a Repartition node has been added, and some queries, like MIN, may be immune to unordered execution, etc.

So I think it might be a good starting point to have the user specify explicitly. When users want to avoid non-deterministic behavior or recomputation, they can explicitly request no inlining.

jonahgao avatar May 08 '24 01:05 jonahgao