Switch between distinct aggregation, mark distinct operator and OptimizeMixedDistinctAggregations automatically
Decide automatically (perhaps using CBO) which way of computing distinct aggregation to use. Potential rules:
- use mark distinct when the number of distinct aggregations is low (e.g. 1 or 2). Using mark distinct increases data shuffling across the cluster.
- use OptimizeMixedDistinctAggregations when the number of distinct aggregations is high. OptimizeMixedDistinctAggregations limits the number of intermediate stages and also allows partial aggregation to be evaluated (see the sketch after this list).
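For illustration, here is a rough hand-written SQL equivalent of the grouping-sets idea behind OptimizeMixedDistinctAggregations, using the lineitem query from the plans below. This is only a sketch of the rewrite's intent, not the exact shape the optimizer rule produces, and the column aliases are made up for readability.

-- Sketch only: deduplicate each distinct column per group key with a single
-- GROUPING SETS aggregation, then count the deduplicated values. The inner
-- aggregation can be evaluated partially on each node before one repartition
-- on l_suppkey.
SELECT l_suppkey,
       count(l_discount) AS distinct_discounts,   -- l_discount is NULL on the other grouping set
       count(l_quantity) AS distinct_quantities   -- l_quantity is NULL on the other grouping set
FROM (
    SELECT l_suppkey, l_discount, l_quantity
    FROM lineitem
    GROUP BY GROUPING SETS ((l_suppkey, l_discount), (l_suppkey, l_quantity))
)
GROUP BY l_suppkey;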
cc @lukasz-stec
also allows for partial aggregation to be evaluated.
I believe partial aggregation supports MarkDistinct too
It does, but the problem is when you have multiple distinct aggregations, e.g. aggr(distinct a), aggr(distinct b), ... Then for each one you have to create a MarkDistinctNode, shuffling the data across the cluster.
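As a conceptual illustration only (this is not the actual MarkDistinct operator, and unlike count(DISTINCT ...) it would treat NULL as a countable value): each MarkDistinct essentially tags the first occurrence of every (group key, distinct column) pair, and each such marker needs its own repartition on that pair.

-- Sketch only: one marker per distinct column; each window partitioning
-- mirrors the extra repartition that each MarkDistinct node introduces.
SELECT l_suppkey,
       count_if(discount_is_first) AS distinct_discounts,
       count_if(quantity_is_first) AS distinct_quantities
FROM (
    SELECT l_suppkey, l_discount, l_quantity,
           row_number() OVER (PARTITION BY l_suppkey, l_discount) = 1 AS discount_is_first,
           row_number() OVER (PARTITION BY l_suppkey, l_quantity) = 1 AS quantity_is_first
    FROM lineitem
)
GROUP BY l_suppkey;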
Below are the different plans generated when MarkDistinct is enabled vs disabled.
presto:tpch> explain select l_suppkey, count(distinct l_discount), count(distinct l_quantity) from lineitem group by l_suppkey;
Query Plan >
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
- Output[l_suppkey, _col1, _col2] => [l_suppkey:bigint, count:bigint, count_3:bigint] >
_col1 := count (1:27) >
_col2 := count_3 (1:55) >
- RemoteStreamingExchange[GATHER] => [l_suppkey:bigint, count_3:bigint, count:bigint] >
- Aggregate(FINAL)[l_suppkey] => [l_suppkey:bigint, count_3:bigint, count:bigint] >
count_3 := "presto.default.count"((count_15)) (1:55) >
count := "presto.default.count"((count_16)) (1:27) >
- LocalExchange[HASH] (l_suppkey) => [l_suppkey:bigint, count_15:bigint, count_16:bigint] >
- RemoteStreamingExchange[REPARTITION] => [l_suppkey:bigint, count_15:bigint, count_16:bigint] >
- Aggregate(PARTIAL)[l_suppkey] => [l_suppkey:bigint, count_15:bigint, count_16:bigint] >
count_15 := "presto.default.count"((l_quantity)) (mask = l_quantity$distinct) (1:55) >
count_16 := "presto.default.count"((l_discount)) (mask = l_discount$distinct) (1:27) >
- MarkDistinct[distinct=l_suppkey:bigint, l_quantity:double marker=l_quantity$distinct] => [l_suppkey:bigint, l_quantity:double, l_discount:double, l_discount$distinct:boolean, l_quantity$distinct>
- LocalExchange[HASH] (l_suppkey, l_quantity) => [l_suppkey:bigint, l_quantity:double, l_discount:double, l_discount$distinct:boolean] >
- RemoteStreamingExchange[REPARTITION] => [l_suppkey:bigint, l_quantity:double, l_discount:double, l_discount$distinct:boolean] >
- MarkDistinct[distinct=l_suppkey:bigint, l_discount:double marker=l_discount$distinct] => [l_suppkey:bigint, l_quantity:double, l_discount:double, l_discount$distinct:boolean] >
- LocalExchange[HASH] (l_suppkey, l_discount) => [l_suppkey:bigint, l_quantity:double, l_discount:double] >
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?} >
- RemoteStreamingExchange[REPARTITION] => [l_suppkey:bigint, l_quantity:double, l_discount:double] >
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?} >
- TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional>
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00} >
LAYOUT: tpch.lineitem{} >
l_suppkey := l_suppkey:bigint:2:REGULAR (1:87) >
l_discount := l_discount:double:6:REGULAR (1:87) >
l_quantity := l_quantity:double:4:REGULAR (1:87) >
>
(1 row)
WARNING: COUNT(DISTINCT xxx) can be a very expensive operation when the cardinality is high for xxx. In most scenarios, using approx_distinct instead would be enough
presto:tpch> set session use_mark_distinct = false;
SET SESSION
presto:tpch> explain select l_suppkey, count(distinct l_discount), count(distinct l_quantity) from lineitem group by l_suppkey;
Query Plan >
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
- Output[l_suppkey, _col1, _col2] => [l_suppkey:bigint, count:bigint, count_3:bigint] >
_col1 := count (1:27) >
_col2 := count_3 (1:55) >
- RemoteStreamingExchange[GATHER] => [l_suppkey:bigint, count:bigint, count_3:bigint] >
- Aggregate[l_suppkey] => [l_suppkey:bigint, count:bigint, count_3:bigint] >
count := "presto.default.count"(DISTINCT (l_discount)) (1:27) >
count_3 := "presto.default.count"(DISTINCT (l_quantity)) (1:55) >
- LocalExchange[HASH] (l_suppkey) => [l_suppkey:bigint, l_quantity:double, l_discount:double] >
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?} >
- RemoteStreamingExchange[REPARTITION] => [l_suppkey:bigint, l_quantity:double, l_discount:double] >
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?} >
- TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.lineitem{}]'}] => [l_s>
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00} >
LAYOUT: tpch.lineitem{} >
l_suppkey := l_suppkey:bigint:2:REGULAR (1:87) >
l_discount := l_discount:double:6:REGULAR (1:87) >
l_quantity := l_quantity:double:4:REGULAR (1:87) >
>
(1 row)
WARNING: COUNT(DISTINCT xxx) can be a very expensive operation when the cardinality is high for xxx. In most scenarios, using approx_distinct instead would be enough
presto:tpch>
Yup. This is what I meant
Might be related to https://github.com/trinodb/trino/issues/15106, cc @lukasz-stec
@lukasz-stec can we close it?
Yes, it's fixed by https://github.com/trinodb/trino/pull/15927