Automatically switch between plain distinct aggregation, the mark-distinct operator, and OptimizeMixedDistinctAggregations
Decide automatically (perhaps using the CBO) which way of computing distinct aggregations to use. Potential rules:
- Use mark distinct when the number of distinct aggregations is low (e.g. 1 or 2), since each mark-distinct step adds a data shuffle across the cluster.
- Use OptimizeMixedDistinctAggregations when the number of distinct aggregations is high; it limits the number of intermediate stages and also allows partial aggregation to be evaluated.
cc @lukasz-stec
also allows for partial aggregation to be evaluated.
I believe partial aggregation supports MarkDistinct too
It does, but the problem arises when you have multiple distinct aggregations, e.g. agg(DISTINCT a), agg(DISTINCT b), ...: a separate MarkDistinctNode must be created for each one, and each of them shuffles the data across the cluster.
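A small sketch of that mechanism (Python over hypothetical toy rows standing in for lineitem, not engine code): each distinct aggregation gets its own boolean mask marking the first occurrence of (group key, argument), and the final count only adds rows where the mask is true. In the engine, producing each mask forces its own repartition on (key, column), which is why the cost grows with the number of distinct aggregations.

```python
from collections import defaultdict

# Toy rows standing in for lineitem: (l_suppkey, l_discount, l_quantity).
rows = [
    (1, 0.1, 5.0), (1, 0.1, 7.0), (1, 0.2, 5.0),
    (2, 0.1, 5.0), (2, 0.3, 5.0), (2, 0.3, 5.0),
]

def mark_distinct(rows, key_idx, col_idx):
    """Emit one boolean per row: True for the first occurrence of
    (key, col). In a distributed plan this step requires its own
    repartition on (key, col)."""
    seen = set()
    masks = []
    for r in rows:
        k = (r[key_idx], r[col_idx])
        masks.append(k not in seen)
        seen.add(k)
    return masks

# One mark-distinct pass (hence one shuffle) per distinct aggregation.
discount_mask = mark_distinct(rows, 0, 1)
quantity_mask = mark_distinct(rows, 0, 2)

counts = defaultdict(lambda: [0, 0])
for r, dm, qm in zip(rows, discount_mask, quantity_mask):
    counts[r[0]][0] += dm  # count(DISTINCT l_discount) via its mask
    counts[r[0]][1] += qm  # count(DISTINCT l_quantity) via its mask

# Direct set-based computation for comparison.
direct = {k: (len({r[1] for r in rows if r[0] == k}),
              len({r[2] for r in rows if r[0] == k}))
          for k in {r[0] for r in rows}}
assert {k: tuple(v) for k, v in counts.items()} == direct
```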
Below are the different plans generated with MarkDistinct enabled vs. disabled.
presto:tpch> explain select l_suppkey, count(distinct l_discount), count(distinct l_quantity) from lineitem group by l_suppkey;
Query Plan >
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
- Output[l_suppkey, _col1, _col2] => [l_suppkey:bigint, count:bigint, count_3:bigint] >
_col1 := count (1:27) >
_col2 := count_3 (1:55) >
- RemoteStreamingExchange[GATHER] => [l_suppkey:bigint, count_3:bigint, count:bigint] >
- Aggregate(FINAL)[l_suppkey] => [l_suppkey:bigint, count_3:bigint, count:bigint] >
count_3 := "presto.default.count"((count_15)) (1:55) >
count := "presto.default.count"((count_16)) (1:27) >
- LocalExchange[HASH] (l_suppkey) => [l_suppkey:bigint, count_15:bigint, count_16:bigint] >
- RemoteStreamingExchange[REPARTITION] => [l_suppkey:bigint, count_15:bigint, count_16:bigint] >
- Aggregate(PARTIAL)[l_suppkey] => [l_suppkey:bigint, count_15:bigint, count_16:bigint] >
count_15 := "presto.default.count"((l_quantity)) (mask = l_quantity$distinct) (1:55) >
count_16 := "presto.default.count"((l_discount)) (mask = l_discount$distinct) (1:27) >
- MarkDistinct[distinct=l_suppkey:bigint, l_quantity:double marker=l_quantity$distinct] => [l_suppkey:bigint, l_quantity:double, l_discount:double, l_discount$distinct:boolean, l_quantity$distinct>
- LocalExchange[HASH] (l_suppkey, l_quantity) => [l_suppkey:bigint, l_quantity:double, l_discount:double, l_discount$distinct:boolean] >
- RemoteStreamingExchange[REPARTITION] => [l_suppkey:bigint, l_quantity:double, l_discount:double, l_discount$distinct:boolean] >
- MarkDistinct[distinct=l_suppkey:bigint, l_discount:double marker=l_discount$distinct] => [l_suppkey:bigint, l_quantity:double, l_discount:double, l_discount$distinct:boolean] >
- LocalExchange[HASH] (l_suppkey, l_discount) => [l_suppkey:bigint, l_quantity:double, l_discount:double] >
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?} >
- RemoteStreamingExchange[REPARTITION] => [l_suppkey:bigint, l_quantity:double, l_discount:double] >
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?} >
- TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional>
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00} >
LAYOUT: tpch.lineitem{} >
l_suppkey := l_suppkey:bigint:2:REGULAR (1:87) >
l_discount := l_discount:double:6:REGULAR (1:87) >
l_quantity := l_quantity:double:4:REGULAR (1:87) >
>
(1 row)
WARNING: COUNT(DISTINCT xxx) can be a very expensive operation when the cardinality is high for xxx. In most scenarios, using approx_distinct instead would be enough
presto:tpch> set session use_mark_distinct = false;
SET SESSION
presto:tpch> explain select l_suppkey, count(distinct l_discount), count(distinct l_quantity) from lineitem group by l_suppkey;
Query Plan >
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
- Output[l_suppkey, _col1, _col2] => [l_suppkey:bigint, count:bigint, count_3:bigint] >
_col1 := count (1:27) >
_col2 := count_3 (1:55) >
- RemoteStreamingExchange[GATHER] => [l_suppkey:bigint, count:bigint, count_3:bigint] >
- Aggregate[l_suppkey] => [l_suppkey:bigint, count:bigint, count_3:bigint] >
count := "presto.default.count"(DISTINCT (l_discount)) (1:27) >
count_3 := "presto.default.count"(DISTINCT (l_quantity)) (1:55) >
- LocalExchange[HASH] (l_suppkey) => [l_suppkey:bigint, l_quantity:double, l_discount:double] >
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?} >
- RemoteStreamingExchange[REPARTITION] => [l_suppkey:bigint, l_quantity:double, l_discount:double] >
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?} >
- TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.lineitem{}]'}] => [l_s>
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00} >
LAYOUT: tpch.lineitem{} >
l_suppkey := l_suppkey:bigint:2:REGULAR (1:87) >
l_discount := l_discount:double:6:REGULAR (1:87) >
l_quantity := l_quantity:double:4:REGULAR (1:87) >
>
(1 row)
WARNING: COUNT(DISTINCT xxx) can be a very expensive operation when the cardinality is high for xxx. In most scenarios, using approx_distinct instead would be enough
presto:tpch>
Yup, this is what I meant.
Might be related to https://github.com/trinodb/trino/issues/15106, cc @lukasz-stec
@lukasz-stec can we close it?
Yes, it's fixed by https://github.com/trinodb/trino/pull/15927