[multistage] [debugability] Support explain distributed stage plan and add a test for explicit distinct
Extend the current explain query visitor to explain distributed stage plan. Add a test for explicit distinct query.
@agavra
Is the goal here to support something like the following examples which are from TPCDS queries on Presto ?
LOGICAL PLAN
`Output[columnNames = [cs_sold_Date_sk, cs_bill_customer_sk, _col2]]
│ Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
│ Estimates: {rows: 100 (2.64kB), cpu: 0, memory: 0B, network: 0B}
│ cs_sold_Date_sk := cs_sold_date_sk
│ _col2 := avg
└─ Limit[count = 100]
│ Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
│ Estimates: {rows: 100 (2.64kB), cpu: 2.64k, memory: 0B, network: 0B}
└─ LocalExchange[partitioning = SINGLE]
│ Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
│ Estimates: {rows: 100 (2.64kB), cpu: 0, memory: 0B, network: 0B}
└─ RemoteExchange[type = GATHER]
│ Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
│ Estimates: {rows: 100 (2.64kB), cpu: 0, memory: 0B, network: 2.64kB}
└─ LimitPartial[count = 100]
│ Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
│ Estimates: {rows: 100 (2.64kB), cpu: 2.64k, memory: 0B, network: 0B}
└─ Project[]
│ Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
└─ Window[partitionBy = [cs_sold_date_sk], orderBy = [cs_sold_date_sk ASC NULLS LAST], hash = [$hashvalue]]
│ Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, $hashvalue:bigint, avg:double]
│ avg := avg("cs_bill_customer_sk") RANGE UNBOUNDED_PRECEDING CURRENT_ROW
└─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["cs_sold_date_sk"]]
│ Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, $hashvalue:bigint]
│ Estimates: {rows: 1441548 (37.01MB), cpu: 37.01M, memory: 0B, network: 0B}
└─ RemoteExchange[type = REPARTITION, hashColumn = [$hashvalue_0]]
│ Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, $hashvalue_0:bigint]
│ Estimates: {rows: 1441548 (37.01MB), cpu: 37.01M, memory: 0B, network: 37.01MB}
└─ ScanProject[table = tpcds:catalog_sales:sf1.0]
DISTRIBUTED PLAN showing the stage boundaries. Consider Presto's fragment same as our stage boundaries separated by exchanges. This is still at the logical level (not at the physical operator level yet). I think we should show something along these lines (without the cost info obviously) to clearly demarcate a stage ? Can possibly also show the hosts/ nodes ?
Fragment 0 [SINGLE]
Output layout: [cs_sold_date_sk, cs_bill_customer_sk, avg]
Output partitioning: SINGLE []
Output[columnNames = [cs_sold_Date_sk, cs_bill_customer_sk, _col2]]
│ Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
│ Estimates: {rows: 100 (2.64kB), cpu: 0, memory: 0B, network: 0B}
│ cs_sold_Date_sk := cs_sold_date_sk
│ _col2 := avg
└─ Limit[count = 100]
Fragment 1 [HASH]
Output layout: [cs_sold_date_sk, cs_bill_customer_sk, avg]
Output partitioning: SINGLE []
LimitPartial[count = 100]
│ Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
│ Estimates: {rows: 100 (2.64kB), cpu: 2.64k, memory: 0B, network: 0B}
└─ Project[]
│ Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, avg:double]
Fragment 2 [SOURCE]
Output layout: [cs_sold_date_sk, cs_bill_customer_sk, $hashvalue_1]
Output partitioning: HASH [cs_sold_date_sk][$hashvalue_1]
ScanProject[table = tpcds:catalog_sales:sf1.0]
Layout: [cs_sold_date_sk:bigint, cs_bill_customer_sk:bigint, $hashvalue_1:bigint]
Estimates: {rows: 1441548 (37.01MB), cpu: 24.64M, memory: 0B, network: 0B}/{rows: 1441548 (37.01MB), cpu: 37.01M, memory: 0B, network: 0B}
$hashvalue_1 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("cs_sold_date_sk"), 0))
cs_sold_date_sk := tpcds:cs_sold_date_sk
cs_bill_customer_sk := tpcds:cs_bill_customer_sk
Can you also show a sample output ? Just want to understand how the plan will look like with the changes here.