feat: support intra-operator parallelism

Open wangrunji0408 opened this issue 1 year ago • 4 comments

This PR adds data partitioning and intra-operator parallelism.

[Figure: graphviz-4]

TPC-H performance improved on my M1 Pro (10 cores):

[Figure: speedup]

Seems to resolve #748.

wangrunji0408 · Nov 24 '24 13:11

Two quick questions: what is the schema plan node? And what is the definition of the exchange node: is its distribution that of the child, or the expected distribution of the output node?

skyzh · Nov 27 '24 05:11

what is the schema plan node?

The schema node is a virtual node that only changes the output schema of the child node. It was introduced to resolve a tricky issue in 2-phase aggregation.

Let's say we have a query: select sum(a) * 2 from t;

The original plan is:

Proj: sum(a) * 2
    Agg: sum(a)
        Scan: t(a)

After parallelization (by pushing down the ToParallel node), the Agg is transformed into a 2-phase aggregation:

Proj: sum(a) * 2
    Agg: sum(sum(a))
        Exchange: merge
            Agg: sum(a)
                Scan: t(a)
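The 2-phase rewrite can be illustrated outside the optimizer with a minimal sketch, assuming the column `a` is already split into contiguous partitions and each partition is handled by one thread. The function name `two_phase_sum` and the chunking scheme are hypothetical; this only mimics what the plan computes, not how risinglight's executors are implemented.

```rust
use std::thread;

/// Computes `sum(a)` the way the 2-phase plan does (hypothetical helper).
/// Phase 1: each thread runs the lower `Agg: sum(a)` on its own partition.
/// Phase 2: `Exchange: merge` gathers the partial sums, and the upper
/// `Agg: sum(sum(a))` adds them up.
pub fn two_phase_sum(a: &[i64], partitions: usize) -> i64 {
    assert!(partitions > 0, "need at least one partition");
    // Contiguous chunks of roughly equal size; `max(1)` keeps `chunks` from panicking on empty input.
    let chunk_len = ((a.len() + partitions - 1) / partitions).max(1);
    let partial_sums: Vec<i64> = thread::scope(|s| {
        let handles: Vec<_> = a
            .chunks(chunk_len)
            .map(|part| s.spawn(move || part.iter().sum::<i64>()))
            .collect();
        // Merge: collect every partial result into a single partition.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    // Final aggregation over the partial results.
    partial_sums.iter().sum()
}
```

With this helper, `select sum(a) * 2 from t` corresponds to `two_phase_sum(&a, n) * 2`; the result does not depend on `n` because addition is associative.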

Note that the output schema of the top Agg node changes from sum(a) to sum(sum(a)). As a result, the Proj node throws an error when it tries to resolve the physical column index of its expression sum(a), since its child no longer outputs sum(a).
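To see why the lookup fails, here is a minimal sketch of index resolution, assuming an operator finds each input expression by searching its child's output schema. Representing expressions as plain strings and the name `resolve_column_index` are illustrative assumptions, not risinglight's actual binder or executor API.

```rust
/// Hypothetical helper: finds the position of `expr` in the child's output schema.
/// Expressions are plain strings here purely for illustration.
pub fn resolve_column_index(child_schema: &[&str], expr: &str) -> Result<usize, String> {
    child_schema
        .iter()
        .position(|col| *col == expr)
        .ok_or_else(|| format!("cannot resolve `{expr}` in child schema {child_schema:?}"))
}
```

Before the rewrite, the Proj node's child outputs `["sum(a)"]` and the lookup returns index 0; after the rewrite, the child outputs `["sum(sum(a))"]` and the same lookup returns an error.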

So, in order to keep the schema unchanged, we can insert a Schema node between Proj and Agg:

Proj: sum(a) * 2
    Schema: sum(a)
        Agg: sum(sum(a))
            Exchange: merge
                Agg: sum(a)
                    Scan: t(a)

The Schema node is then simply ignored when building executors.
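A minimal sketch of the idea, using a hypothetical, simplified plan enum (risinglight's real plans are not this type): the Schema node reports its own column list as its output schema, so the parent resolves `sum(a)` against it, while the executor builder recurses straight into its child and creates no executor for it.

```rust
/// Hypothetical, simplified plan tree for illustration only.
#[derive(Debug)]
pub enum Plan {
    Scan { columns: Vec<String> },
    Agg { aggs: Vec<String>, child: Box<Plan> },
    Exchange { dist: String, child: Box<Plan> },
    /// Virtual node: overrides the child's output schema and does no work at runtime.
    Schema { columns: Vec<String>, child: Box<Plan> },
    Proj { exprs: Vec<String>, child: Box<Plan> },
}

impl Plan {
    /// The output schema that the parent node sees.
    pub fn output_schema(&self) -> Vec<String> {
        match self {
            Plan::Scan { columns } | Plan::Schema { columns, .. } => columns.clone(),
            Plan::Agg { aggs, .. } => aggs.clone(),
            // An exchange only moves rows between partitions, so the schema passes through.
            Plan::Exchange { child, .. } => child.output_schema(),
            Plan::Proj { exprs, .. } => exprs.clone(),
        }
    }
}

/// Builds a mock executor pipeline, recording executor names from the root down.
pub fn build_executors(plan: &Plan, out: &mut Vec<String>) {
    match plan {
        Plan::Scan { .. } => out.push("Scan".to_string()),
        Plan::Agg { child, .. } => {
            out.push("Agg".to_string());
            build_executors(child, out);
        }
        Plan::Exchange { child, .. } => {
            out.push("Exchange".to_string());
            build_executors(child, out);
        }
        // The Schema node is skipped: its child's executor is used directly.
        Plan::Schema { child, .. } => build_executors(child, out),
        Plan::Proj { child, .. } => {
            out.push("Proj".to_string());
            build_executors(child, out);
        }
    }
}
```

For the plan above, the Proj node sees the schema `["sum(a)"]`, while the executor pipeline is Proj, Agg, Exchange, Agg, Scan with no trace of the Schema node.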

wangrunji0408 · Nov 27 '24 14:11

what is the definition of exchange node? is it the distribution of the child, or the expected distribution of the output node?

It is (exchange dist child), where dist is the expected distribution of the output. The child can have any distribution.
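To make the convention concrete, here is a minimal sketch of an exchange operating on in-memory partitions. The `Distribution` variants and the trivial hash (value modulo the partition count) are assumptions for illustration, not risinglight's actual distribution set. The point is that `dist` alone decides the output layout, whatever layout the input partitions had.

```rust
/// Hypothetical distribution kinds; only the two needed for the illustration.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Distribution {
    /// All rows end up in one partition (the `merge` in the plans above).
    Single,
    /// Rows are routed to `n` partitions by a trivial hash: the value modulo `n`.
    Hash(usize),
}

/// `(exchange dist child)`: `dist` describes the output. The input partitions
/// may follow any distribution; they are simply re-routed according to `dist`.
pub fn exchange(dist: Distribution, input: Vec<Vec<i64>>) -> Vec<Vec<i64>> {
    match dist {
        Distribution::Single => vec![input.into_iter().flatten().collect()],
        Distribution::Hash(n) => {
            assert!(n > 0, "need at least one output partition");
            let mut out = vec![Vec::new(); n];
            for row in input.into_iter().flatten() {
                // `rem_euclid` keeps the partition index non-negative for negative values.
                let p = row.rem_euclid(n as i64) as usize;
                out[p].push(row);
            }
            out
        }
    }
}
```

Here `Distribution::Single` plays the role of `Exchange: merge`: partial results from every input partition end up in one output partition for the final aggregation.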

wangrunji0408 · Nov 27 '24 14:11

By the way, after this optimization the bottleneck of some queries (such as Q6) has shifted to the table scan. As a next step, it is critical to support parallel partition scans in the storage layer. 🥹

wangrunji0408 · Nov 27 '24 14:11