RFC: rewrite distinct aggregation to join (not right now)
RFC: rewrite distinct aggregation when planning
This method will only pay off once there is better support for DAG plans, so this is only a short description for now; I will expand the design when DAG plans are well supported in our optimizer and execution.
Background
We need to support distinct aggregation in SQL queries like
```sql
select count(v1), sum(v2), count(distinct v1), count(distinct v2), sum(distinct v2) from t group by id;
```
and we can rewrite it to (update: the rewritten SQL is wrong, please see https://github.com/singularity-data/risingwave/issues/2915#issuecomment-1141623924):
```sql
select t0.agg_1, t0.agg_2, t1.agg_1, t2.agg_1, t2.agg_2 from
  (select id, count(v1) as agg_1, sum(v2) as agg_2 from t group by id) t0
join
  (select id, count(*) as agg_1 from t group by id, v1) t1
join
  (select id, count(v2) as agg_1, sum(v2) as agg_2 from t group by id, v2) t2
on t0.id = t1.id and t1.id = t2.id
```
But this rewrite will be expensive as long as we do not support DAG plans, because the input of the Agg executor will be executed multiple times. Let's revisit this design in the future.
Design
Future Optimizations
Discussions
Q&A
How do you want to support DAG in optimizer and execution?
For the streaming part we only need changes in our frontend; the batch part is more difficult.
Subplan reuse in batch is different from streaming since the scheduling order differs. Subplan reuse in the optimizer is also interesting: a trivial approach is to find common subplans, after the physical plan is determined, by visiting the plan tree.
The rewrite above seems incorrect. Maybe this?
```sql
select t0.agg_1, t0.agg_2, t1.agg_1, t2.agg_1, t2.agg_2 from
  (select id, count(v1) as agg_1, sum(v2) as agg_2 from t group by id) t0
join
  (select id, count(v1) as agg_1 from (select distinct id, v1 from t) t1d group by id) t1
on t0.id = t1.id
join
  (select id, count(v2) as agg_1, sum(v2) as agg_2 from (select distinct id, v2 from t) t2d group by id) t2
on t1.id = t2.id
```
(The movement of on-condition is just syntactical. The core difference is the derived queries t1 and t2.)
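The corrected rewrite can be sanity-checked against an in-memory SQLite database. This is only a sketch: the data below is made up, and `id` is added to both select lists so the results can be compared row by row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER, v1 INTEGER, v2 INTEGER)")
conn.executemany(
    "INSERT INTO t VALUES (?, ?, ?)",
    [(1, 10, 100), (1, 10, 200), (1, 20, 200), (2, 30, 300), (2, 30, 300)],
)

# The original distinct aggregation (with id added for comparison).
original = """
SELECT id, count(v1), sum(v2), count(DISTINCT v1),
       count(DISTINCT v2), sum(DISTINCT v2)
FROM t GROUP BY id ORDER BY id
"""

# The corrected join rewrite from the discussion above.
rewrite = """
SELECT t0.id, t0.agg_1, t0.agg_2, t1.agg_1, t2.agg_1, t2.agg_2 FROM
  (SELECT id, count(v1) AS agg_1, sum(v2) AS agg_2 FROM t GROUP BY id) t0
JOIN
  (SELECT id, count(v1) AS agg_1
   FROM (SELECT DISTINCT id, v1 FROM t) t1d GROUP BY id) t1
ON t0.id = t1.id
JOIN
  (SELECT id, count(v2) AS agg_1, sum(v2) AS agg_2
   FROM (SELECT DISTINCT id, v2 FROM t) t2d GROUP BY id) t2
ON t1.id = t2.id
ORDER BY t0.id
"""

assert conn.execute(original).fetchall() == conn.execute(rewrite).fetchall()
```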
Because executing on a DAG is not that simple to handle, we can first convert aggregates with one distinct aggregate and one or more non-distinct aggregates into multi-phase aggregates, which doesn't introduce a DAG. Calcite has detailed comments on how to implement it. I think this is a common case, so we can support this feature first.
+1 for this.
related #146
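The multi-phase conversion for one distinct aggregate mixed with non-distinct aggregates can be sketched with SQLite (batch semantics only; the table and data here are made up): phase 1 pre-aggregates by the group key plus the distinct column, phase 2 re-aggregates by the group key alone.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER, v1 INTEGER, v2 INTEGER)")
conn.executemany("INSERT INTO t VALUES (?, ?, ?)",
                 [(1, 1, 7), (1, 2, 7), (1, 3, 8), (2, 4, 9)])

direct = "SELECT id, count(v1), count(DISTINCT v2) FROM t GROUP BY id ORDER BY id"

# Phase 1 groups by (id, v2); phase 2 re-aggregates by id alone.
# Nothing reads t twice, so no DAG is introduced.
two_phase = """
SELECT id, sum(cnt_v1), count(v2) FROM
  (SELECT id, v2, count(v1) AS cnt_v1 FROM t GROUP BY id, v2)
GROUP BY id ORDER BY id
"""

assert conn.execute(direct).fetchall() == conn.execute(two_phase).fetchall()
```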
Here join is too expensive and is not necessary. Actually we only need to combine results in the same epoch.
... And multiple select (which may result in multiple TableScan) is expensive. As a common practice, we may introduce an Expand operator to do this.
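One way to picture such an Expand operator (the function and shapes here are illustrative, not RisingWave's actual implementation): each input row is duplicated once per distinct-column set and tagged with a flag, so a single downstream aggregation keyed by (group key, flag) covers all distinct aggregates with one scan of the base table.

```python
from collections import defaultdict

def expand(rows, group_sets):
    """Hypothetical Expand operator: duplicate each input row once per
    group set, null out the columns that group set does not need, and
    tag the copy with a flag identifying the group set."""
    out = []
    for row in rows:
        for flag, keep in enumerate(group_sets):
            out.append({"flag": flag,
                        **{k: (v if k in keep else None) for k, v in row.items()}})
    return out

rows = [
    {"id": 1, "v1": 10, "v2": 100},
    {"id": 1, "v1": 10, "v2": 200},
    {"id": 2, "v1": 30, "v2": 300},
]

# One group set per distinct aggregate: count(distinct v1), count(distinct v2).
expanded = expand(rows, [{"id", "v1"}, {"id", "v2"}])
assert len(expanded) == 2 * len(rows)  # copies per group set, single scan of t

# A single downstream aggregation keyed by (id, flag) computes both
# distinct aggregates without scanning the base table twice.
seen = defaultdict(set)
for r in expanded:
    seen[(r["id"], r["flag"])].add(r["v1"] if r["flag"] == 0 else r["v2"])

count_distinct_v1 = {id_: len(vs) for (id_, flag), vs in seen.items() if flag == 0}
count_distinct_v2 = {id_: len(vs) for (id_, flag), vs in seen.items() if flag == 1}
assert count_distinct_v1 == {1: 1, 2: 1}
assert count_distinct_v2 == {1: 2, 2: 1}
```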
> Here join is too expensive and is not necessary. Actually we only need to combine results in the same epoch.
I am afraid not. Consider the SQL `select count(distinct v1), sum(v2)`: if a deleted record passes through the two Agg operators on the two sides, the `sum(v2)` operator might generate an update while `count(distinct v1)` produces nothing (because there might be multiple rows with the same v1 value and the delete only removes one of them).
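The mismatch can be illustrated with two toy stateful operators (a sketch only; the classes and their interfaces are made up, not RisingWave's implementation). Each operator emits its new value when its output changes, or `None` when it does not.

```python
from collections import Counter

class SumAgg:
    """Toy stand-in for the sum(v2) operator."""
    def __init__(self):
        self.total = 0
    def apply(self, op, value):
        old = self.total
        self.total += value if op == "+" else -value
        return None if self.total == old else self.total

class CountDistinctAgg:
    """Toy stand-in for the count(distinct v1) operator."""
    def __init__(self):
        self.counts = Counter()
    def apply(self, op, value):
        old = len(self.counts)
        self.counts[value] += 1 if op == "+" else -1
        if self.counts[value] == 0:
            del self.counts[value]
        return None if len(self.counts) == old else len(self.counts)

sum_v2, distinct_v1 = SumAgg(), CountDistinctAgg()
for op, v1, v2 in [("+", 7, 10), ("+", 7, 20)]:  # two rows share v1 = 7
    sum_v2.apply(op, v2)
    distinct_v1.apply(op, v1)

# Deleting one of the duplicate-v1 rows: sum(v2) emits an update while
# count(distinct v1) emits nothing, so the two sides get out of sync.
assert sum_v2.apply("-", 10) == 20
assert distinct_v1.apply("-", 7) is None
```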
> multiple select (which may result in multiple TableScan) is expensive
+1, the SQL here just represents the plan rewriting, and it will actually generate a DAG plan which will not produce multiple TableScan.
> Because executing on a DAG is not that simple to handle, we can first convert an aggregate with one distinct aggregate and one or more non-distinct aggregates into multi-phase aggregates, which doesn't introduce a DAG. Calcite has detailed comments on how to implement it. I think this is a common case, so we can support this feature first.
I will implement it.
- [ ] aggregate queries whose agg calls use only one kind of distinct group, like `select max(distinct v1), sum(distinct v1)`
- [ ] aggregate queries with one kind of distinct group plus ordinary agg calls, like `select max(distinct v1), sum(distinct v1), max(v2), min(v3)`
> I am afraid not. Consider the SQL `select count(distinct v1), sum(v2)`: if a deleted record passes through the two Agg operators on the two sides, the `sum(v2)` operator might generate an update while `count(distinct v1)` produces nothing (because there might be multiple rows with the same v1 value and the delete only removes one of them).
Theoretically, it's possible to handle that. We need something to represent "not changed" of a column (i.e. an aggregated value).
> Because executing on a DAG is not that simple to handle, we can first convert an aggregate with one distinct aggregate and one or more non-distinct aggregates into multi-phase aggregates, which doesn't introduce a DAG. Calcite has detailed comments on how to implement it. I think this is a common case, so we can support this feature first.
2-phase aggregate is feasible, but it's more expensive in streaming. For example:
```sql
select page, count(*), count(distinct user)
from t
group by page
```
IIUC, in the 2-phase approach,
- 1st aggregation: `[page, user] -> [count(*) as local_count]`
- 2nd aggregation: `[page] -> [sum(local_count), count(user)]`
Both aggregations are stateful (so they are expensive), meanwhile,
- It's actually useless to maintain the state of `[page, user] -> [count(*)]` in the 1st aggregation.
- It's also wasteful to maintain the state of `[page] -> [sum(local_count)]`, because the change of `sum(local_count)` can be directly calculated from the local state.
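The 2-phase plan above can be checked end to end with SQLite (batch semantics only, with made-up data; the streaming state-cost argument is a separate matter):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (page TEXT, user TEXT)")
conn.executemany("INSERT INTO t VALUES (?, ?)",
                 [("a", "u1"), ("a", "u1"), ("a", "u2"), ("b", "u3")])

direct = "SELECT page, count(*), count(DISTINCT user) FROM t GROUP BY page ORDER BY page"

# 1st aggregation: [page, user] -> count(*) AS local_count
# 2nd aggregation: [page] -> sum(local_count), count(user)
two_phase = """
SELECT page, sum(local_count), count(user) FROM
  (SELECT page, user, count(*) AS local_count FROM t GROUP BY page, user)
GROUP BY page ORDER BY page
"""

assert conn.execute(direct).fetchall() == conn.execute(two_phase).fetchall()
```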
> 2-phase aggregate is feasible, but it's more expensive in streaming
I see.
- I think there might be some state-sharing method for roll-up agg on streams, but that is another topic.
- Even if we have some method to share state and save the 2nd aggregation's state, it is still a bad idea. Using the example above:
```sql
select page, count(*), count(distinct user)
from t
group by page
```
when rewriting with the join, the total number of agg groups is count(distinct user) + count(distinct page), but when rewriting it to a roll-up agg, the total number of agg groups is count(distinct user) * count(distinct page).
distinct has been supported via expand: #3589