[SPARK-46228][SQL] Insert window group limit node for limit outside of window
What changes were proposed in this pull request?
In ad hoc queries we often need to limit the number of rows returned. If a query applies a LIMIT outside of a window, and computing the top k rows of the window function depends only on the rows with rank <= k (similar to top-k), we can insert a WindowGroupLimit node to reduce the shuffle.
It supports the following pattern:
```sql
SELECT (... (row_number|rank|dense_rank|sum|max...)()
  OVER (
    PARTITION BY ...
    ORDER BY ... ) AS v)
LIMIT k
```
A LIMIT k outside of the window can be converted into a top k for each window group, followed by a final global LIMIT k. Computing the top k values of each sorted window group only needs the rows with a partial rank <= k, so rows with a partial rank > k can be safely discarded at any stage.
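The conversion above relies on one property: dropping every row whose rank is > k does not change the window values of the rows that remain. The sketch below checks that property for `rank()` in plain Python. It is a hypothetical model, not Spark code: rows are assumed to be `(group, sort_key)` tuples ordered by `sort_key` descending, and the data is invented for illustration.

```python
from itertools import groupby

def rank_within_groups(rows, k=None):
    """SQL-style rank() per window group, ordered by sort_key descending.

    `rows` are (group, sort_key) tuples. If `k` is given, only rows whose
    rank is <= k are returned, i.e. the per-group top-k filter.
    """
    out = []
    ordered = sorted(rows, key=lambda r: (r[0], -r[1]))
    for group, members in groupby(ordered, key=lambda r: r[0]):
        prev_key, rank = None, 0
        for position, (_, sort_key) in enumerate(members, start=1):
            if sort_key != prev_key:
                rank, prev_key = position, sort_key  # rank() jumps to the row position
            if k is None or rank <= k:
                out.append((group, sort_key, rank))
    return out

# Hypothetical input: (window group, sort key) pairs.
rows = [("a", 5), ("a", 5), ("a", 3), ("a", 1), ("b", 9), ("b", 7), ("b", 2)]
k = 2

full = rank_within_groups(rows)                                      # window over all rows
reduced_input = [(g, s) for g, s, _ in rank_within_groups(rows, k)]  # per-group top-k
recomputed = rank_within_groups(reduced_input)                       # window over reduced rows

# Rows with rank <= k keep exactly the same rank() value after the others are dropped.
assert recomputed == [r for r in full if r[2] <= k]
result = recomputed[:k]                                              # final global LIMIT k
```

Because the outer LIMIT has no ORDER BY, any k rows of the window output are a valid answer, so taking them from the reduced input is safe.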
WindowGroupLimit adds a per-window-group limit both before and after the shuffle to reduce the input data of window processing.
More specifically, the before-shuffle per-window-group limit:
- adds an extra local sort to determine window group boundaries;
- applies a per-group limit to reduce the data size of the shuffle and of all downstream operators.
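The before-shuffle and after-shuffle limits can be modeled in a few lines of Python. This is a hypothetical sketch, not the Spark implementation: partitions are plain lists of `(group, sort_key)` rows, the shuffle is a CRC32 hash on the group, and the per-group limit keeps rows with `rank()` <= k after a local sort.

```python
import zlib

def per_group_limit(rows, k):
    """Local sort on (group, sort_key desc), then keep rows with rank() <= k per group."""
    kept = []
    ordered = sorted(rows, key=lambda r: (r[0], -r[1]))  # sort reveals group boundaries
    prev_group, prev_key, position, rank = None, None, 0, 0
    for group, key in ordered:
        if group != prev_group:            # a new window group starts here
            prev_group, prev_key, position = group, None, 0
        position += 1
        if key != prev_key:                # rank() only advances on a new sort value
            rank, prev_key = position, key
        if rank <= k:
            kept.append((group, key))
    return kept

def shuffle_by_group(partitions, num_reducers):
    """Hash-partition rows by window group, as the shuffle before the Window would."""
    reducers = [[] for _ in range(num_reducers)]
    for part in partitions:
        for group, key in part:
            reducers[zlib.crc32(group.encode()) % num_reducers].append((group, key))
    return reducers

k = 2
# Hypothetical map-side partitions of (window group, sort key) rows.
partitions = [
    [("a", 5), ("a", 3), ("a", 1), ("b", 9), ("b", 2), ("b", 4)],
    [("a", 5), ("a", 4), ("a", 2), ("b", 7), ("b", 1)],
]

map_side = [per_group_limit(p, k) for p in partitions]        # limit before the shuffle
rows_shuffled_without = sum(len(p) for p in partitions)
rows_shuffled_with = sum(len(p) for p in map_side)
reduce_side = [per_group_limit(r, k) for r in shuffle_by_group(map_side, 2)]  # limit after
print(rows_shuffled_without, "->", rows_shuffled_with)
```

With this data the shuffle input drops from 11 to 8 rows, and the after-shuffle limit still returns the same per-group top k as running the limit over all rows at once. The saving grows with the number of rows per window group, which is why the extra local sort only pays off when groups are large.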
Whether WindowGroupLimit can be inferred from a Limit outside of a Window depends on whether computing the top k of the window function only needs the rows with rank <= k. The rule checks that:
- Window frame: the frame's lower/upper bounds must be `UnboundedPreceding`/`CurrentRow`, and either the window `orderSpec` contains at least one unfoldable expression or all window expressions use a `RowFrame`. When the `orderSpec` is foldable and a window expression uses a `RangeFrame`, all rows of the window group are peers, so the aggregation needs every row in the group.
- Window function: no window expression may be a `SizeBasedWindowFunction` (for example `CumeDist`, `NTile`, `PercentRank`), because computing a `SizeBasedWindowFunction` requires all rows in the window group.
- The Limit cannot be pushed down through the Window, because `LimitPushDownThroughWindow` performs better than `WindowGroupLimit` when it applies.
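The checks listed above can be expressed as a single predicate. The sketch below is illustrative only: `WindowExpr`, `SIZE_BASED_FUNCTIONS` and `supports_window_group_limit` are hypothetical stand-ins for the Catalyst expressions the real optimizer rule inspects, and the lowercase function names are an assumed mapping of `CumeDist`, `NTile` and `PercentRank`.

```python
from dataclasses import dataclass

# Hypothetical stand-ins for Catalyst's SizeBasedWindowFunction subclasses.
SIZE_BASED_FUNCTIONS = {"cume_dist", "ntile", "percent_rank"}

@dataclass
class WindowExpr:
    function: str    # e.g. "rank", "row_number", "sum"
    frame_type: str  # "row" or "range"
    lower: str       # frame lower bound, e.g. "UnboundedPreceding"
    upper: str       # frame upper bound, e.g. "CurrentRow"

def supports_window_group_limit(exprs, order_spec_has_unfoldable, limit_pushable_through_window):
    """Apply the three checks from the description (a sketch, not Spark code)."""
    # LimitPushDownThroughWindow is preferred whenever it applies.
    if limit_pushable_through_window:
        return False
    # Every frame must be UNBOUNDED PRECEDING .. CURRENT ROW.
    if any((e.lower, e.upper) != ("UnboundedPreceding", "CurrentRow") for e in exprs):
        return False
    # Need an unfoldable ORDER BY expression, or ROW frames only:
    # with a foldable ORDER BY, a RANGE frame covers the whole window group.
    if not (order_spec_has_unfoldable or all(e.frame_type == "row" for e in exprs)):
        return False
    # Size-based functions need every row of the window group.
    if any(e.function in SIZE_BASED_FUNCTIONS for e in exprs):
        return False
    return True

rank_expr = WindowExpr("rank", "row", "UnboundedPreceding", "CurrentRow")
print(supports_window_group_limit([rank_expr], True, False))
```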
The effect is similar to #38799. This is beneficial when the per-window-group data size is large; otherwise, determining the window group boundaries is pure overhead. The config spark.sql.window.group.limit.threshold can be used to avoid that overhead when the per-window-group data size is small enough.
Why are the changes needed?
Reduce the shuffle write to improve the performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
- UT
- Manual test on TPC-DS with a 10GB data set. The improvement is effective for the following query, q67-changed:
```sql
SELECT *
FROM
  (SELECT
     i_category,
     i_class,
     i_brand,
     i_product_name,
     d_year,
     d_qoy,
     d_moy,
     s_store_id,
     sumsales,
     rank()
       OVER (PARTITION BY i_category
             ORDER BY sumsales DESC) rk
   FROM
     (SELECT
        i_category,
        i_class,
        i_brand,
        i_product_name,
        d_year,
        d_qoy,
        d_moy,
        s_store_id,
        coalesce(ss_sales_price * ss_quantity, 0) sumsales
      FROM store_sales, date_dim, store, item
      WHERE ss_sold_date_sk = d_date_sk
        AND ss_item_sk = i_item_sk
        AND ss_store_sk = s_store_sk
        AND d_month_seq BETWEEN 1200 AND 1200 + 11
     ) dw1) dw2
LIMIT 100
```
| TPC-DS Query | Before (s) | After (s) | Speedup (Before / After, %) |
|---|---|---|---|
| q67-changed | 18.246 | 10.799 | 168.96% |
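The Speedup column is consistent with the ratio of the run time before the change to the run time after it. Expressed as a reduction in run time, the same numbers give roughly 41%:

```math
\text{Speedup} = \frac{T_{\text{before}}}{T_{\text{after}}} = \frac{18.246\ \text{s}}{10.799\ \text{s}} \approx 1.6896 = 168.96\%,
\qquad
\frac{T_{\text{before}} - T_{\text{after}}}{T_{\text{before}}} = \frac{7.447\ \text{s}}{18.246\ \text{s}} \approx 40.8\%
```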
Was this patch authored or co-authored using generative AI tooling?
No.
@beliefer If you have time, can you help me take a look? cc @cloud-fan
Although I understand the mechanism, please add more detail to the PR description so that other contributors can understand it.
Done
cc @cloud-fan
@zml1206 The PR description is not clear enough. Please modify
> We can keep 2 rows (("a", 4, ""), ("a", 4, "")) in window group a, because the window expression is a RangeFrame and, after sorting, the first and second rows have the same sort value; and 1 row (("b", 1, "h")) in window group b in WindowGroupLimitExec.
You can change to
> We infer rank() on window group a because the window expression is a RangeFrame and the rows tie after sorting, so WindowGroupLimitExec selects RankLimitIterator to select the range values. We infer row_number() on window group b, so WindowGroupLimitExec selects SimpleLimitIterator to select the row values.
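The difference between the two iterators can be illustrated with a small Python model. This only mirrors the idea and is not Spark's Scala `SimpleLimitIterator`/`RankLimitIterator` code. It assumes k = 1, which is consistent with the quoted example, and assumes each group is already sorted ascending by its second column; the last row of each group is hypothetical extra data.

```python
def simple_limit(sorted_group, k):
    """row_number() style: keep exactly the first k rows of a sorted window group."""
    return sorted_group[:k]

def rank_limit(sorted_group, k, sort_key):
    """rank() style: keep rows while rank() <= k, so ties at the boundary are kept."""
    kept, rank, prev = [], 0, object()   # sentinel never equals a real sort value
    for position, row in enumerate(sorted_group, start=1):
        value = sort_key(row)
        if value != prev:
            rank, prev = position, value
        if rank > k:
            break                        # rank() never decreases, so stop early
        kept.append(row)
    return kept

group_a = [("a", 4, ""), ("a", 4, ""), ("a", 6, "y")]
group_b = [("b", 1, "h"), ("b", 2, "j")]

print(rank_limit(group_a, 1, sort_key=lambda r: r[1]))  # keeps both tied rows
print(simple_limit(group_b, 1))                         # keeps only the first row
```

Stopping at the first row whose rank exceeds k is what lets the rank-based variant return more than k rows when there are ties, while the row-based variant always returns at most k.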
The description is a bit ambiguous. I made some changes. Please see what you think about it.
I changed the description. Do you think it’s easier to understand this way? @cloud-fan
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!