Switch between distinct aggregation, mark distinct operator and OptimizeMixedDistinctAggregations automatically

Open sopel39 opened this issue 2 years ago • 3 comments

Decide automatically (perhaps using CBO) which way of computing distinct aggregation to use. Potential rules:

  • use mark distinct when the number of distinct aggregations is low (e.g. 1 or 2). Using mark distinct increases data shuffling across the cluster.
  • use OptimizeMixedDistinctAggregations when the number of distinct aggregations is high. OptimizeMixedDistinctAggregations will limit the number of intermediate stages and also allows for partial aggregation to be evaluated.
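
A minimal sketch of the count-based rule proposed above, written in Python for illustration only. The names `DistinctStrategy`, `choose_distinct_strategy` and the cutoff constant are hypothetical and do not come from the Trino code base; the cutoff of 2 is just the "e.g. 1 or 2" from the proposal, and a CBO-driven version would presumably compare estimated costs instead of a fixed count.

```python
from enum import Enum


class DistinctStrategy(Enum):
    """Hypothetical labels for the two strategies discussed in this issue."""
    MARK_DISTINCT = "mark_distinct"
    OPTIMIZE_MIXED = "optimize_mixed_distinct_aggregations"


# Assumed cutoff, taken from the "e.g. 1 or 2" wording of the proposal.
MAX_DISTINCT_AGGS_FOR_MARK_DISTINCT = 2


def choose_distinct_strategy(num_distinct_aggregations: int) -> DistinctStrategy:
    """Pick a strategy from the number of distinct aggregations in the query."""
    if num_distinct_aggregations < 1:
        raise ValueError("expected at least one distinct aggregation")
    if num_distinct_aggregations <= MAX_DISTINCT_AGGS_FOR_MARK_DISTINCT:
        # Few distinct aggregations: the extra shuffles stay bounded.
        return DistinctStrategy.MARK_DISTINCT
    # Many distinct aggregations: avoid one extra shuffle per aggregation.
    return DistinctStrategy.OPTIMIZE_MIXED
```

A fixed threshold keeps the rule easy to reason about, but it ignores data size and cardinality, which is why the proposal mentions the CBO.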

sopel39 avatar Sep 20 '22 12:09 sopel39

cc @lukasz-stec

sopel39 avatar Sep 20 '22 12:09 sopel39

also allows for partial aggregation to be evaluated.

I believe partial aggregation supports MarkDistinct too

lukasz-stec avatar Sep 21 '22 08:09 lukasz-stec

I believe partial aggregation supports MarkDistinct too

It does, but the problem arises when you have multiple distinct aggregations, e.g. aggr(distinct a), aggr(distinct b), ... Then for each one you have to create a MarkDistinctNode, which shuffles the data across the cluster.
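
The shuffle growth can be sketched with a small Python model. It is not planner code: the function names are hypothetical, and it assumes each MarkDistinctNode needs its own remote repartition on the grouping keys plus that aggregation's distinct column, followed by one more repartition on the grouping keys for the final aggregation.

```python
def mark_distinct_repartitions(group_keys, distinct_columns):
    """Partitioning keys of each remote repartition in a MarkDistinct-style plan.

    Assumed model: one repartition per distinct column (group keys + column),
    then one repartition on the group keys before the final aggregation.
    """
    exchanges = [tuple(group_keys) + (column,) for column in distinct_columns]
    exchanges.append(tuple(group_keys))
    return exchanges


def single_step_repartitions(group_keys):
    """Without MarkDistinct, the data is repartitioned once on the group keys."""
    return [tuple(group_keys)]
```

For `group by l_suppkey` with `count(distinct l_discount)` and `count(distinct l_quantity)`, the model gives three repartitions with MarkDistinct and one without, and every additional distinct aggregation adds one more.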

sopel39 avatar Sep 21 '22 08:09 sopel39

Below are the different plans generated when MarkDistinct is enabled vs disabled.

presto:tpch> explain select l_suppkey, count(distinct l_discount), count(distinct l_quantity) from lineitem group by l_suppkey;
                                                                                                                                                  Query Plan                                                                 >
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 - Output[l_suppkey, _col1, _col2] => [l_suppkey:bigint, count:bigint, count_3:bigint]                                                                                                                                       >
         _col1 := count (1:27)                                                                                                                                                                                               >
         _col2 := count_3 (1:55)                                                                                                                                                                                             >
     - RemoteStreamingExchange[GATHER] => [l_suppkey:bigint, count_3:bigint, count:bigint]                                                                                                                                   >
         - Aggregate(FINAL)[l_suppkey] => [l_suppkey:bigint, count_3:bigint, count:bigint]                                                                                                                                   >
                 count_3 := "presto.default.count"((count_15)) (1:55)                                                                                                                                                        >
                 count := "presto.default.count"((count_16)) (1:27)                                                                                                                                                          >
             - LocalExchange[HASH] (l_suppkey) => [l_suppkey:bigint, count_15:bigint, count_16:bigint]                                                                                                                       >
                 - RemoteStreamingExchange[REPARTITION] => [l_suppkey:bigint, count_15:bigint, count_16:bigint]                                                                                                              >
                     - Aggregate(PARTIAL)[l_suppkey] => [l_suppkey:bigint, count_15:bigint, count_16:bigint]                                                                                                                 >
                             count_15 := "presto.default.count"((l_quantity)) (mask = l_quantity$distinct) (1:55)                                                                                                            >
                             count_16 := "presto.default.count"((l_discount)) (mask = l_discount$distinct) (1:27)                                                                                                            >
                         - MarkDistinct[distinct=l_suppkey:bigint, l_quantity:double marker=l_quantity$distinct] => [l_suppkey:bigint, l_quantity:double, l_discount:double, l_discount$distinct:boolean, l_quantity$distinct>
                             - LocalExchange[HASH] (l_suppkey, l_quantity) => [l_suppkey:bigint, l_quantity:double, l_discount:double, l_discount$distinct:boolean]                                                          >
                                 - RemoteStreamingExchange[REPARTITION] => [l_suppkey:bigint, l_quantity:double, l_discount:double, l_discount$distinct:boolean]                                                             >
                                     - MarkDistinct[distinct=l_suppkey:bigint, l_discount:double marker=l_discount$distinct] => [l_suppkey:bigint, l_quantity:double, l_discount:double, l_discount$distinct:boolean]        >
                                         - LocalExchange[HASH] (l_suppkey, l_discount) => [l_suppkey:bigint, l_quantity:double, l_discount:double]                                                                           >
                                                 Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}                                                                                                                  >
                                             - RemoteStreamingExchange[REPARTITION] => [l_suppkey:bigint, l_quantity:double, l_discount:double]                                                                              >
                                                     Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}                                                                                                              >
                                                 - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional>
                                                         Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}                                                                                                       >
                                                         LAYOUT: tpch.lineitem{}                                                                                                                                             >
                                                         l_suppkey := l_suppkey:bigint:2:REGULAR (1:87)                                                                                                                      >
                                                         l_discount := l_discount:double:6:REGULAR (1:87)                                                                                                                    >
                                                         l_quantity := l_quantity:double:4:REGULAR (1:87)                                                                                                                    >
                                                                                                                                                                                                                             >
(1 row)

WARNING: COUNT(DISTINCT xxx) can be a very expensive operation when the cardinality is high for xxx. In most scenarios, using approx_distinct instead would be enough

presto:tpch> set session use_mark_distinct = false;
SET SESSION
presto:tpch> explain select l_suppkey, count(distinct l_discount), count(distinct l_quantity) from lineitem group by l_suppkey;
                                                                                                                                    Query Plan                                                                               >
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 - Output[l_suppkey, _col1, _col2] => [l_suppkey:bigint, count:bigint, count_3:bigint]                                                                                                                                       >
         _col1 := count (1:27)                                                                                                                                                                                               >
         _col2 := count_3 (1:55)                                                                                                                                                                                             >
     - RemoteStreamingExchange[GATHER] => [l_suppkey:bigint, count:bigint, count_3:bigint]                                                                                                                                   >
         - Aggregate[l_suppkey] => [l_suppkey:bigint, count:bigint, count_3:bigint]                                                                                                                                          >
                 count := "presto.default.count"(DISTINCT (l_discount)) (1:27)                                                                                                                                               >
                 count_3 := "presto.default.count"(DISTINCT (l_quantity)) (1:55)                                                                                                                                             >
             - LocalExchange[HASH] (l_suppkey) => [l_suppkey:bigint, l_quantity:double, l_discount:double]                                                                                                                   >
                     Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}                                                                                                                                              >
                 - RemoteStreamingExchange[REPARTITION] => [l_suppkey:bigint, l_quantity:double, l_discount:double]                                                                                                          >
                         Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}                                                                                                                                          >
                     - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.lineitem{}]'}] => [l_s>
                             Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}                                                                                                                                   >
                             LAYOUT: tpch.lineitem{}                                                                                                                                                                         >
                             l_suppkey := l_suppkey:bigint:2:REGULAR (1:87)                                                                                                                                                  >
                             l_discount := l_discount:double:6:REGULAR (1:87)                                                                                                                                                >
                             l_quantity := l_quantity:double:4:REGULAR (1:87)                                                                                                                                                >
                                                                                                                                                                                                                             >
(1 row)

WARNING: COUNT(DISTINCT xxx) can be a very expensive operation when the cardinality is high for xxx. In most scenarios, using approx_distinct instead would be enough

presto:tpch>
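
To make the `mask = l_discount$distinct` lines in the first plan concrete, here is a rough Python emulation of what MarkDistinct followed by a masked count computes. It is a single-process sketch with hypothetical helper names and made-up sample rows; it ignores partitioning entirely and only illustrates the semantics, not the engine's implementation.

```python
def mark_distinct(rows, key_columns, marker):
    """Return copies of rows with marker=True on the first occurrence of each key."""
    seen = set()
    marked = []
    for row in rows:
        key = tuple(row[column] for column in key_columns)
        marked.append({**row, marker: key not in seen})
        seen.add(key)
    return marked


def masked_count(rows, group_column, value_column, mask):
    """Per-group count of non-NULL values, counting only rows where mask is True."""
    counts = {}
    for row in rows:
        group = row[group_column]
        counts.setdefault(group, 0)
        if row[mask] and row[value_column] is not None:
            counts[group] += 1
    return counts


# Made-up sample rows, shaped like the lineitem columns used in the query.
rows = [
    {"l_suppkey": 1, "l_discount": 0.1, "l_quantity": 5.0},
    {"l_suppkey": 1, "l_discount": 0.1, "l_quantity": 6.0},
    {"l_suppkey": 2, "l_discount": 0.2, "l_quantity": 5.0},
]

# One marking pass per distinct aggregation, mirroring the two MarkDistinct nodes.
marked = mark_distinct(rows, ["l_suppkey", "l_discount"], "l_discount$distinct")
marked = mark_distinct(marked, ["l_suppkey", "l_quantity"], "l_quantity$distinct")

discount_counts = masked_count(marked, "l_suppkey", "l_discount", "l_discount$distinct")
quantity_counts = masked_count(marked, "l_suppkey", "l_quantity", "l_quantity$distinct")
```

Because each marker is true only for the first row of a given (group key, value) pair, the masked counts equal `count(distinct ...)` per group. In the distributed plan each marking pass needs rows with the same key on the same node, which is where the extra repartitions come from.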

shiyu-bytedance avatar Nov 08 '22 19:11 shiyu-bytedance

Yup. This is what I meant

sopel39 avatar Nov 14 '22 10:11 sopel39

Might be related to https://github.com/trinodb/trino/issues/15106, cc @lukasz-stec

sopel39 avatar Jan 11 '23 12:01 sopel39

@lukasz-stec can we close it?

sopel39 avatar Mar 17 '23 10:03 sopel39

Yes, it's fixed by https://github.com/trinodb/trino/pull/15927

lukasz-stec avatar Mar 17 '23 12:03 lukasz-stec