[multistage] [debugability] Support explain distributed stage plan and add a test for explicit distinct

Open 61yao opened this issue 3 years ago • 1 comments

Extend the current explain query visitor so that it can also explain the distributed stage plan, and add a test for an explicit DISTINCT query.

61yao, Oct 20 '22 00:10

@agavra

61yao, Oct 20 '22 00:10

Is the goal here to support something like the following examples, which come from TPC-DS queries on Presto?

LOGICAL PLAN

Output[columnNames = [cs_sold_Date_sk, cs_bill_customer_sk, _col2]]
│   Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
│   Estimates: {rows: 100 (2.64kB), cpu: 0, memory: 0B, network: 0B}
│   cs_sold_Date_sk := cs_sold_date_sk
│   _col2 := avg
└─ Limit[count = 100]
   │   Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
   │   Estimates: {rows: 100 (2.64kB), cpu: 2.64k, memory: 0B, network: 0B}
   └─ LocalExchange[partitioning = SINGLE]
      │   Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
      │   Estimates: {rows: 100 (2.64kB), cpu: 0, memory: 0B, network: 0B}
      └─ RemoteExchange[type = GATHER]
         │   Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
         │   Estimates: {rows: 100 (2.64kB), cpu: 0, memory: 0B, network: 2.64kB}
         └─ LimitPartial[count = 100]
            │   Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
            │   Estimates: {rows: 100 (2.64kB), cpu: 2.64k, memory: 0B, network: 0B}
            └─ Project[]
               │   Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
               │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
               └─ Window[partitionBy = [cs_sold_date_sk], orderBy = [cs_sold_date_sk ASC NULLS LAST], hash = [$hashvalue]]
                  │   Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, $hashvalue:bigint, avg:double]
                  │   avg := avg("cs_bill_customer_sk") RANGE UNBOUNDED_PRECEDING CURRENT_ROW
                  └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["cs_sold_date_sk"]]
                     │   Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, $hashvalue:bigint]
                     │   Estimates: {rows: 1441548 (37.01MB), cpu: 37.01M, memory: 0B, network: 0B}
                     └─ RemoteExchange[type = REPARTITION, hashColumn = [$hashvalue_0]]
                        │   Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, $hashvalue_0:bigint]
                        │   Estimates: {rows: 1441548 (37.01MB), cpu: 37.01M, memory: 0B, network: 37.01MB}
                        └─ ScanProject[table = tpcds:catalog_sales:sf1.0]

DISTRIBUTED PLAN (showing the stage boundaries)

Consider a Presto fragment to be the same as one of our stages: both are bounded by exchanges. This is still at the logical level (not yet at the physical operator level). I think we should show something along these lines (without the cost info, obviously) to clearly demarcate each stage. Could we possibly also show the hosts/nodes?

Fragment 0 [SINGLE]
    Output layout: [cs_sold_date_sk, cs_bill_customer_sk, avg]
    Output partitioning: SINGLE []
    Output[columnNames = [cs_sold_Date_sk, cs_bill_customer_sk, _col2]]
    │   Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
    │   Estimates: {rows: 100 (2.64kB), cpu: 0, memory: 0B, network: 0B}
    │   cs_sold_Date_sk := cs_sold_date_sk
    │   _col2 := avg
    └─ Limit[count = 100]

Fragment 1 [HASH]
    Output layout: [cs_sold_date_sk, cs_bill_customer_sk, avg]
    Output partitioning: SINGLE []
    LimitPartial[count = 100]
    │   Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
    │   Estimates: {rows: 100 (2.64kB), cpu: 2.64k, memory: 0B, network: 0B}
    └─ Project[]
       │   Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]

Fragment 2 [SOURCE]
    Output layout: [cs_sold_date_sk, cs_bill_customer_sk, $hashvalue_1]
    Output partitioning: HASH [cs_sold_date_sk][$hashvalue_1]
    ScanProject[table = tpcds:catalog_sales:sf1.0]
        Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, $hashvalue_1:bigint]
        Estimates: {rows: 1441548 (37.01MB), cpu: 24.64M, memory: 0B, network: 0B}/{rows: 1441548 (37.01MB), cpu: 37.01M, memory: 0B, network: 0B}
        $hashvalue_1 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("cs_sold_date_sk"), 0))
        cs_sold_date_sk := tpcds:cs_sold_date_sk
        cs_bill_customer_sk := tpcds:cs_bill_customer_sk
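
The stage demarcation described above (cutting the logical plan at each remote exchange and printing one block per stage) can be sketched roughly as follows. This is only a minimal Python illustration, not Pinot or Presto code: `PlanNode`, `chain`, `split_into_stages`, `explain`, and the `StageInput[...]` placeholder are all hypothetical names, and each remote exchange is assumed to have exactly one input.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    """Hypothetical logical plan node: a label plus its input nodes."""
    label: str
    inputs: List["PlanNode"] = field(default_factory=list)
    is_remote_exchange: bool = False  # True marks a stage boundary


def chain(*nodes: PlanNode) -> PlanNode:
    """Link nodes top-down so that each node's single input is the next one."""
    for parent, child in zip(nodes, nodes[1:]):
        parent.inputs.append(child)
    return nodes[0]


def split_into_stages(root: PlanNode) -> list:
    """Cut the tree at every remote exchange and return one root per stage.

    Stage 0 holds the plan root. In the parent stage, each remote exchange is
    replaced by a placeholder leaf naming the child stage it reads from.
    """
    stages = []

    def new_stage(stage_root: PlanNode) -> int:
        stage_id = len(stages)
        stages.append(None)  # reserve the id before recursing
        stages[stage_id] = copy_within_stage(stage_root)
        return stage_id

    def copy_within_stage(node: PlanNode) -> PlanNode:
        children = []
        for child in node.inputs:
            if child.is_remote_exchange:
                # Assumption: a remote exchange has exactly one input,
                # and that input becomes the root of a new stage.
                child_id = new_stage(child.inputs[0])
                children.append(PlanNode(f"StageInput[stage = {child_id}]"))
            else:
                children.append(copy_within_stage(child))
        return PlanNode(node.label, children)

    new_stage(root)
    return stages


def explain(stages: list) -> str:
    """Render each stage as a header followed by its indented operator tree."""
    lines = []

    def render(node: PlanNode, depth: int) -> None:
        lines.append("    " * depth + node.label)
        for child in node.inputs:
            render(child, depth + 1)

    for stage_id, stage_root in enumerate(stages):
        lines.append(f"Stage {stage_id}")
        render(stage_root, 1)
    return "\n".join(lines)


# The logical plan from the Presto example, reduced to operator labels.
plan = chain(
    PlanNode("Output"),
    PlanNode("Limit[count = 100]"),
    PlanNode("LocalExchange[partitioning = SINGLE]"),
    PlanNode("RemoteExchange[type = GATHER]", is_remote_exchange=True),
    PlanNode("LimitPartial[count = 100]"),
    PlanNode("Project[]"),
    PlanNode("Window[partitionBy = [cs_sold_date_sk]]"),
    PlanNode("LocalExchange[partitioning = HASH]"),
    PlanNode("RemoteExchange[type = REPARTITION]", is_remote_exchange=True),
    PlanNode("ScanProject[table = tpcds:catalog_sales:sf1.0]"),
)

print(explain(split_into_stages(plan)))
```

On this example the split yields three stages, rooted at `Output`, `LimitPartial`, and `ScanProject`, which lines up with the three Presto fragments above. Local exchanges stay inside their stage; only remote exchanges start a new one.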

siddharthteotia, Oct 27 '22 08:10

Can you also show a sample output? I just want to understand what the plan will look like with the changes here.

siddharthteotia, Oct 27 '22 08:10