
[INTERNAL_ERROR] Custom columnar rules cannot transform shuffle node to something else

Open andygrove opened this issue 7 months ago • 2 comments

Describe the bug

This error can occur with dynamic partition pruning (DPP) queries. The following Spark SQL tests in DynamicPartitionPruningSuite fail with this error when they are not ignored:

  • SPARK-35568: Fix UnsupportedOperationException when enabling both AQE and DPP
  • SPARK-32509: Unused Dynamic Pruning filter shouldn't affect canonicalization and exchange reuse
  • partition pruning in broadcast hash joins

Steps to reproduce

Run these Spark SQL tests with Comet using the instructions at https://datafusion.apache.org/comet/contributor-guide/spark-sql-tests.html
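For example, a run against the DPP suites might look like the following (a sketch only; the ENABLE_COMET switch is assumed to come from the diff that the guide applies to Spark, so treat the guide as the authoritative reference for the exact invocation):

  # From the patched Spark checkout; ENABLE_COMET is assumed from the guide's diff
  ENABLE_COMET=true build/sbt "sql/testOnly *DynamicPartitionPruning*"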

Expected behavior

We should fall back to Spark because Comet does not support DPP yet.
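One possible shape for that fallback is a check that detects DPP filters before CometExecRule converts the plan. The following is a minimal sketch, not the actual Comet code; the object name, the use of FileSourceScanExec, and where the check would be invoked are all assumptions:

import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

// Hypothetical helper (not existing Comet code): returns true if any scan in
// the plan still carries a dynamic partition pruning filter. A rule such as
// CometExecRule could use this to leave the stage on the Spark side until DPP
// is supported.
object DppFallbackCheck {
  def hasDynamicPruning(plan: SparkPlan): Boolean =
    plan.find {
      case scan: FileSourceScanExec =>
        scan.partitionFilters.exists(
          _.find(_.isInstanceOf[DynamicPruningExpression]).isDefined)
      case _ => false
    }.isDefined
}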

Additional context

This is the relevant code in Spark:

  private def newQueryStage(plan: SparkPlan): QueryStageExec = {
    val queryStage = plan match {
      case e: Exchange =>
        val optimized = e.withNewChildren(Seq(optimizeQueryStage(e.child, isFinalStage = false)))
        val newPlan = applyPhysicalRules(
          optimized,
          postStageCreationRules(outputsColumnar = plan.supportsColumnar),
          Some((planChangeLogger, "AQE Post Stage Creation")))
        if (e.isInstanceOf[ShuffleExchangeLike]) {
          if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
            throw SparkException.internalError(
              "Custom columnar rules cannot transform shuffle node to something else.")
          }
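          // ... (rest of the method omitted)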

andygrove · May 13 '25 18:05

@coderfender is planning on working on this one

andygrove · May 15 '25 13:05

Thank you @andygrove

coderfender · May 15 '25 13:05

Some notes from debugging this:

CometExecRule performs the following transformation:

INPUT: Exchange hashpartitioning(date_id#5283, product_id#5284, units_sold#5285, store_id#5286, 5), ENSURE_REQUIREMENTS, [plan_id=860]
+- HashAggregate(keys=[date_id#5283, product_id#5284, units_sold#5285, store_id#5286], functions=[], output=[date_id#5283, product_id#5284, units_sold#5285, store_id#5286])
   +- CometScan parquet spark_catalog.default.fact_sk[date_id#5283,product_id#5284,units_sold#5285,store_id#5286] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(25 paths)[file:/Users/andy/git/apache/apache-spark/sql/core/spark-warehouse/org..., PartitionFilters: [isnotnull(store_id#5286), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int,units_sold:int>

OUTPUT: CometExchange hashpartitioning(date_id#5283, product_id#5284, units_sold#5285, store_id#5286, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=870]
+- CometHashAggregate [date_id#5283, product_id#5284, units_sold#5285, store_id#5286], [date_id#5283, product_id#5284, units_sold#5285, store_id#5286]
   +- CometScan parquet spark_catalog.default.fact_sk[date_id#5283,product_id#5284,units_sold#5285,store_id#5286] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(25 paths)[file:/Users/andy/git/apache/apache-spark/sql/core/spark-warehouse/org..., PartitionFilters: [isnotnull(store_id#5286), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int,units_sold:int>

Because the original plan does not support columnar output, AQE applies the columnar transition rules when invoking postStageCreationRules(outputsColumnar = plan.supportsColumnar). This causes CometColumnarToRow to be added on top of the new CometExchange, which then gets wrapped in WholeStageCodegenExec, at which point AQE fails because it expected the new plan to still be a shuffle exchange.
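For illustration, the post-stage-creation plan would then have roughly the following shape (inferred from the description above, not a captured plan), with the exchange no longer at the root:

WholeStageCodegenExec
+- CometColumnarToRow
   +- CometExchange hashpartitioning(date_id#5283, product_id#5284, units_sold#5285, store_id#5286, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=870]

Since the root of newPlan is no longer a ShuffleExchangeLike, the check in newQueryStage shown above throws the internal error.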

andygrove · May 19 '25 15:05