spark icon indicating copy to clipboard operation
spark copied to clipboard

[SPARK-39851][SQL] Improve join stats estimation if one side can keep uniqueness

Open wangyum opened this issue 2 years ago • 1 comments

What changes were proposed in this pull request?

This PR improves join stats estimation if one side can keep uniqueness. A common case is:

SELECT i_item_sk ss_item_sk
FROM   item,
       (SELECT DISTINCT iss.i_brand_id    brand_id,
                        iss.i_class_id    class_id,
                        iss.i_category_id category_id
        FROM   item iss) x
WHERE  i_brand_id = brand_id
       AND i_class_id = class_id
       AND i_category_id = category_id 

In this case, the row count of the join will definitely not expand.

Before this PR:

== Optimized Logical Plan ==
Project [i_item_sk#4 AS ss_item_sk#54], Statistics(sizeInBytes=370.8 MiB, rowCount=3.24E+7)
+- Join Inner, (((i_brand_id#11 = brand_id#51) AND (i_class_id#13 = class_id#52)) AND (i_category_id#15 = category_id#53)), Statistics(sizeInBytes=1112.3 MiB, rowCount=3.24E+7)
   :- Project [i_item_sk#4, i_brand_id#11, i_class_id#13, i_category_id#15], Statistics(sizeInBytes=4.6 MiB, rowCount=2.02E+5)
   :  +- Filter ((isnotnull(i_brand_id#11) AND isnotnull(i_class_id#13)) AND isnotnull(i_category_id#15)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
   :     +- Relation spark_catalog.default.item[i_item_sk#4,i_item_id#5,i_rec_start_date#6,i_rec_end_date#7,i_item_desc#8,i_current_price#9,i_wholesale_cost#10,i_brand_id#11,i_brand#12,i_class_id#13,i_class#14,i_category_id#15,i_category#16,i_manufact_id#17,i_manufact#18,i_size#19,i_formulation#20,i_color#21,i_units#22,i_container#23,i_manager_id#24,i_product_name#25] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
   +- Aggregate [brand_id#51, class_id#52, category_id#53], [brand_id#51, class_id#52, category_id#53], Statistics(sizeInBytes=2.6 MiB, rowCount=1.37E+5)
      +- Project [i_brand_id#62 AS brand_id#51, i_class_id#64 AS class_id#52, i_category_id#66 AS category_id#53], Statistics(sizeInBytes=3.9 MiB, rowCount=2.02E+5)
         +- Filter ((isnotnull(i_brand_id#62) AND isnotnull(i_class_id#64)) AND isnotnull(i_category_id#66)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
            +- Relation spark_catalog.default.item[i_item_sk#55,i_item_id#56,i_rec_start_date#57,i_rec_end_date#58,i_item_desc#59,i_current_price#60,i_wholesale_cost#61,i_brand_id#62,i_brand#63,i_class_id#64,i_class#65,i_category_id#66,i_category#67,i_manufact_id#68,i_manufact#69,i_size#70,i_formulation#71,i_color#72,i_units#73,i_container#74,i_manager_id#75,i_product_name#76] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)

After this PR:

== Optimized Logical Plan ==
Project [i_item_sk#4 AS ss_item_sk#54], Statistics(sizeInBytes=2.3 MiB, rowCount=2.02E+5)
+- Join Inner, (((i_brand_id#11 = brand_id#51) AND (i_class_id#13 = class_id#52)) AND (i_category_id#15 = category_id#53)), Statistics(sizeInBytes=7.0 MiB, rowCount=2.02E+5)
   :- Project [i_item_sk#4, i_brand_id#11, i_class_id#13, i_category_id#15], Statistics(sizeInBytes=4.6 MiB, rowCount=2.02E+5)
   :  +- Filter ((isnotnull(i_brand_id#11) AND isnotnull(i_class_id#13)) AND isnotnull(i_category_id#15)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
   :     +- Relation spark_catalog.default.item[i_item_sk#4,i_item_id#5,i_rec_start_date#6,i_rec_end_date#7,i_item_desc#8,i_current_price#9,i_wholesale_cost#10,i_brand_id#11,i_brand#12,i_class_id#13,i_class#14,i_category_id#15,i_category#16,i_manufact_id#17,i_manufact#18,i_size#19,i_formulation#20,i_color#21,i_units#22,i_container#23,i_manager_id#24,i_product_name#25] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
   +- Aggregate [brand_id#51, class_id#52, category_id#53], [brand_id#51, class_id#52, category_id#53], Statistics(sizeInBytes=2.6 MiB, rowCount=1.37E+5)
      +- Project [i_brand_id#62 AS brand_id#51, i_class_id#64 AS class_id#52, i_category_id#66 AS category_id#53], Statistics(sizeInBytes=3.9 MiB, rowCount=2.02E+5)
         +- Filter ((isnotnull(i_brand_id#62) AND isnotnull(i_class_id#64)) AND isnotnull(i_category_id#66)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
            +- Relation spark_catalog.default.item[i_item_sk#55,i_item_id#56,i_rec_start_date#57,i_rec_end_date#58,i_item_desc#59,i_current_price#60,i_wholesale_cost#61,i_brand_id#62,i_brand#63,i_class_id#64,i_class#65,i_category_id#66,i_category#67,i_manufact_id#68,i_manufact#69,i_size#70,i_formulation#71,i_color#72,i_units#73,i_container#74,i_manager_id#75,i_product_name#76] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)

Why are the changes needed?

Plan more broadcast joins to improve query performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.

wangyum avatar Jul 24 '22 13:07 wangyum

@cloud-fan

wangyum avatar Jul 26 '22 02:07 wangyum

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Dec 03 '22 00:12 github-actions[bot]