Adaptive planning framework in FTE

Open gaurav8297 opened this issue 1 year ago • 5 comments

Description

Adaptive planning is part of FTE (fault-tolerant execution), wherein the engine can modify the plan at runtime based on exchange-level statistics, for example to reorder joins or mitigate skew. It can significantly improve cost and performance when the plan chosen by the static optimizer is not the best one because statistics were underestimated or missing.

Included in PR:

  • Optimizer framework for Adaptive Planning.
  • Migrated the first rule for adaptive partitioning to the Adaptive Planning framework.

High Level Design

Available Runtime Stats (at exchange level)

  • Row count
  • The output data size for all partitions

RemoteSourceNode Stats Rule:

  • A new stats rule that takes estimated stats from source stages and updates the rowCount and averageRowSize based on runtime statistics if available.
  • This rule is needed during replanning for stats calculation.
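
As a rough illustration of the rule above, the following Java sketch overrides an estimated rowCount and averageRowSize with runtime values when they are available. All class and method names are hypothetical and are not Trino's actual API. Deriving the average row size as output data size divided by row count is also an assumption made for this sketch.

```java
import java.util.Optional;

// Illustrative only: every name below is hypothetical, not Trino's API.
final class RemoteSourceStatsSketch {
    // Static estimate taken from the source stage's plan.
    record Estimate(double rowCount, double averageRowSize) {}

    // Exchange-level runtime statistics: row count and total output size.
    record RuntimeStats(long rowCount, long outputDataSizeInBytes) {}

    // Returns the estimate unchanged when no runtime statistics exist;
    // otherwise replaces rowCount and recomputes averageRowSize.
    static Estimate adjust(Estimate estimated, Optional<RuntimeStats> runtime) {
        if (runtime.isEmpty()) {
            return estimated;
        }
        RuntimeStats stats = runtime.get();
        double rowCount = stats.rowCount();
        // Assumption: with zero rows there is nothing to average, so the
        // estimated row size is kept.
        double averageRowSize = rowCount > 0
                ? stats.outputDataSizeInBytes() / rowCount
                : estimated.averageRowSize();
        return new Estimate(rowCount, averageRowSize);
    }
}
```

In this sketch a stage without runtime statistics keeps its static estimate, so the same function can be applied to finished and unfinished stages alike.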

Replanning Steps:

  • Replanning is triggered whenever a stage finishes and the FTE scheduler has runtime statistics available for that stage.
  • At replanning, the engine first merges all SubPlans into a single PlanNode, where each RemoteSourceNode for a stage that has not finished is replaced with a remote exchange. A RemoteSourceNode for a finished stage remains in the plan as it is.
  • Once we have a merged plan that contains all the unfinished parts, we reoptimize it using a set of PlanOptimizers.
  • During reoptimization, new exchanges may need to be added because of a change in partitioning strategy, for instance if a rule changes the distribution type of a join from BROADCAST to PARTITIONED.
  • Some remote exchanges may also be removed, for example when the join order changes.
  • Ultimately, the planner will fragment the optimized PlanNode again and generate the SubPlans with new PlanFragmentIds. The re-fragmentation will only happen if the old plan and the new plan have some differences. To check these differences, we rely on PlanOptimizer#optimizeAndReturnChanges API which also returns changes in plan ids.
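
The merge step described in the list above can be sketched in Java as follows. This is a simplified model under stated assumptions, not the code in the PR: the Node types, the `merge` method, and the single-source RemoteSource are hypothetical stand-ins for Trino's plan nodes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only: a toy plan model, not Trino's PlanNode hierarchy.
final class MergeSubPlansSketch {
    interface Node {}

    // Reads the output of another fragment (assumed here to have one source).
    record RemoteSource(int sourceFragmentId) implements Node {}

    // Exchange placed inline in a merged plan.
    record RemoteExchange(Node source) implements Node {}

    // Any other operator, identified by name only.
    record Operator(String name, List<Node> children) implements Node {}

    // Inlines unfinished fragments behind remote exchanges and leaves
    // remote sources of finished fragments untouched.
    static Node merge(Node node, Map<Integer, Node> fragmentRoots, Set<Integer> finished) {
        if (node instanceof RemoteSource remote) {
            int id = remote.sourceFragmentId();
            if (finished.contains(id)) {
                return remote;
            }
            return new RemoteExchange(merge(fragmentRoots.get(id), fragmentRoots, finished));
        }
        if (node instanceof RemoteExchange exchange) {
            return new RemoteExchange(merge(exchange.source(), fragmentRoots, finished));
        }
        Operator operator = (Operator) node;
        List<Node> children = new ArrayList<>();
        for (Node child : operator.children()) {
            children.add(merge(child, fragmentRoots, finished));
        }
        return new Operator(operator.name(), children);
    }
}
```

The result is a single tree containing only unfinished work plus references to finished stages, which is the input the reoptimization step needs.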

Note: We do not change the ids of fragments that have no changes and are not downstream of the changed plan nodes. This optimization is done to avoid unnecessary stage restarts due to speculative execution.
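
One way to picture which fragments receive new ids is the Java sketch below. It assumes that "downstream" means fragments that consume, directly or transitively, the output of a changed fragment. The class and method names are hypothetical and do not come from the PR.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative only: hypothetical helper, not part of Trino.
final class FragmentIdReuseSketch {
    // sources maps a fragment id to the ids of the fragments it reads from.
    // Returns the changed fragments plus every fragment that consumes their
    // output directly or transitively; all other fragments keep their ids.
    static Set<Integer> fragmentsNeedingNewIds(Map<Integer, List<Integer>> sources, Set<Integer> changed) {
        Set<Integer> result = new TreeSet<>(changed);
        boolean grew = true;
        // Repeat until no further consumer of an affected fragment is found.
        while (grew) {
            grew = false;
            for (Map.Entry<Integer, List<Integer>> entry : sources.entrySet()) {
                boolean readsAffected = entry.getValue().stream().anyMatch(result::contains);
                if (readsAffected && result.add(entry.getKey())) {
                    grew = true;
                }
            }
        }
        return result;
    }
}
```

For a root fragment 0 that reads from fragments 1 and 2, which in turn read from 3 and 4, a change in fragment 1 yields {0, 1}, so fragments 2, 3, and 4 keep their ids in this model.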

Explain Analyze:

  • If any adaptive optimizations are triggered, the output of EXPLAIN ANALYZE shows both the old and the new plans.

An example of adaptive partitioning, where the partition count changes from 50 to 1000.

Fragment 7 [HASH]                                                                                                                        >
     CPU: 22.49m, Scheduled: 49.02m, Blocked 2.87h (Input: 39.99m, Output: 0.00ns), Input: 7499989709 rows (62.86GB); per task: avg.: 5999>
     Output layout: [count_5]                                                                                                             >
     Output partitioning: SINGLE []                                                                                                       >
     Input partition count: 1000                                                                                                          >
     Aggregate[type = PARTIAL]                                                                                                            >
     │   Layout: [count_5:bigint]                                                                                                         >
     │   CPU: 8.63s (0.35%), Scheduled: 18.02s (0.27%), Blocked: 0.00ns (0.00%), Output: 1000 rows (8.79kB)                               >
     │   Input avg.: 5999989.71 rows, Input std.dev.: 22.01%                                                                              >
     │   count_5 := count(*)                                                                                                              >
     └─ InnerJoin[criteria = ("orderkey" = "orderkey_0"), distribution = PARTITIONED]                                                     >
        │   Layout: []                                                                                                                    >
        │   CPU: 19.39m (47.06%), Scheduled: 41.60m (37.94%), Blocked: 1.57h (58.30%), Output: 5999989709 rows (0B)                       >
        │   Left (probe) Input avg.: 5999989.71 rows, Input std.dev.: 22.01%                                                              >
        │   Right (build) Input avg.: 1500000.00 rows, Input std.dev.: 0.08%                                                              >
        │   Distribution: PARTITIONED                                                                                                     >
        ├─ AdaptivePlan[]                                                                                                                 >
        │  │   Layout: [orderkey:bigint]                                                                                                  >
        │  ├─ [Initial Plan] RemoteSource[sourceFragmentIds = [3]]                                                                        >
        │  │      Layout: [orderkey:bigint]                                                                                               >
        │  │      CPU: 6.45m (15.67%), Scheduled: 16.97m (15.48%), Blocked: 2.53m (1.57%), Output: 5999989709 rows (50.29GB)              >
        │  │      Input avg.: 14999974.27 rows, Input std.dev.: 12.86%                                                                    >
        │  └─ [Current Plan] RemoteSource[sourceFragmentIds = [8]]                                                                        >
        │         Layout: [orderkey:bigint]                                                                                               >
        │         CPU: 49.21s (1.99%), Scheduled: 2.31m (2.11%), Blocked: 20.45m (12.67%), Output: 5999989709 rows (50.29GB)              >
        │         Input avg.: 5999989.71 rows, Input std.dev.: 22.01%                                                                     >
        └─ LocalExchange[partitioning = HASH, arguments = ["orderkey_0"]]                                                                 >
           │   Layout: [orderkey_0:bigint]                                                                                                >
           │   CPU: 57.05s (2.31%), Scheduled: 1.93m (1.76%), Blocked: 23.04m (14.28%), Output: 1500000000 rows (12.57GB)                 >
           │   Input avg.: 1500000.00 rows, Input std.dev.: 39.40%                                                                        >
           └─ AdaptivePlan[]                                                                                                              >
              │   Layout: [orderkey_0:bigint]                                                                                             >
              ├─ [Initial Plan] RemoteSource[sourceFragmentIds = [4]]                                                                     >
              │      Layout: [orderkey_0:bigint]                                                                                          >
              │      CPU: 2.14m (5.19%), Scheduled: 5.36m (4.89%), Blocked: 1.75m (1.08%), Output: 1500000000 rows (12.57GB)              >
              │      Input avg.: 7500000.00 rows, Input std.dev.: 17.35%                                                                  >
              └─ [Current Plan] RemoteSource[sourceFragmentIds = [9]]                                                                     >
                     Layout: [orderkey_0:bigint]                                                                                          >
                     CPU: 1.18m (2.87%), Scheduled: 2.72m (2.48%), Blocked: 19.54m (12.11%), Output: 1500000000 rows (12.57GB)            >
                     Input avg.: 1500000.00 rows, Input std.dev.: 39.40%

Fragment 8 [HASH]                                                                                                                        >
     CPU: 6.46m, Scheduled: 16.97m, Blocked 2.53m (Input: 2.53m, Output: 0.00ns), Input: 5999989709 rows (50.29GB); per task: avg.: 119999>
     Output layout: [orderkey]                                                                                                            >
     Output partitioning: HASH [orderkey]                                                                                                 >
     Output partition count: 1000                                                                                                         >
     RemoteSource[sourceFragmentIds = [3]]                                                                                                >
         Layout: [orderkey:bigint]                                                                                                        >
         CPU: 6.45m (15.67%), Scheduled: 16.97m (15.48%), Blocked: 2.53m (1.57%), Output: 5999989709 rows (50.29GB)                       >
         Input avg.: 14999974.27 rows, Input std.dev.: 12.86%                                                                             >

Fragment 3 [SOURCE]                                                                                      >
     CPU: 8.12m, Scheduled: 30.37m, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 5999989709 rows (50.29GB); per task: avg.: 2542>
     Amount of input data processed by the workers for this stage might be skewed                                                         >
     Output layout: [orderkey]                                                                                                            >
     Output partitioning: HASH [orderkey]                                                                                                 >
     ScanFilter[table = hive:tpch_sf1000_orc:lineitem, dynamicFilters = {"orderkey" = #df_375}]                                           >
         Layout: [orderkey:bigint]                                                                                                        >
         Estimates: {rows: 5999989709 (50.29GB), cpu: 50.29G, memory: 0B, network: 0B}/{rows: 5999989709 (50.29GB), cpu: 50.29G, memory: 0>
         CPU: 8.12m (19.71%), Scheduled: 30.37m (27.70%), Blocked: 0.00ns (0.00%), Output: 5999989709 rows (50.29GB)                      >
         Input avg.: 2111185.68 rows, Input std.dev.: 22.98%                                                                              >
         orderkey := orderkey:bigint:REGULAR                                                                                              >
         Input: 5999989709 rows (50.29GB), Filtered: 0.00%, Physical input: 4.96GB, Physical input time: 21.81m                           >
         Dynamic filters:                                                                                                                 >
             - df_375, ALL, collection time=276.25ms

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

gaurav8297 avatar Jan 04 '24 08:01 gaurav8297

I left some comments, but I am a bit lost on the main logic.

It became a bit clearer after I read the PR description (I should have done that at the beginning, I guess). Will get back to reading in a while.

losipiuk avatar Jan 16 '24 13:01 losipiuk

Taking a look at failed CI

gaurav8297 avatar Jan 29 '24 10:01 gaurav8297

@gaurav8297 Could you extract prefix commits as separate PRs? They should land much quicker

sopel39 avatar Jan 29 '24 10:01 sopel39

CI Issue: https://github.com/trinodb/trino/issues/16315

gaurav8297 avatar Jan 31 '24 10:01 gaurav8297

A bit more complex example of AdaptivePlanNode:

Query:

SELECT n1.* FROM nation n1
RIGHT JOIN
(SELECT n.nationkey FROM (SELECT * FROM lineitem WHERE suppkey BETWEEN 20 AND 30) l LEFT JOIN nation n ON l.suppkey = n.nationkey) n2
ON n1.nationkey = n2.nationkey + 1

Trino version: <unknown>
Fragment 5 [HASH]
    Output layout: [nationkey, name, regionkey, comment]
    Output partitioning: SINGLE []
    Input partition count: 10
    Output[columnNames = [nationkey, name, regionkey, comment]]
    │   Layout: [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)]
    └─ RightJoin[criteria = ("nationkey" = "expr"), distribution = PARTITIONED]
       │   Layout: [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)]
       │   Distribution: PARTITIONED
       ├─ AdaptivePlan[]
       │  │   Layout: [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)]
       │  ├─ [Initial Plan] RemoteSource[sourceFragmentIds = [1]]
       │  │      Layout: [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)]
       │  └─ [Current Plan] RemoteSource[sourceFragmentIds = [6]]
       │         Layout: [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)]
       └─ LocalExchange[partitioning = SINGLE]
          │   Layout: [expr:bigint]
          └─ AdaptivePlan[]
             │   Layout: [expr:bigint]
             ├─ [Initial Plan] RemoteExchange[partitionCount = 1, type = REPARTITION]
             │  │   Layout: [expr:bigint]
             │  └─ DynamicFilterSource[dynamicFilterAssignments = {expr -> #df_649}]
             │     │   Layout: [expr:bigint]
             │     └─ Project[]
             │        │   Layout: [expr:bigint]
             │        │   expr := ("nationkey_3" + BIGINT '1')
             │        └─ LeftJoin[criteria = ("suppkey" = "nationkey_3"), distribution = PARTITIONED]
             │           │   Layout: [nationkey_3:bigint]
             │           │   Distribution: PARTITIONED
             │           ├─ RemoteSource[sourceFragmentIds = [3]]
             │           │      Layout: [suppkey:bigint]
             │           └─ LocalExchange[partitioning = SINGLE]
             │              │   Layout: [nationkey_3:bigint]
             │              └─ RemoteSource[sourceFragmentIds = [4]]
             │                     Layout: [nationkey_3:bigint]
             └─ [Current Plan] RemoteSource[sourceFragmentIds = [7]]
                    Layout: [expr:bigint]

Fragment 6 [HASH]
    Output layout: [nationkey, name, regionkey, comment]
    Output partitioning: HASH [nationkey]
    Output partition count: 10
    Input partition count: 1
    RemoteSource[sourceFragmentIds = [1]]
        Layout: [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)]

Fragment 1 [SOURCE]
    Output layout: [nationkey, name, regionkey, comment]
    Output partitioning: HASH [nationkey]
    Output partition count: 1
    ScanFilter[table = test_catalog:tiny:nation, dynamicFilters = {"nationkey" = #df_649}]
        Layout: [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)]
        Estimates: {rows: 25 (2.67kB), cpu: 2.67k, memory: 0B, network: 0B}/{rows: 25 (2.67kB), cpu: 2.67k, memory: 0B, network: 0B}
        nationkey := tpch:nationkey
        regionkey := tpch:regionkey
        name := tpch:name
        comment := tpch:comment

Fragment 7 [HASH]
    Output layout: [expr]
    Output partitioning: HASH [expr]
    Output partition count: 10
    Input partition count: 10
    DynamicFilterSource[dynamicFilterAssignments = {expr -> #df_649}]
    │   Layout: [expr:bigint]
    └─ Project[]
       │   Layout: [expr:bigint]
       │   expr := ("nationkey_3" + BIGINT '1')
       └─ LeftJoin[criteria = ("suppkey" = "nationkey_3"), distribution = PARTITIONED]
          │   Layout: [nationkey_3:bigint]
          │   Distribution: PARTITIONED
          ├─ RemoteSource[sourceFragmentIds = [8]]
          │      Layout: [suppkey:bigint]
          └─ LocalExchange[partitioning = SINGLE]
             │   Layout: [nationkey_3:bigint]
             └─ RemoteSource[sourceFragmentIds = [9]]
                    Layout: [nationkey_3:bigint]

Fragment 8 [HASH]
    Output layout: [suppkey]
    Output partitioning: HASH [suppkey]
    Output partition count: 10
    Input partition count: 1
    RemoteSource[sourceFragmentIds = [3]]
        Layout: [suppkey:bigint]

Fragment 3 [test_catalog:orders:15000]
    Output layout: [suppkey]
    Output partitioning: HASH [suppkey]
    Output partition count: 1
    ScanFilter[table = test_catalog:tiny:lineitem, filterPredicate = ("suppkey" BETWEEN BIGINT '20' AND BIGINT '30')]
        Layout: [suppkey:bigint]
        Estimates: {rows: 60175 (528.88kB), cpu: 528.88k, memory: 0B, network: 0B}/{rows: 6078 (53.42kB), cpu: 528.88k, memory: 0B, network: 0B}
        suppkey := tpch:suppkey

Fragment 9 [HASH]
    Output layout: [nationkey_3]
    Output partitioning: HASH [nationkey_3]
    Output partition count: 10
    Input partition count: 1
    RemoteSource[sourceFragmentIds = [4]]
        Layout: [nationkey_3:bigint]

Fragment 4 [SOURCE]
    Output layout: [nationkey_3]
    Output partitioning: HASH [nationkey_3]
    Output partition count: 1
    ScanFilter[table = test_catalog:tiny:nation, filterPredicate = ("nationkey_3" BETWEEN BIGINT '20' AND BIGINT '30')]
        Layout: [nationkey_3:bigint]
        Estimates: {rows: 25 (225B), cpu: 225, memory: 0B, network: 0B}/{rows: 4 (37B), cpu: 225, memory: 0B, network: 0B}
        nationkey_3 := tpch:nationkey

Due to the AdaptivePartitioning rule, the partition count changed from 1 to 10. Take a look at the AdaptivePlanNode in Fragment 5.

gaurav8297 avatar Feb 01 '24 09:02 gaurav8297

Please do an initial rebase and provide changes as fixups to make follow-up review easier.

losipiuk avatar Feb 26 '24 12:02 losipiuk

LGTM. Thanks

losipiuk avatar Mar 12 '24 11:03 losipiuk

Does this need a release note? Or is it not user-impacting at the moment, since it is just a framework?

colebow avatar Mar 13 '24 21:03 colebow

It does not, I think. It refactors a feature that was hand-coded so that it uses the new framework added by this PR, but from the user's perspective it is mostly not visible.

losipiuk avatar Mar 14 '24 13:03 losipiuk