[SPARK-46228][SQL] Insert window group limit node for limit outside of window

Open zml1206 opened this issue 2 years ago • 8 comments

What changes were proposed in this pull request?

In ad hoc queries we often need to limit the number of rows returned. When a query applies a limit outside of a window, and computing the window function for the top k rows depends only on the rows with rank <= k, we can insert a WindowGroupLimit node to reduce shuffle, similar to the existing top-k optimization.

It supports the following pattern:

SELECT (... (row_number|rank|dense_rank|sum|max...)()
    OVER (
        PARTITION BY ...
        ORDER BY ... ) AS v)
LIMIT k

A limit k outside of the window can be converted into a top k for each window group, followed by a global limit k. Computing the top k values after sorting each window group only needs the rows with partial rank <= k, so rows with partial rank > k can be safely discarded at any point. WindowGroupLimit adds a per-window-group limit before and after the shuffle to reduce the input data of window processing. More specifically, the before-shuffle per-window-group limit adds an extra local sort to determine window group boundaries, then applies the per-group limit to reduce the data size of the shuffle and of all downstream operators.
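The two-stage limit described above can be illustrated with a small pure-Python simulation. This is only a sketch under stated assumptions, not Spark code: the helper names (`top_k_per_group`, `running_sum_window`) and the sample data are hypothetical, the window function is a running sum with a row frame from unbounded preceding to the current row, and the per-group limit uses row_number semantics.

```python
from collections import defaultdict
from itertools import accumulate


def top_k_per_group(rows, k):
    """Keep the k largest values of each group (row_number <= k, ORDER BY value DESC)."""
    groups = defaultdict(list)
    for group, value in rows:
        groups[group].append(value)
    return [(g, v) for g in sorted(groups)
            for v in sorted(groups[g], reverse=True)[:k]]


def running_sum_window(rows):
    """Emulate sum(value) OVER (PARTITION BY group ORDER BY value DESC
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), plus row_number."""
    groups = defaultdict(list)
    for group, value in rows:
        groups[group].append(value)
    out = []
    for g in sorted(groups):
        vals = sorted(groups[g], reverse=True)
        for rn, (v, s) in enumerate(zip(vals, accumulate(vals)), start=1):
            out.append((g, v, rn, s))
    return out


# Hypothetical input, already split into two map-side partitions.
partitions = [
    [("a", 5), ("a", 1), ("b", 7), ("a", 9)],
    [("a", 3), ("b", 2), ("a", 8), ("b", 4)],
]
k = 2

# Baseline: shuffle everything, run the window, keep rows with row_number <= k.
all_rows = [row for part in partitions for row in part]
expected = [row for row in running_sum_window(all_rows) if row[2] <= k]

# With the group limit: partial limit per map partition, shuffle, final limit, window.
shuffled_rows = [row for part in partitions for row in top_k_per_group(part, k)]
actual = running_sum_window(top_k_per_group(shuffled_rows, k))

print(len(all_rows), len(shuffled_rows))  # rows shuffled without / with the partial limit
print(actual == expected)
```

The global LIMIT k then picks any k of the surviving rows. Every row it could return already carries the window value it would have had on the full input, because that value depends only on rows ranked at or above it.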

Whether WindowGroupLimit can be inferred from a Limit outside of a Window, that is, whether the calculation of the top k of the window function depends only on the rows with rank <= k, is decided by checking the following:

  1. Window frame: first, the lower/upper bounds must be 'UnboundedPreceding'/'CurrentRow'; second, either the window orderSpec contains at least one non-foldable expression or all window expressions use RowFrame. When the orderSpec is foldable and a window expression uses RangeFrame, the aggregation needs all rows in the window group.
  2. Window function: no window expression may contain a SizeBasedWindowFunction, for example CumeDist, NTile, or PercentRank, because the calculation of a SizeBasedWindowFunction needs all rows in the window group.
  3. The Limit cannot be pushed down through the Window, because LimitPushDownThroughWindow performs better than WindowGroupLimit.
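The three checks can be read as a single predicate. The sketch below models them in plain Python to make the boolean structure explicit; it is not the Catalyst rule from this PR, and every name in it (`WindowExpr`, `supports_group_limit`, the string constants) is a hypothetical stand-in for the corresponding Spark class or value.

```python
from dataclasses import dataclass

# Hypothetical stand-ins for CumeDist, NTile and PercentRank.
SIZE_BASED_FUNCTIONS = {"cume_dist", "ntile", "percent_rank"}


@dataclass(frozen=True)
class WindowExpr:
    function: str    # e.g. "rank", "sum", "ntile"
    frame_type: str  # "row" or "range"
    lower: str       # e.g. "unbounded_preceding"
    upper: str       # e.g. "current_row"


def supports_group_limit(exprs, order_spec_foldable, limit_pushable_through_window):
    """Return True when all three conditions from the description hold.

    order_spec_foldable holds one boolean per ORDER BY expression:
    True if that expression is foldable (a constant).
    """
    # 1a. Every frame is UnboundedPreceding .. CurrentRow.
    frames_ok = all(
        e.lower == "unbounded_preceding" and e.upper == "current_row" for e in exprs
    )
    # 1b. At least one non-foldable order expression, or all frames are row frames.
    order_ok = any(not f for f in order_spec_foldable) or all(
        e.frame_type == "row" for e in exprs
    )
    # 2. No size-based window function.
    no_size_based = all(e.function not in SIZE_BASED_FUNCTIONS for e in exprs)
    # 3. Only applies when the limit cannot be pushed down through the window.
    return frames_ok and order_ok and no_size_based and not limit_pushable_through_window


rank_expr = WindowExpr("rank", "range", "unbounded_preceding", "current_row")
ntile_expr = WindowExpr("ntile", "row", "unbounded_preceding", "current_row")

print(supports_group_limit([rank_expr], [False], False))   # eligible
print(supports_group_limit([ntile_expr], [False], False))  # size-based function
print(supports_group_limit([rank_expr], [True], False))    # foldable order + range frame
```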

The effect is similar to #38799. This is beneficial, assuming the per-window-group data size is large. Otherwise, the method to determine window group boundaries is pure overhead. The config spark.sql.window.group.limit.threshold could avoid the overhead if the per-window-group data size is small enough.

Why are the changes needed?

Reduce the shuffle write to improve the performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  1. UT
  2. Manual test on TPC-DS with a data size of 10 GB. The improvement shows on a modified TPC-DS q67 (q67-changed):
SELECT *
FROM
  (SELECT
    i_category,
    i_class,
    i_brand,
    i_product_name,
    d_year,
    d_qoy,
    d_moy,
    s_store_id,
    sumsales,
    rank()
    OVER (PARTITION BY i_category
      ORDER BY sumsales DESC) rk
  FROM
    (SELECT
      i_category,
      i_class,
      i_brand,
      i_product_name,
      d_year,
      d_qoy,
      d_moy,
      s_store_id,
      coalesce(ss_sales_price * ss_quantity, 0) sumsales
    FROM store_sales, date_dim, store, item
    WHERE ss_sold_date_sk = d_date_sk
      AND ss_item_sk = i_item_sk
      AND ss_store_sk = s_store_sk
      AND d_month_seq BETWEEN 1200 AND 1200 + 11
    ) dw1) dw2
LIMIT 100

TPC-DS Query   Before (Seconds)   After (Seconds)   Speedup (Percent)
q67-changed    18.246             10.799            168.96%
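
The speedup figure appears to be the ratio of the before time to the after time, not the share of time saved. A short check of that reading, using only the two timings reported above (the variable names are illustrative):

```python
before_s = 18.246  # q67-changed, before the patch
after_s = 10.799   # q67-changed, after the patch

speedup_ratio = before_s / after_s            # how many times faster
time_reduction = (before_s - after_s) / before_s  # share of runtime saved

print(f"speedup: {speedup_ratio * 100:.2f}%")
print(f"runtime reduction: {time_reduction * 100:.2f}%")
```

Under this reading the query runs about 1.69 times as fast, which corresponds to roughly a 41% shorter runtime.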

Was this patch authored or co-authored using generative AI tooling?

No.

zml1206 avatar Dec 04 '23 02:12 zml1206

@beliefer If you have time, can you help me take a look? cc @cloud-fan

zml1206 avatar Dec 05 '23 02:12 zml1206

Although I understand the mechanism, please add more detail to the PR description so that other contributors can understand it too.

beliefer avatar Dec 06 '23 08:12 beliefer

Done

zml1206 avatar Dec 06 '23 08:12 zml1206

cc @cloud-fan

beliefer avatar Dec 09 '23 04:12 beliefer

@zml1206 The PR description is not clear enough. Please modify:

"We can extract the limit value 2 (("a", 4, ""),("a", 4, "")) on window group a because window expression is RangeFrame and after sort, the sort values of the first row and the second row are the same, limit value 1 (("b", 1, "h")) on window group b in WindowGroupLimitExec."

You can change it to:

"We infer the rank() on window group a because window expression is RangeFrame and after sort. WindowGroupLimitExec will select RankLimitIterator to select the range values. We infer the rownumber() on window group b. WindowGroupLimitExec will select SimpleLimitIterator to select the row values."

beliefer avatar Dec 09 '23 12:12 beliefer

The description is a bit ambiguous. I made some changes. Please see what you think about it.

zml1206 avatar Dec 09 '23 14:12 zml1206

I changed the description. Do you think it’s easier to understand this way? @cloud-fan

zml1206 avatar Dec 11 '23 02:12 zml1206

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Jun 28 '24 00:06 github-actions[bot]