[WIP] Remove `COMET_SHUFFLE_FALLBACK_TO_COLUMNAR` config
## Which issue does this PR close?
Closes https://github.com/apache/datafusion-comet/issues/1254
## Rationale for this change
The `COMET_SHUFFLE_FALLBACK_TO_COLUMNAR` config was added as a workaround for Spark SQL test failures. We need to remove it because users could run into the same bugs that are causing those failures.
## What changes are included in this PR?
- [x] Remove config
- [ ] Fix Spark SQL test failures:
  - core1
    - test with low buffer spill threshold
    - SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate
    - reverse preceding/following range between with aggregation
  - core2
    - subquery/exists-subquery/exists-orderby-limit.sql
  - core3
    - SPARK-32649: Optimize BHJ/SHJ inner/semi join with empty hashed relation
    - SPARK-38237: require all cluster keys for child required distribution for window query
## How are these changes tested?
Codecov Report
Attention: Patch coverage is 0% with 4 lines in your changes missing coverage. Please review.
Project coverage is 59.40%. Comparing base (f09f8af) to head (c105513). Report is 247 commits behind head on main.
Additional details and impacted files
@@ Coverage Diff @@
## main #1736 +/- ##
============================================
+ Coverage 56.12% 59.40% +3.27%
- Complexity 976 1151 +175
============================================
Files 119 130 +11
Lines 11743 12661 +918
Branches 2251 2375 +124
============================================
+ Hits 6591 7521 +930
+ Misses 4012 3930 -82
- Partials 1140 1210 +70
There are now many more test failures than when this PR was first created.
Working on this
Current failures:
core1:
2025-06-05T16:40:13.0938574Z [info] - avoid reordering broadcast join keys to match input hash partitioning *** FAILED *** (2 seconds, 403 milliseconds)
2025-06-05T16:41:30.1094281Z [info] - test with low buffer spill threshold *** FAILED *** (189 milliseconds)
2025-06-05T16:45:04.6680286Z [info] - avoid reordering broadcast join keys to match input hash partitioning *** FAILED *** (1 second, 747 milliseconds)
2025-06-05T16:45:08.9350122Z [info] - SPARK-32509: Unused Dynamic Pruning filter shouldn't affect canonicalization and exchange reuse *** FAILED *** (141 milliseconds)
2025-06-05T16:52:15.8171988Z [info] - SPARK-47146: thread leak when doing SortMergeJoin (with spill) *** FAILED *** (297 milliseconds)
core3:
2025-06-05T16:48:35.4894787Z [info] - SPARK-32649: Optimize BHJ/SHJ inner/semi join with empty hashed relation *** FAILED *** (197 milliseconds)
TPC-DS:
- q44 *** FAILED ***
- ColumnarToRow does not implement doExecuteBroadcast
The Spark SQL tests for 3.5.5 are now all passing; only the TPC-DS issue remains.
The remaining failure is related to exchange reuse in TPC-DS q44.
2025-06-06T00:48:02.3079684Z OUTPUT: TakeOrderedAndProject(limit=100, orderBy=[rnk#18761 ASC NULLS FIRST], output=[rnk#18761,best_performing#18767,worst_performing#18768])
2025-06-06T00:48:02.3081088Z +- Project [rnk#18761, i_product_name#18894 AS best_performing#18767, i_product_name#18905 AS worst_performing#18768]
2025-06-06T00:48:02.3082152Z +- BroadcastHashJoin [item_sk#18762], [i_item_sk#18796], Inner, BuildRight, false
2025-06-06T00:48:02.3082940Z :- Project [rnk#18761, item_sk#18762, i_product_name#18894]
2025-06-06T00:48:02.3083811Z : +- BroadcastHashJoin [item_sk#18757], [i_item_sk#722], Inner, BuildRight, false
2025-06-06T00:48:02.3084580Z : :- Project [item_sk#18757, rnk#18761, item_sk#18762]
2025-06-06T00:48:02.3085314Z : : +- BroadcastHashJoin [rnk#18761], [rnk#18766], Inner, BuildRight, false
2025-06-06T00:48:02.3086015Z : : :- Project [item_sk#18757, rnk#18761]
2025-06-06T00:48:02.3086766Z : : : +- Filter ((rnk#18761 < 11) AND isnotnull(item_sk#18757))
2025-06-06T00:48:02.3088247Z : : : +- Window [rank(rank_col#18758) windowspecdefinition(rank_col#18758 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#18761], [rank_col#18758 ASC NULLS FIRST]
2025-06-06T00:48:02.3089876Z : : : +- WindowGroupLimit [rank_col#18758 ASC NULLS FIRST], rank(rank_col#18758), 10, Final
2025-06-06T00:48:02.3090689Z : : : +- Sort [rank_col#18758 ASC NULLS FIRST], false, 0
2025-06-06T00:48:02.3091724Z : : : +- Filter (isnotnull(rank_col#18758) AND (cast(rank_col#18758 as decimal(13,7)) > (0.9 * ReusedSubquery Subquery subquery#18765, [id=#147283])))
2025-06-06T00:48:02.3092900Z : : : : +- ReusedSubquery Subquery subquery#18765, [id=#147283]
2025-06-06T00:48:02.3093962Z : : : +- HashAggregate(keys=[ss_item_sk#701], functions=[avg(UnscaledValue(ss_net_profit#721))], output=[item_sk#18757, rank_col#18758])
2025-06-06T00:48:02.3094946Z : : : +- CometColumnarToRow
2025-06-06T00:48:02.3095489Z : : : +- ShuffleQueryStage 0
2025-06-06T00:48:02.3096562Z : : : +- CometExchange hashpartitioning(ss_item_sk#701, 1), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=146851]
2025-06-06T00:48:02.3097956Z : : : +- !CometHashAggregate [ss_item_sk#701, ss_net_profit#721], Partial, [ss_item_sk#701], [partial_avg(UnscaledValue(ss_net_profit#721))]
2025-06-06T00:48:02.3099165Z : : : +- CometProject [ss_item_sk#701, ss_net_profit#721], [ss_item_sk#701, ss_net_profit#721]
2025-06-06T00:48:02.3100381Z : : : +- CometFilter [ss_item_sk#701, ss_store_sk#706, ss_net_profit#721], (isnotnull(ss_store_sk#706) AND (ss_store_sk#706 = 4))
2025-06-06T00:48:02.3103650Z : : : +- CometScan parquet spark_catalog.default.store_sales[ss_item_sk#701,ss_store_sk#706,ss_net_profit#721] Batched: true, DataFilters: [isnotnull(ss_store_sk#706), (ss_store_sk#706 = 4)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/__w/datafusion-comet/datafusion-comet/tpcds-sf-1/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), EqualTo(ss_store_sk,4)], ReadSchema: struct<ss_item_sk:int,ss_store_sk:int,ss_net_profit:decimal(7,2)>
2025-06-06T00:48:02.3106642Z : : +- BroadcastQueryStage 4
2025-06-06T00:48:02.3107602Z : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, false] as bigint)),false), [plan_id=147448]
2025-06-06T00:48:02.3108617Z : : +- *(2) Project [item_sk#18762, rnk#18766]
2025-06-06T00:48:02.3109338Z : : +- *(2) Filter ((rnk#18766 < 11) AND isnotnull(item_sk#18762))
2025-06-06T00:48:02.3110827Z : : +- Window [rank(rank_col#18763) windowspecdefinition(rank_col#18763 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#18766], [rank_col#18763 DESC NULLS LAST]
2025-06-06T00:48:02.3112485Z : : +- WindowGroupLimit [rank_col#18763 DESC NULLS LAST], rank(rank_col#18763), 10, Final
2025-06-06T00:48:02.3113319Z : : +- *(1) Sort [rank_col#18763 DESC NULLS LAST], false, 0
2025-06-06T00:48:02.3114412Z : : +- *(1) Filter (isnotnull(rank_col#18763) AND (cast(rank_col#18763 as decimal(13,7)) > (0.9 * Subquery subquery#18765, [id=#147283])))
2025-06-06T00:48:02.3115373Z : : : +- Subquery subquery#18765, [id=#147283]
2025-06-06T00:48:02.3115975Z : : : +- AdaptiveSparkPlan isFinalPlan=true
2025-06-06T00:48:02.3116693Z +- == Final Plan ==
2025-06-06T00:48:02.3117643Z *(1) HashAggregate(keys=[ss_store_sk#18830], functions=[avg(UnscaledValue(ss_net_profit#18845))], output=[rank_col#18759])
2025-06-06T00:48:02.3118606Z +- *(1) CometColumnarToRow
2025-06-06T00:48:02.3119179Z +- ShuffleQueryStage 0
2025-06-06T00:48:02.3120162Z +- CometExchange hashpartitioning(ss_store_sk#18830, 1), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=147498]
2025-06-06T00:48:02.3121750Z +- !CometHashAggregate [ss_store_sk#18830, ss_net_profit#18845], Partial, [ss_store_sk#18830], [partial_avg(UnscaledValue(ss_net_profit#18845))]
2025-06-06T00:48:02.3123117Z +- CometProject [ss_store_sk#18830, ss_net_profit#18845], [ss_store_sk#18830, ss_net_profit#18845]
2025-06-06T00:48:02.3124510Z +- CometFilter [ss_addr_sk#18829, ss_store_sk#18830, ss_net_profit#18845], ((isnotnull(ss_store_sk#18830) AND (ss_store_sk#18830 = 4)) AND isnull(ss_addr_sk#18829))
2025-06-06T00:48:02.3128544Z +- CometScan parquet spark_catalog.default.store_sales[ss_addr_sk#18829,ss_store_sk#18830,ss_net_profit#18845] Batched: true, DataFilters: [isnotnull(ss_store_sk#18830), (ss_store_sk#18830 = 4), isnull(ss_addr_sk#18829)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/__w/datafusion-comet/datafusion-comet/tpcds-sf-1/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), EqualTo(ss_store_sk,4), IsNull(ss_addr_sk)], ReadSchema: struct<ss_addr_sk:int,ss_store_sk:int,ss_net_profit:decimal(7,2)>
2025-06-06T00:48:02.3131704Z +- == Initial Plan ==
2025-06-06T00:48:02.3132670Z HashAggregate(keys=[ss_store_sk#18830], functions=[avg(UnscaledValue(ss_net_profit#18845))], output=[rank_col#18759])
2025-06-06T00:48:02.3134048Z +- CometExchange hashpartitioning(ss_store_sk#18830, 1), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=146421]
2025-06-06T00:48:02.3135539Z +- !CometHashAggregate [ss_store_sk#18830, ss_net_profit#18845], Partial, [ss_store_sk#18830], [partial_avg(UnscaledValue(ss_net_profit#18845))]
2025-06-06T00:48:02.3139421Z +- CometProject [ss_store_sk#18830, ss_net_profit#18845], [ss_store_sk#18830, ss_net_profit#18845]
2025-06-06T00:48:02.3140970Z +- CometFilter [ss_addr_sk#18829, ss_store_sk#18830, ss_net_profit#18845], ((isnotnull(ss_store_sk#18830) AND (ss_store_sk#18830 = 4)) AND isnull(ss_addr_sk#18829))
2025-06-06T00:48:02.3144752Z +- CometScan parquet spark_catalog.default.store_sales[ss_addr_sk#18829,ss_store_sk#18830,ss_net_profit#18845] Batched: true, DataFilters: [isnotnull(ss_store_sk#18830), (ss_store_sk#18830 = 4), isnull(ss_addr_sk#18829)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/__w/datafusion-comet/datafusion-comet/tpcds-sf-1/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), EqualTo(ss_store_sk,4), IsNull(ss_addr_sk)], ReadSchema: struct<ss_addr_sk:int,ss_store_sk:int,ss_net_profit:decimal(7,2)>
2025-06-06T00:48:02.3148516Z : : +- *(1) HashAggregate(keys=[ss_item_sk#18775], functions=[avg(UnscaledValue(ss_net_profit#18795))], output=[item_sk#18762, rank_col#18763])
2025-06-06T00:48:02.3149536Z : : +- *(1) ColumnarToRow
2025-06-06T00:48:02.3150077Z : : +- ShuffleQueryStage 1
2025-06-06T00:48:02.3151311Z : : +- ReusedExchange [ss_item_sk#18775, sum#18915, count#18916L], CometExchange hashpartitioning(ss_item_sk#701, 1), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=146851]
2025-06-06T00:48:02.3152516Z : +- CometColumnarToRow
2025-06-06T00:48:02.3152984Z : +- BroadcastQueryStage 2
2025-06-06T00:48:02.3153616Z : +- CometBroadcastExchange [i_item_sk#722, i_product_name#18894]
2025-06-06T00:48:02.3155540Z : +- CometProject [i_item_sk#722, i_product_name#18894], [i_item_sk#722, staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, i_product_name#743, 50, true, false, true) AS i_product_name#18894]
2025-06-06T00:48:02.3157539Z : +- CometFilter [i_item_sk#722, i_product_name#743], isnotnull(i_item_sk#722)
2025-06-06T00:48:02.3160081Z : +- CometScan parquet spark_catalog.default.item[i_item_sk#722,i_product_name#743] Batched: true, DataFilters: [isnotnull(i_item_sk#722)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/__w/datafusion-comet/datafusion-comet/tpcds-sf-1/item], PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:int,i_product_name:string>
2025-06-06T00:48:02.3162411Z +- ColumnarToRow
2025-06-06T00:48:02.3162821Z +- BroadcastQueryStage 3
2025-06-06T00:48:02.3163722Z +- ReusedExchange [i_item_sk#18796, i_product_name#18905], CometBroadcastExchange [i_item_sk#722, i_product_name#18894]
Note the row-based `ColumnarToRow` (C2R) around a reused exchange:
2025-06-06T00:48:02.3162411Z +- ColumnarToRow
2025-06-06T00:48:02.3162821Z +- BroadcastQueryStage 3
2025-06-06T00:48:02.3163722Z +- ReusedExchange [i_item_sk#18796, i_product_name#18905], CometBroadcastExchange [i_item_sk#722, i_product_name#18894]
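The failure mode is that Spark's exchange-reuse rule points a consumer at the earlier Comet (columnar) broadcast exchange, but the consumer sits under a plain `ColumnarToRow`, which supports only row-wise execution and never implements `doExecuteBroadcast`. A toy Python model of this shape (class and method names are illustrative only, not Spark's or Comet's real APIs):

```python
# Toy sketch of the q44 failure: a broadcast consumer asks its child for
# a broadcast relation, but a row-based transition sits in the way.

class PlanNode:
    def do_execute_broadcast(self):
        # Mirrors SparkPlan's default: only exchange-like nodes override this.
        raise NotImplementedError(
            f"{type(self).__name__} does not implement doExecuteBroadcast")

class CometBroadcastExchange(PlanNode):
    def do_execute_broadcast(self):
        return "columnar broadcast relation"

class ReusedExchange(PlanNode):
    """Reference to an exchange produced elsewhere in the plan."""
    def __init__(self, target):
        self.target = target
    def do_execute_broadcast(self):
        return self.target.do_execute_broadcast()

class ColumnarToRow(PlanNode):
    """Row-based transition; deliberately no doExecuteBroadcast override."""
    def __init__(self, child):
        self.child = child

exchange = CometBroadcastExchange()

# Reusing the exchange directly is fine: the call delegates through.
print(ReusedExchange(exchange).do_execute_broadcast())

# But a join that builds its hashed relation from the C2R node fails,
# matching the "ColumnarToRow does not implement doExecuteBroadcast" error.
try:
    ColumnarToRow(ReusedExchange(exchange)).do_execute_broadcast()
except NotImplementedError as e:
    print("error:", e)
```

In the real plan the fix direction is the same as the model suggests: the reused broadcast exchange must be consumed through a transition that can forward the broadcast (or reuse must be prevented across the columnar/row boundary), rather than through a plain `ColumnarToRow`.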
Tests now pass.
Replaced with https://github.com/apache/datafusion-comet/pull/1865.