[WIP] Remove `COMET_SHUFFLE_FALLBACK_TO_COLUMNAR` config

Open andygrove opened this issue 7 months ago • 1 comments

Which issue does this PR close?

Closes https://github.com/apache/datafusion-comet/issues/1254

Rationale for this change

The config COMET_SHUFFLE_FALLBACK_TO_COLUMNAR was added as a workaround for Spark SQL test failures. We need to remove it because users could run into the same bugs that are causing those failures.

What changes are included in this PR?

  • [x] Remove config
  • [ ] Fix Spark SQL test failures:

core1

  • test with low buffer spill threshold
  • SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate
  • reverse preceding/following range between with aggregation

core2

  • subquery/exists-subquery/exists-orderby-limit.sql

core3

  • SPARK-32649: Optimize BHJ/SHJ inner/semi join with empty hashed relation
  • SPARK-38237: require all cluster keys for child required distribution for window query

How are these changes tested?

andygrove avatar May 13 '25 17:05 andygrove

Codecov Report

Attention: Patch coverage is 0% with 4 lines in your changes missing coverage. Please review.

Project coverage is 59.40%. Comparing base (f09f8af) to head (c105513). Report is 247 commits behind head on main.

Files with missing lines Patch % Lines
...he/comet/rules/EliminateRedundantTransitions.scala 0.00% 1 Missing and 2 partials :warning:
...pache/spark/sql/comet/CometColumnarToRowExec.scala 0.00% 0 Missing and 1 partial :warning:
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1736      +/-   ##
============================================
+ Coverage     56.12%   59.40%   +3.27%     
- Complexity      976     1151     +175     
============================================
  Files           119      130      +11     
  Lines         11743    12661     +918     
  Branches       2251     2375     +124     
============================================
+ Hits           6591     7521     +930     
+ Misses         4012     3930      -82     
- Partials       1140     1210      +70     

codecov-commenter avatar May 13 '25 20:05 codecov-commenter

There are now many more test failures than when this PR was first created.

andygrove avatar May 30 '25 20:05 andygrove

Working on this

coderfender avatar Jun 02 '25 04:06 coderfender

Current failures:

core1:

2025-06-05T16:40:13.0938574Z [info] - avoid reordering broadcast join keys to match input hash partitioning *** FAILED *** (2 seconds, 403 milliseconds)
2025-06-05T16:41:30.1094281Z [info] - test with low buffer spill threshold *** FAILED *** (189 milliseconds)
2025-06-05T16:45:04.6680286Z [info] - avoid reordering broadcast join keys to match input hash partitioning *** FAILED *** (1 second, 747 milliseconds)
2025-06-05T16:45:08.9350122Z [info] - SPARK-32509: Unused Dynamic Pruning filter shouldn't affect canonicalization and exchange reuse *** FAILED *** (141 milliseconds)
2025-06-05T16:52:15.8171988Z [info] - SPARK-47146: thread leak when doing SortMergeJoin (with spill) *** FAILED *** (297 milliseconds)

core3:

2025-06-05T16:48:35.4894787Z [info] - SPARK-32649: Optimize BHJ/SHJ inner/semi join with empty hashed relation *** FAILED *** (197 milliseconds)

TPC-DS:

- q44 *** FAILED ***
  - ColumnarToRow does not implement doExecuteBroadcast

andygrove avatar Jun 05 '25 17:06 andygrove

The Spark SQL tests for 3.5.5 are now all passing, and there is just the TPC-DS issue left

andygrove avatar Jun 05 '25 20:06 andygrove

The remaining failure is related to exchange reuse in TPC-DS q44.

2025-06-06T00:48:02.3079684Z OUTPUT: TakeOrderedAndProject(limit=100, orderBy=[rnk#18761 ASC NULLS FIRST], output=[rnk#18761,best_performing#18767,worst_performing#18768])
2025-06-06T00:48:02.3081088Z +- Project [rnk#18761, i_product_name#18894 AS best_performing#18767, i_product_name#18905 AS worst_performing#18768]
2025-06-06T00:48:02.3082152Z    +- BroadcastHashJoin [item_sk#18762], [i_item_sk#18796], Inner, BuildRight, false
2025-06-06T00:48:02.3082940Z       :- Project [rnk#18761, item_sk#18762, i_product_name#18894]
2025-06-06T00:48:02.3083811Z       :  +- BroadcastHashJoin [item_sk#18757], [i_item_sk#722], Inner, BuildRight, false
2025-06-06T00:48:02.3084580Z       :     :- Project [item_sk#18757, rnk#18761, item_sk#18762]
2025-06-06T00:48:02.3085314Z       :     :  +- BroadcastHashJoin [rnk#18761], [rnk#18766], Inner, BuildRight, false
2025-06-06T00:48:02.3086015Z       :     :     :- Project [item_sk#18757, rnk#18761]
2025-06-06T00:48:02.3086766Z       :     :     :  +- Filter ((rnk#18761 < 11) AND isnotnull(item_sk#18757))
2025-06-06T00:48:02.3088247Z       :     :     :     +- Window [rank(rank_col#18758) windowspecdefinition(rank_col#18758 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#18761], [rank_col#18758 ASC NULLS FIRST]
2025-06-06T00:48:02.3089876Z       :     :     :        +- WindowGroupLimit [rank_col#18758 ASC NULLS FIRST], rank(rank_col#18758), 10, Final
2025-06-06T00:48:02.3090689Z       :     :     :           +- Sort [rank_col#18758 ASC NULLS FIRST], false, 0
2025-06-06T00:48:02.3091724Z       :     :     :              +- Filter (isnotnull(rank_col#18758) AND (cast(rank_col#18758 as decimal(13,7)) > (0.9 * ReusedSubquery Subquery subquery#18765, [id=#147283])))
2025-06-06T00:48:02.3092900Z       :     :     :                 :  +- ReusedSubquery Subquery subquery#18765, [id=#147283]
2025-06-06T00:48:02.3093962Z       :     :     :                 +- HashAggregate(keys=[ss_item_sk#701], functions=[avg(UnscaledValue(ss_net_profit#721))], output=[item_sk#18757, rank_col#18758])
2025-06-06T00:48:02.3094946Z       :     :     :                    +- CometColumnarToRow
2025-06-06T00:48:02.3095489Z       :     :     :                       +- ShuffleQueryStage 0
2025-06-06T00:48:02.3096562Z       :     :     :                          +- CometExchange hashpartitioning(ss_item_sk#701, 1), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=146851]
2025-06-06T00:48:02.3097956Z       :     :     :                             +- !CometHashAggregate [ss_item_sk#701, ss_net_profit#721], Partial, [ss_item_sk#701], [partial_avg(UnscaledValue(ss_net_profit#721))]
2025-06-06T00:48:02.3099165Z       :     :     :                                +- CometProject [ss_item_sk#701, ss_net_profit#721], [ss_item_sk#701, ss_net_profit#721]
2025-06-06T00:48:02.3100381Z       :     :     :                                   +- CometFilter [ss_item_sk#701, ss_store_sk#706, ss_net_profit#721], (isnotnull(ss_store_sk#706) AND (ss_store_sk#706 = 4))
2025-06-06T00:48:02.3103650Z       :     :     :                                      +- CometScan parquet spark_catalog.default.store_sales[ss_item_sk#701,ss_store_sk#706,ss_net_profit#721] Batched: true, DataFilters: [isnotnull(ss_store_sk#706), (ss_store_sk#706 = 4)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/__w/datafusion-comet/datafusion-comet/tpcds-sf-1/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), EqualTo(ss_store_sk,4)], ReadSchema: struct<ss_item_sk:int,ss_store_sk:int,ss_net_profit:decimal(7,2)>
2025-06-06T00:48:02.3106642Z       :     :     +- BroadcastQueryStage 4
2025-06-06T00:48:02.3107602Z       :     :        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, false] as bigint)),false), [plan_id=147448]
2025-06-06T00:48:02.3108617Z       :     :           +- *(2) Project [item_sk#18762, rnk#18766]
2025-06-06T00:48:02.3109338Z       :     :              +- *(2) Filter ((rnk#18766 < 11) AND isnotnull(item_sk#18762))
2025-06-06T00:48:02.3110827Z       :     :                 +- Window [rank(rank_col#18763) windowspecdefinition(rank_col#18763 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#18766], [rank_col#18763 DESC NULLS LAST]
2025-06-06T00:48:02.3112485Z       :     :                    +- WindowGroupLimit [rank_col#18763 DESC NULLS LAST], rank(rank_col#18763), 10, Final
2025-06-06T00:48:02.3113319Z       :     :                       +- *(1) Sort [rank_col#18763 DESC NULLS LAST], false, 0
2025-06-06T00:48:02.3114412Z       :     :                          +- *(1) Filter (isnotnull(rank_col#18763) AND (cast(rank_col#18763 as decimal(13,7)) > (0.9 * Subquery subquery#18765, [id=#147283])))
2025-06-06T00:48:02.3115373Z       :     :                             :  +- Subquery subquery#18765, [id=#147283]
2025-06-06T00:48:02.3115975Z       :     :                             :     +- AdaptiveSparkPlan isFinalPlan=true
2025-06-06T00:48:02.3116693Z                                                    +- == Final Plan ==
2025-06-06T00:48:02.3117643Z                                                       *(1) HashAggregate(keys=[ss_store_sk#18830], functions=[avg(UnscaledValue(ss_net_profit#18845))], output=[rank_col#18759])
2025-06-06T00:48:02.3118606Z                                                       +- *(1) CometColumnarToRow
2025-06-06T00:48:02.3119179Z                                                          +- ShuffleQueryStage 0
2025-06-06T00:48:02.3120162Z                                                             +- CometExchange hashpartitioning(ss_store_sk#18830, 1), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=147498]
2025-06-06T00:48:02.3121750Z                                                                +- !CometHashAggregate [ss_store_sk#18830, ss_net_profit#18845], Partial, [ss_store_sk#18830], [partial_avg(UnscaledValue(ss_net_profit#18845))]
2025-06-06T00:48:02.3123117Z                                                                   +- CometProject [ss_store_sk#18830, ss_net_profit#18845], [ss_store_sk#18830, ss_net_profit#18845]
2025-06-06T00:48:02.3124510Z                                                                      +- CometFilter [ss_addr_sk#18829, ss_store_sk#18830, ss_net_profit#18845], ((isnotnull(ss_store_sk#18830) AND (ss_store_sk#18830 = 4)) AND isnull(ss_addr_sk#18829))
2025-06-06T00:48:02.3128544Z                                                                         +- CometScan parquet spark_catalog.default.store_sales[ss_addr_sk#18829,ss_store_sk#18830,ss_net_profit#18845] Batched: true, DataFilters: [isnotnull(ss_store_sk#18830), (ss_store_sk#18830 = 4), isnull(ss_addr_sk#18829)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/__w/datafusion-comet/datafusion-comet/tpcds-sf-1/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), EqualTo(ss_store_sk,4), IsNull(ss_addr_sk)], ReadSchema: struct<ss_addr_sk:int,ss_store_sk:int,ss_net_profit:decimal(7,2)>
2025-06-06T00:48:02.3131704Z                                                    +- == Initial Plan ==
2025-06-06T00:48:02.3132670Z                                                       HashAggregate(keys=[ss_store_sk#18830], functions=[avg(UnscaledValue(ss_net_profit#18845))], output=[rank_col#18759])
2025-06-06T00:48:02.3134048Z                                                       +- CometExchange hashpartitioning(ss_store_sk#18830, 1), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=146421]
2025-06-06T00:48:02.3135539Z                                                          +- !CometHashAggregate [ss_store_sk#18830, ss_net_profit#18845], Partial, [ss_store_sk#18830], [partial_avg(UnscaledValue(ss_net_profit#18845))]
2025-06-06T00:48:02.3139421Z                                                             +- CometProject [ss_store_sk#18830, ss_net_profit#18845], [ss_store_sk#18830, ss_net_profit#18845]
2025-06-06T00:48:02.3140970Z                                                                +- CometFilter [ss_addr_sk#18829, ss_store_sk#18830, ss_net_profit#18845], ((isnotnull(ss_store_sk#18830) AND (ss_store_sk#18830 = 4)) AND isnull(ss_addr_sk#18829))
2025-06-06T00:48:02.3144752Z                                                                   +- CometScan parquet spark_catalog.default.store_sales[ss_addr_sk#18829,ss_store_sk#18830,ss_net_profit#18845] Batched: true, DataFilters: [isnotnull(ss_store_sk#18830), (ss_store_sk#18830 = 4), isnull(ss_addr_sk#18829)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/__w/datafusion-comet/datafusion-comet/tpcds-sf-1/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), EqualTo(ss_store_sk,4), IsNull(ss_addr_sk)], ReadSchema: struct<ss_addr_sk:int,ss_store_sk:int,ss_net_profit:decimal(7,2)>
2025-06-06T00:48:02.3148516Z       :     :                             +- *(1) HashAggregate(keys=[ss_item_sk#18775], functions=[avg(UnscaledValue(ss_net_profit#18795))], output=[item_sk#18762, rank_col#18763])
2025-06-06T00:48:02.3149536Z       :     :                                +- *(1) ColumnarToRow
2025-06-06T00:48:02.3150077Z       :     :                                   +- ShuffleQueryStage 1
2025-06-06T00:48:02.3151311Z       :     :                                      +- ReusedExchange [ss_item_sk#18775, sum#18915, count#18916L], CometExchange hashpartitioning(ss_item_sk#701, 1), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=146851]
2025-06-06T00:48:02.3152516Z       :     +- CometColumnarToRow
2025-06-06T00:48:02.3152984Z       :        +- BroadcastQueryStage 2
2025-06-06T00:48:02.3153616Z       :           +- CometBroadcastExchange [i_item_sk#722, i_product_name#18894]
2025-06-06T00:48:02.3155540Z       :              +- CometProject [i_item_sk#722, i_product_name#18894], [i_item_sk#722, staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, i_product_name#743, 50, true, false, true) AS i_product_name#18894]
2025-06-06T00:48:02.3157539Z       :                 +- CometFilter [i_item_sk#722, i_product_name#743], isnotnull(i_item_sk#722)
2025-06-06T00:48:02.3160081Z       :                    +- CometScan parquet spark_catalog.default.item[i_item_sk#722,i_product_name#743] Batched: true, DataFilters: [isnotnull(i_item_sk#722)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/__w/datafusion-comet/datafusion-comet/tpcds-sf-1/item], PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:int,i_product_name:string>
2025-06-06T00:48:02.3162411Z       +- ColumnarToRow
2025-06-06T00:48:02.3162821Z          +- BroadcastQueryStage 3
2025-06-06T00:48:02.3163722Z             +- ReusedExchange [i_item_sk#18796, i_product_name#18905], CometBroadcastExchange [i_item_sk#722, i_product_name#18894]

Note the plain ColumnarToRow (C2R), rather than CometColumnarToRow, wrapped around a reused Comet exchange:

2025-06-06T00:48:02.3162411Z       +- ColumnarToRow
2025-06-06T00:48:02.3162821Z          +- BroadcastQueryStage 3
2025-06-06T00:48:02.3163722Z             +- ReusedExchange [i_item_sk#18796, i_product_name#18905], 
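
The shape highlighted in that excerpt can be illustrated with a small toy model. This is only a sketch and not Comet or Spark code: the `Node` class and the `find_suspect_c2r` helper are hypothetical names, and the plan is an abbreviated, hand-transcribed version of the q44 output. It walks a plan tree and reports each plain ColumnarToRow whose query stage resolves to a ReusedExchange of a Comet exchange.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Node:
    """Toy stand-in for a physical plan operator (hypothetical, not a Spark class)."""
    name: str
    children: List["Node"] = field(default_factory=list)
    # Only meaningful for ReusedExchange: name of the exchange being reused.
    reuses: Optional[str] = None


def wraps_reused_comet_exchange(node: Node) -> bool:
    """Look through query-stage wrappers below `node` for a reused Comet exchange."""
    child = node.children[0] if node.children else None
    while child is not None and child.name.endswith("QueryStage"):
        child = child.children[0] if child.children else None
    return (
        child is not None
        and child.name == "ReusedExchange"
        and (child.reuses or "").startswith("Comet")
    )


def find_suspect_c2r(plan: Node) -> List[Node]:
    """Collect plain ColumnarToRow nodes that sit directly above a reused Comet exchange."""
    found = []
    if plan.name == "ColumnarToRow" and wraps_reused_comet_exchange(plan):
        found.append(plan)
    for child in plan.children:
        found.extend(find_suspect_c2r(child))
    return found


# Abbreviated tail of the q44 plan: one join with two broadcast sides.
plan = Node("BroadcastHashJoin", [
    Node("CometColumnarToRow", [
        Node("BroadcastQueryStage", [Node("CometBroadcastExchange")]),
    ]),
    Node("ColumnarToRow", [
        Node("BroadcastQueryStage", [
            Node("ReusedExchange", reuses="CometBroadcastExchange"),
        ]),
    ]),
])

suspects = find_suspect_c2r(plan)
print([n.name for n in suspects])
```

In this toy plan only the second broadcast side is reported, which matches the ColumnarToRow over BroadcastQueryStage 3 in the log. The same check would also flag the ColumnarToRow over ShuffleQueryStage 1 if that branch were transcribed.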

andygrove avatar Jun 06 '25 16:06 andygrove

Tests now pass.

andygrove avatar Jun 06 '25 19:06 andygrove

Replaced by https://github.com/apache/datafusion-comet/pull/1865

andygrove avatar Jun 08 '25 14:06 andygrove