[INTERNAL_ERROR] Custom columnar rules cannot transform shuffle node to something else
Describe the bug
This error can occur with dynamic partition pruning (DPP) queries. The following Spark SQL tests in DynamicPartitionPruningSuite fail with this error unless they are ignored:
- SPARK-35568: Fix UnsupportedOperationException when enabling both AQE and DPP
- SPARK-32509: Unused Dynamic Pruning filter shouldn't affect canonicalization and exchange reuse
- partition pruning in broadcast hash joins
Steps to reproduce
Run these Spark SQL tests with Comet using the instructions at https://datafusion.apache.org/comet/contributor-guide/spark-sql-tests.html
Expected behavior
We should be falling back to Spark because Comet does not support DPP yet.
Additional context
This is the relevant code in Spark:
private def newQueryStage(plan: SparkPlan): QueryStageExec = {
  val queryStage = plan match {
    case e: Exchange =>
      val optimized = e.withNewChildren(Seq(optimizeQueryStage(e.child, isFinalStage = false)))
      val newPlan = applyPhysicalRules(
        optimized,
        postStageCreationRules(outputsColumnar = plan.supportsColumnar),
        Some((planChangeLogger, "AQE Post Stage Creation")))
      if (e.isInstanceOf[ShuffleExchangeLike]) {
        if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
          throw SparkException.internalError(
            "Custom columnar rules cannot transform shuffle node to something else.")
        }
@coderfender is planning on working on this one
Thank you @andygrove
Some notes from debugging this:
CometExecRule performs the following transformation:
INPUT:
Exchange hashpartitioning(date_id#5283, product_id#5284, units_sold#5285, store_id#5286, 5), ENSURE_REQUIREMENTS, [plan_id=860]
+- HashAggregate(keys=[date_id#5283, product_id#5284, units_sold#5285, store_id#5286], functions=[], output=[date_id#5283, product_id#5284, units_sold#5285, store_id#5286])
   +- CometScan parquet spark_catalog.default.fact_sk[date_id#5283,product_id#5284,units_sold#5285,store_id#5286] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(25 paths)[file:/Users/andy/git/apache/apache-spark/sql/core/spark-warehouse/org..., PartitionFilters: [isnotnull(store_id#5286), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int,units_sold:int>
OUTPUT:
CometExchange hashpartitioning(date_id#5283, product_id#5284, units_sold#5285, store_id#5286, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=870]
+- CometHashAggregate [date_id#5283, product_id#5284, units_sold#5285, store_id#5286], [date_id#5283, product_id#5284, units_sold#5285, store_id#5286]
   +- CometScan parquet spark_catalog.default.fact_sk[date_id#5283,product_id#5284,units_sold#5285,store_id#5286] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(25 paths)[file:/Users/andy/git/apache/apache-spark/sql/core/spark-warehouse/org..., PartitionFilters: [isnotnull(store_id#5286), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int,units_sold:int>
Because the original plan does not support columnar execution, AQE applies the columnar transition rules when it invokes postStageCreationRules(outputsColumnar = plan.supportsColumnar). This causes a CometColumnarToRow to be added, which is then wrapped in WholeStageCodegenExec. At that point AQE fails because it expected the result to still be an exchange.
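To make the sequence above easier to follow, here is a minimal sketch of the failure mode in plain Scala with no Spark dependency. It is a toy model, not Spark or Comet code: every type and function name below (Plan, ShuffleLike, RowShuffle, ColumnarShuffle, ColumnarToRow, Codegen, Leaf, customRule, insertTransitions) is hypothetical and only mirrors the roles of Exchange, CometExchange, CometColumnarToRow and WholeStageCodegenExec described in these notes.

```scala
// Toy model of the AQE check quoted from newQueryStage; all names are hypothetical.
object AqeShuffleCheckSketch {
  sealed trait Plan { def supportsColumnar: Boolean }
  sealed trait ShuffleLike extends Plan

  // Stands in for Spark's row-based Exchange.
  final case class RowShuffle(child: Plan) extends ShuffleLike { val supportsColumnar = false }
  // Stands in for a columnar replacement such as CometExchange.
  final case class ColumnarShuffle(child: Plan) extends ShuffleLike { val supportsColumnar = true }
  // Stands in for CometColumnarToRow.
  final case class ColumnarToRow(child: Plan) extends Plan { val supportsColumnar = false }
  // Stands in for WholeStageCodegenExec.
  final case class Codegen(child: Plan) extends Plan { val supportsColumnar = false }
  case object Leaf extends Plan { val supportsColumnar = false }

  // A custom columnar rule that swaps the row shuffle for a columnar one.
  def customRule(p: Plan): Plan = p match {
    case RowShuffle(c) => ColumnarShuffle(c)
    case other         => other
  }

  // If the caller expects rows but the plan is columnar, add a transition
  // and wrap it, which hides the shuffle under two non-shuffle nodes.
  def insertTransitions(p: Plan, outputsColumnar: Boolean): Plan =
    if (!outputsColumnar && p.supportsColumnar) Codegen(ColumnarToRow(p)) else p

  // Mirrors the shape of the check: outputsColumnar comes from the ORIGINAL plan.
  def newQueryStage(original: Plan): Either[String, Plan] = {
    val newPlan =
      insertTransitions(customRule(original), outputsColumnar = original.supportsColumnar)
    original match {
      case _: ShuffleLike if !newPlan.isInstanceOf[ShuffleLike] =>
        Left("Custom columnar rules cannot transform shuffle node to something else.")
      case _ => Right(newPlan)
    }
  }

  def main(args: Array[String]): Unit =
    println(newQueryStage(RowShuffle(Leaf)))
}
```

In this model the row-based shuffle reports supportsColumnar = false, so the transition step is asked for row output and wraps the new columnar shuffle, and the check then sees a Codegen node where it expected a shuffle. The model says nothing about which fix is right for Comet; it only illustrates why the check trips.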