
RFC: rewrite distinct aggregation to join (not right now)

st1page opened this issue 3 years ago • 14 comments

RFC: rewrite distinct aggregation when planning

This method will only be worthwhile once there is better support for DAG plans, so this is only a short description for now. I will expand the design once DAG plans are well supported in our optimizer and execution.

Background

We need to support distinct aggregation in SQL queries such as select count(v1), sum(v2), count(distinct v1), count(distinct v2), sum(distinct v2) from t group by id; we can rewrite such a query as follows (update: the rewritten SQL is wrong, please see https://github.com/singularity-data/risingwave/issues/2915#issuecomment-1141623924):

select t0.agg_1, t0.agg_2, t1.agg_1, t2.agg_1, t2.agg_2 from 
    (select id, count(v1) as agg_1, sum(v2) as agg_2 from t group by id) t0
join 
    (select id, count(*) as agg_1 from t group by id, v1) t1
join 
    (select id, count(v2) as agg_1, sum(v2) as agg_2 from t group by id, v2) t2
on t0.id = t1.id and t1.id = t2.id

But this will be expensive if we do not support DAG plans, because the input of the Agg executor will be executed multiple times after the rewrite. Let's rethink this design in the future.

Design

Future Optimizations

Discussions

Q&A

st1page, May 30 '22

How do you want to support DAGs in the optimizer and execution?

likg227, May 31 '22

> How do you want to support DAGs in the optimizer and execution?

For the streaming part, we should make the changes in our frontend; the batch part is difficult.

st1page, May 31 '22

Subplan reuse in batch is different from streaming since the scheduling order is different. Subplan reuse in the optimizer is also interesting. A trivial approach is to find common subplans by visiting the plan tree after the physical plan is determined.
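A minimal sketch of the trivial approach above, under stated assumptions: PlanNode is a hypothetical toy node type (operator name, parameters, inputs), not RisingWave's plan representation. The visitor rebuilds the physical plan bottom-up and interns structurally equal subtrees, so duplicated subplans such as two scans of t collapse into one shared node and the tree becomes a DAG.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class PlanNode:
    # Hypothetical toy plan node: operator name, its parameters, and its inputs.
    op: str
    params: Tuple[str, ...] = ()
    inputs: Tuple["PlanNode", ...] = ()


def share_common_subplans(root: PlanNode) -> Tuple[PlanNode, int]:
    """Rebuild the plan bottom-up, keeping one object per structurally equal subtree.

    Returns the rewritten root and the number of distinct subplans kept.
    """
    interned: Dict[PlanNode, PlanNode] = {}

    def visit(node: PlanNode) -> PlanNode:
        # Rewrite children first so equal subtrees already share identity.
        new_inputs = tuple(visit(child) for child in node.inputs)
        candidate = PlanNode(node.op, node.params, new_inputs)
        # Frozen dataclasses hash and compare by field values, so an equal
        # subtree seen earlier is returned instead of the new candidate.
        return interned.setdefault(candidate, candidate)

    new_root = visit(root)
    return new_root, len(interned)
```

Because Python recomputes a node's hash over its whole subtree on every lookup, this does more than linear work on deep plans; a real planner would more likely cache a per-node digest.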

liurenjie1024, May 31 '22

The rewrite above seems incorrect. Maybe this?

select t0.agg_1, t0.agg_2, t1.agg_1, t2.agg_1, t2.agg_2 from 
    (select id, count(v1) as agg_1, sum(v2) as agg_2 from t group by id) t0
join 
    (select id, count(v1) as agg_1 from (select distinct id, v1 from t) t1d group by id) t1
on t0.id = t1.id
join 
    (select id, count(v2) as agg_1, sum(v2) as agg_2 from (select distinct id, v2 from t) t2d group by id) t2
on t1.id = t2.id

(Moving the on-conditions is purely syntactic. The core difference lies in the derived queries t1 and t2.)
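A small sketch that checks the point above on in-memory rows, assuming rows are (id, v1) pairs with no NULLs (count(v1) would skip NULLs, which this sketch does not model). It compares count(distinct v1) per id against the original t1 (group by id, v1, then count(*)) and the corrected t1 (deduplicate (id, v1) first, then count per id). The sample rows are invented for illustration.

```python
from collections import Counter
from typing import Dict, List, Tuple

Row = Tuple[int, int]  # (id, v1)


def count_distinct_direct(rows: List[Row]) -> Dict[int, int]:
    # Reference semantics: select id, count(distinct v1) from t group by id.
    seen: Dict[int, set] = {}
    for id_, v1 in rows:
        seen.setdefault(id_, set()).add(v1)
    return {id_: len(vals) for id_, vals in seen.items()}


def original_t1(rows: List[Row]) -> Dict[Tuple[int, int], int]:
    # select id, count(*) from t group by id, v1: one result per (id, v1) pair
    # holding that pair's duplicate count, not a distinct count per id.
    return dict(Counter(rows))


def corrected_t1(rows: List[Row]) -> Dict[int, int]:
    # select id, count(v1) from (select distinct id, v1 from t) group by id
    distinct_pairs = set(rows)
    return dict(Counter(id_ for id_, _ in distinct_pairs))
```

With rows [(1, 10), (1, 10), (1, 20), (2, 30)], the corrected t1 yields {1: 2, 2: 1}, matching the direct result, while the original t1 yields one entry per (id, v1) pair.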

xiangjinwu, May 31 '22

Because executing a DAG is not simple to handle, we can first convert an aggregate with one distinct aggregate call and one or more non-distinct aggregate calls into a multi-phase aggregate, which doesn't introduce a DAG. Calcite has detailed comments on how to implement it. I think this is a common case, so we can support this feature first.
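A rough sketch of the multi-phase idea for one distinct call plus non-distinct calls, written from the general technique rather than from Calcite's code, for the hypothetical query select id, count(distinct v1), sum(v2), count(v2) from t group by id. Phase 1 groups by id plus the distinct column and pre-aggregates the non-distinct calls; phase 2 groups by id, where counting phase-1 groups gives the distinct count and the partial results are re-aggregated (partial counts are summed). Rows are assumed to be (id, v1, v2) with no NULLs.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, int, int]  # (id, v1, v2); NULLs are not modeled in this sketch


def two_phase(rows: List[Row]) -> Dict[int, Tuple[int, int, int]]:
    """count(distinct v1), sum(v2), count(v2) per id, evaluated in two phases."""
    # Phase 1: group by (id, v1), i.e. the group key plus the distinct column,
    # and pre-aggregate the non-distinct calls into partial results.
    partial: Dict[Tuple[int, int], List[int]] = defaultdict(lambda: [0, 0])
    for id_, v1, v2 in rows:
        acc = partial[(id_, v1)]
        acc[0] += v2  # partial sum(v2)
        acc[1] += 1   # partial count(v2)

    # Phase 2: group by id. Each phase-1 group is one distinct v1, so counting
    # groups gives count(distinct v1); partial sums and counts are summed.
    final: Dict[int, List[int]] = defaultdict(lambda: [0, 0, 0])
    for (id_, _v1), (psum, pcount) in partial.items():
        acc = final[id_]
        acc[0] += 1
        acc[1] += psum
        acc[2] += pcount
    return {id_: tuple(acc) for id_, acc in final.items()}


def one_pass(rows: List[Row]) -> Dict[int, Tuple[int, int, int]]:
    # Reference: compute the same aggregates directly per id.
    groups: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[0]].append(row)
    return {
        id_: (len({r[1] for r in rs}), sum(r[2] for r in rs), len(rs))
        for id_, rs in groups.items()
    }
```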

likg227, Jun 01 '22

+1 for this.

liurenjie1024, Jun 01 '22

related #146

fuyufjh, Jun 06 '22

Here the join is too expensive and unnecessary. Actually, we only need to combine the results within the same epoch.

fuyufjh, Jun 06 '22

... And multiple selects (which may result in multiple TableScans) are expensive. As is common practice, we may introduce an Expand operator to do this.
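A minimal batch sketch of the Expand idea, assuming a design similar in spirit to the Expand operator some engines (for example Spark SQL) use for distinct aggregates; the flag numbering and the None-padding are assumptions for illustration. Each input row is emitted once per aggregate subset, tagged with a flag, so a single scan feeds both the non-distinct calls and each distinct call, and one downstream aggregation replaces the join. The query shape follows the example at the top of this issue (count(v1), sum(v2), count(distinct v1), count(distinct v2) per id; sum(distinct v2) is omitted for brevity).

```python
from collections import defaultdict
from typing import Dict, List, Optional, Sequence, Tuple

Row = Tuple[int, int, int]  # (id, v1, v2), no NULLs in this sketch
ExpandedRow = Tuple[int, Optional[int], Optional[int], int]  # (id, v1, v2, flag)


def expand(rows: Sequence[Row]) -> List[ExpandedRow]:
    """Emit one copy of each input row per aggregate subset, tagged with a flag.

    flag 0 feeds the non-distinct calls count(v1), sum(v2);
    flag 1 feeds count(distinct v1) and keeps only v1;
    flag 2 feeds count(distinct v2) and keeps only v2.
    """
    out: List[ExpandedRow] = []
    for id_, v1, v2 in rows:
        out.append((id_, v1, v2, 0))
        out.append((id_, v1, None, 1))
        out.append((id_, None, v2, 2))
    return out


def aggregate(expanded: Sequence[ExpandedRow]) -> Dict[int, Tuple[int, int, int, int]]:
    """count(v1), sum(v2), count(distinct v1), count(distinct v2) per id,
    computed from one expanded stream without any join."""
    plain: Dict[int, List[int]] = defaultdict(lambda: [0, 0])
    distinct_keys = set()  # (id, flag, value): dedup key for the distinct copies
    for id_, v1, v2, flag in expanded:
        if flag == 0:
            plain[id_][0] += 1   # count(v1); no NULLs, so every row counts
            plain[id_][1] += v2  # sum(v2)
        elif flag == 1:
            distinct_keys.add((id_, 1, v1))
        else:
            distinct_keys.add((id_, 2, v2))
    dcount: Dict[Tuple[int, int], int] = defaultdict(int)
    for id_, flag, _value in distinct_keys:
        dcount[(id_, flag)] += 1
    return {
        id_: (cnt, total, dcount[(id_, 1)], dcount[(id_, 2)])
        for id_, (cnt, total) in plain.items()
    }
```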

fuyufjh, Jun 06 '22

> Here the join is too expensive and unnecessary. Actually, we only need to combine the results within the same epoch.

I am afraid not. Consider the SQL select count(distinct v1), sum(v2): if a delete record passes through the two Agg operators on the two sides, the sum(v2) Agg operator might generate an update while the count(distinct v1) operator produces nothing (because there might be multiple rows with the same v1 value, and the delete only removes one of them).
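A sketch of the asymmetry described above, assuming each Agg operator handles a single group and reports (old, new) when its output changes, or None when it emits nothing. count(distinct v1) has to keep a reference count per v1 value, so deleting one of several rows with the same v1 produces no output, while sum(v2) emits an update for the same delete. The class names are hypothetical.

```python
from collections import Counter
from typing import Optional, Tuple


class DistinctCountAgg:
    """count(distinct v1) for one group, with a reference count per v1 value."""

    def __init__(self) -> None:
        self.refs: Counter = Counter()

    def apply(self, v1: int, is_insert: bool) -> Optional[Tuple[int, int]]:
        """Apply one change; return (old, new) if the output changed, else None."""
        old = len(self.refs)
        if is_insert:
            self.refs[v1] += 1
        else:
            # Assumes deletes only retract previously inserted rows.
            self.refs[v1] -= 1
            if self.refs[v1] == 0:
                del self.refs[v1]  # Counter keeps zero entries unless removed
        new = len(self.refs)
        return (old, new) if old != new else None


class SumAgg:
    """sum(v2) for one group."""

    def __init__(self) -> None:
        self.total = 0

    def apply(self, v2: int, is_insert: bool) -> Optional[Tuple[int, int]]:
        old = self.total
        self.total += v2 if is_insert else -v2
        return (old, self.total) if old != self.total else None
```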

> multiple selects (which may result in multiple TableScans) are expensive

+1. The SQL here only represents the plan rewrite; it would actually generate a DAG plan, which does not produce multiple TableScans.

st1page, Jun 06 '22

> Because executing a DAG is not simple to handle, we can first convert an aggregate with one distinct aggregate call and one or more non-distinct aggregate calls into a multi-phase aggregate, which doesn't introduce a DAG. Calcite has detailed comments on how to implement it. I think this is a common case, so we can support this feature first.

I will implement it.

  • [ ] aggregate queries whose agg calls have only one kind of distinct group, like select max(distinct v1), sum(distinct v1)
  • [ ] aggregate queries whose agg calls have only one kind of distinct group plus ordinary agg calls, like select max(distinct v1), sum(distinct v1), max(v2), min(v3)
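A sketch of how a planner might tell the two checklist cases apart, assuming a hypothetical AggCall type in which a distinct group is identified by a single argument column (real distinct groups can span several columns).

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class AggCall:
    # Hypothetical representation of one aggregate call, e.g. max(distinct v1).
    func: str
    arg: str
    distinct: bool


def classify(calls: Sequence[AggCall]) -> str:
    """Return which checklist case an aggregate's calls fall into."""
    distinct_args = {c.arg for c in calls if c.distinct}
    has_plain = any(not c.distinct for c in calls)
    if not distinct_args:
        return "no-distinct"     # ordinary aggregation, no rewrite needed
    if len(distinct_args) > 1:
        return "multi-distinct"  # several distinct groups: outside both cases
    return "distinct+plain" if has_plain else "distinct-only"
```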

st1page, Jun 06 '22

> I am afraid not. Consider the SQL select count(distinct v1), sum(v2): if a delete record passes through the two Agg operators on the two sides, the sum(v2) Agg operator might generate an update while the count(distinct v1) operator produces nothing (because there might be multiple rows with the same v1 value, and the delete only removes one of them).

Theoretically, it's possible to handle that. We need some way to represent that a column (i.e. an aggregated value) has "not changed".
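One hypothetical way to represent "not changed", sketched under the assumption that all sides' outputs for one group key are aligned within an epoch: each side reports either a new value or an UNCHANGED marker per aggregated column, and the combiner fills unchanged columns from the previously emitted row. UNCHANGED and combine_epoch are illustrative names, not an existing API.

```python
from typing import List, Optional, Sequence

# Hypothetical marker: "this aggregated column did not change in this epoch".
UNCHANGED = object()


def combine_epoch(previous: Sequence[object], changes: Sequence[object]) -> Optional[List[object]]:
    """Merge one epoch's per-column changes for a single group key.

    previous: the row last emitted for this group, one value per aggregate column.
    changes:  same width; each entry is a new value or UNCHANGED.
    Returns the new row, or None when every column is UNCHANGED (nothing to emit).
    """
    if all(c is UNCHANGED for c in changes):
        return None
    return [old if new is UNCHANGED else new for old, new in zip(previous, changes)]
```

For the delete example above, combine_epoch([2, 30], [UNCHANGED, 20]) returns [2, 20]: count(distinct v1) keeps its old value and only sum(v2) moves.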

fuyufjh, Jun 07 '22

> Because executing a DAG is not simple to handle, we can first convert an aggregate with one distinct aggregate call and one or more non-distinct aggregate calls into a multi-phase aggregate, which doesn't introduce a DAG. Calcite has detailed comments on how to implement it. I think this is a common case, so we can support this feature first.

2-phase aggregate is feasible, but it's more expensive in streaming. For example:

select page, count(*), count(distinct user)
from t
group by page

IIUC, in the 2-phase approach:

1st aggregation: [ page, user ] -> [ count(*) as local_count ]
2nd aggregation: [ page ] -> [ sum(local_count), count(user) ]

Both aggregations are stateful (and therefore expensive). Moreover:

  • It's actually useless to maintain the state of [page, user] -> [count(*)] in the 1st aggregation.
  • It's also wasteful to maintain the state of [ page ] -> [ sum(local_count) ] because the change of sum(local_count) can be computed directly from the local state.
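A sketch of the streaming 2-phase plan above, assuming +1/-1 deltas and keeping both states explicitly so their contents are visible; the class is hypothetical. The phase-2 update to sum(local_count) always equals the incoming delta, which is the redundancy the second bullet points out, and phase-1 state holds one entry per (page, user) pair.

```python
from collections import defaultdict
from typing import Dict, Tuple


class TwoPhaseStreamingAgg:
    """2-phase plan for: select page, count(*), count(distinct user) from t group by page."""

    def __init__(self) -> None:
        # 1st aggregation state: [page, user] -> local_count
        self.phase1: Dict[Tuple[str, str], int] = defaultdict(int)
        # 2nd aggregation state: [page] -> [sum(local_count), count(user)]
        self.phase2: Dict[str, list] = defaultdict(lambda: [0, 0])

    def apply(self, page: str, user: str, delta: int) -> None:
        """delta is +1 for an insert, -1 for a delete of a previously inserted row."""
        key = (page, user)
        old_local = self.phase1[key]
        new_local = old_local + delta
        if new_local == 0:
            del self.phase1[key]
        else:
            self.phase1[key] = new_local
        acc = self.phase2[page]
        # Phase 2 folds in the change of local_count; this always equals delta.
        acc[0] += new_local - old_local
        # A (page, user) group appears or disappears only on 0 <-> nonzero.
        if old_local == 0 and new_local > 0:
            acc[1] += 1
        elif old_local > 0 and new_local == 0:
            acc[1] -= 1

    def result(self, page: str) -> Tuple[int, int]:
        count_star, distinct_users = self.phase2[page]
        return count_star, distinct_users
```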

fuyufjh, Jun 07 '22

> 2-phase aggregate is feasible, but it's more expensive in streaming

I see.

  1. I think there might be some way to share state for rollup aggregation in streaming, but that is another topic.
  2. It is still a bad idea even if we had some way to share state and avoid storing the 2nd aggregation's state. Using the example above:
select page, count(*), count(distinct user)
from t
group by page

When rewriting with the join, the total number of agg groups is count(distinct user) + count(distinct page), but when rewriting to a rollup agg, the total number of agg groups is count(distinct user) * count(distinct page).

st1page, Jun 07 '22

Distinct aggregation is now supported via Expand: #3589

xiangjinwu, Aug 27 '22