
[BUG] Support AQE with Broadcast Hash Join and DPP on Databricks 14.3

Open mythrocks opened this issue 1 year ago • 2 comments

With AQE enabled on Databricks 14.3, the following tests are failing:

  1. aqe_test.py::test_aqe_join_with_dpp
  2. aqe_test.py::test_aqe_join_with_dpp_multi_columns

This is mentioned as part of #11527. The failure exception appears as follows:

java.lang.AssertionError: should not reach here
        at org.apache.spark.sql.rapids.execution.GpuSubqueryBroadcastMetaBase.tagPlanForGpu(GpuSubqueryBroadcastExec.scala:133)
        at com.nvidia.spark.rapids.SparkPlanMeta.tagSelfForGpu(RapidsMeta.scala:803)
        at com.nvidia.spark.rapids.RapidsMeta.tagForGpu(RapidsMeta.scala:318)
        at com.nvidia.spark.rapids.GpuOverrides$.wrapAndTagPlan(GpuOverrides.scala:4472)
        at com.nvidia.spark.rapids.shims.FileSourceScanExecMeta.com$nvidia$spark$rapids$shims$FileSourceScanExecMeta$$convertBroadcast(FileSourceScanExecMeta.scala:50)
        at com.nvidia.spark.rapids.shims.FileSourceScanExecMeta$$anonfun$$nestedInanonfun$convertDynamicPruningFilters$1$1.applyOrElse(FileSourceScanExecMeta.scala:96)
        at com.nvidia.spark.rapids.shims.FileSourceScanExecMeta$$anonfun$$nestedInanonfun$convertDynamicPruningFilters$1$1.applyOrElse(FileSourceScanExecMeta.scala:92)

The failure indicates that the Adaptive Query Plan has changed on Databricks between 13.3 and 14.3. Tracing the failure shows that the child exec of AdaptiveSparkPlanExec is not BroadcastExchangeExec, but CollectLimitExec.

The logic in GpuSubqueryBroadcastMetaBase.tagPlanForGpu() needs to account for the new plan node.
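As a rough illustration of the shape of the fix (using toy Python stand-ins for the Spark operators, not the real classes or the plugin's actual Scala code), the check would need to accept the new CollectLimitExec child rather than assuming a BroadcastExchangeExec:

```python
from dataclasses import dataclass, field

# Illustrative stand-ins for the real Spark plan nodes; the actual classes
# live in org.apache.spark.sql.execution on the Scala side.
@dataclass
class PlanNode:
    children: list = field(default_factory=list)

class BroadcastExchangeExec(PlanNode):
    pass

class CollectLimitExec(PlanNode):
    pass

class AdaptiveSparkPlanExec(PlanNode):
    pass

def tag_plan_for_gpu(adaptive_plan):
    # Broadened check: on Databricks 14.3 the child of AdaptiveSparkPlanExec
    # can be a CollectLimitExec instead of a BroadcastExchangeExec, so both
    # shapes must be handled; anything else fails with a descriptive error.
    child = adaptive_plan.children[0]
    if isinstance(child, (BroadcastExchangeExec, CollectLimitExec)):
        return child
    raise AssertionError(
        f"Unexpected child of AdaptiveSparkPlanExec: {type(child).__name__}")
```

The real fix would of course be in the Scala meta class, but the branching logic would follow this pattern.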

mythrocks — Oct 22 '24 22:10

Side note: I don't have the full details on the post-AQE plan here. If I'm unable to get to debugging this problem in short order, someone is going to have to extract this from the debugger.

mythrocks — Oct 22 '24 22:10

Also, the error message here could stand to be more informative:

java.lang.AssertionError: should not reach here

"Should not reach here" can already be inferred from this being an AssertionError.

mythrocks — Oct 22 '24 22:10

After some digging into the plans, it looks like the Databricks 14.3 plan for this query does not involve a BroadcastExchange node at all:

AQE=True on 14.3:  No BroadcastExchange at all.

Transformed Plan:
CollectLimit 21
+- GpuProject [gputoprettystring(site_id#6, None) AS toprettystring(site_id)#196, gputoprettystring(day#5, None) AS toprettystring(day)#197, gputoprettystring(test_id#7, None) AS toprettystring(test_id)#198, gputoprettystring(test_id#0, None) AS toprettystring(test_id)#199, gputoprettystring(site_id#1, None) AS toprettystring(site_id)#200], [loreId=25]
   +- GpuBroadcastHashJoin [test_id#7, site_id#6], [test_id#0, site_id#1], Inner, GpuBuildRight, true, [loreId=24]
      :- GpuUnion [loreId=23]
      :  :- GpuProject [site_id#6, day#5, test_id#7], [loreId=19]
      :  :  +- GpuFileGpuScan parquet [day#5,site_id#6,test_id#7] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/infoA], PartitionFilters: [isnotnull(day#5), (day#5 = 1990-01-01), site_id#6 IN (site_0,site_1), isnotnull(test_id#7), isno..., PushedFilters: [], ReadSchema: struct<>
      :  +- GpuProject [CASE WHEN (site_id#15 = LONG_SITE_NAME_0) THEN site_0 WHEN (site_id#15 = LONG_SITE_NAME_1) THEN site_1 ELSE site_id#15 END AS site_id#184, day#14, test_spec#12 AS test_id#185], [loreId=22]
      :     +- GpuFilter gpuisnotnull(test_spec#12), [loreId=21]
      :        +- GpuFileGpuScan parquet [test_spec#12,day#14,site_id#15] Batched: true, DataFilters: [isnotnull(test_spec#12), dynamicpruning#207 206], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/infoB], PartitionFilters: [isnotnull(day#14), (day#14 = 1990-01-01), CASE WHEN (site_id#15 = LONG_SITE_NAME_0) THEN site_0 ..., PushedFilters: [IsNotNull(test_spec)], ReadSchema: struct<test_spec:string>
      +- ShuffleQueryStage 0, Statistics(sizeInBytes=942.0 B, rowCount=34, ColumnStat: N/A, isRuntime=true)
         +- GpuColumnarExchange gpusinglepartitioning$(), EXECUTOR_BROADCAST, [plan_id=1739], [loreId=17]
            +- GpuCoalesceBatches targetsize(1073741824), [loreId=16]
               +- GpuFilter ((gpuisnotnull(test_id#0) AND gpuisnotnull(site_id#1)) AND site_id#1 INSET site_0, site_1), [loreId=15]
                  +- GpuFileGpuScan parquet [test_id#0,site_id#1] Batched: true, DataFilters: [isnotnull(test_id#0), isnotnull(site_id#1), site_id#1 IN (site_0,site_1)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/tests], PartitionFilters: [], PushedFilters: [IsNotNull(test_id), IsNotNull(site_id), In(site_id, [site_0,site_1])], ReadSchema: struct<test_id:string,site_id:string>
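One quick way to confirm the structural difference between the two dumps is to pull the exchange-related operator names out of the EXPLAIN text. A small helper sketch (regex is illustrative, tuned to the node names seen above):

```python
import re

def exchange_nodes(plan_text):
    """Extract exchange/shuffle-stage operator names from an EXPLAIN-style
    plan dump, to confirm which broadcast or shuffle nodes are present."""
    pattern = r"\b(Gpu\w*Exchange|BroadcastExchange|ShuffleQueryStage)\b"
    return set(re.findall(pattern, plan_text))
```

Running this over the AQE=True dump yields GpuColumnarExchange and ShuffleQueryStage but no BroadcastExchange variant, matching the observation above.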

With AQE disabled, one does see a BroadcastExchange:

AQE=False:  There is a SubqueryBroadcast.

Transformed Plan:
CollectLimit 21
+- GpuColumnarToRow false, [loreId=24]
   +- GpuProject [gputoprettystring(site_id#6, None) AS toprettystring(site_id)#222, gputoprettystring(day#5, None) AS toprettystring(day)#223, gputoprettystring(test_id#7, None) AS toprettystring(test_id)#224, gputoprettystring(test_id#0, None) AS toprettystring(test_id)#225, gputoprettystring(site_id#1, None) AS toprettystring(site_id)#226], [loreId=23]
      +- GpuBroadcastHashJoin [test_id#7, site_id#6], [test_id#0, site_id#1], Inner, GpuBuildRight, false, [loreId=22]
         :- GpuUnion [loreId=17]
         :  :- GpuProject [site_id#6, day#5, test_id#7], [loreId=12]
         :  :  +- GpuFileGpuScan parquet [day#5,site_id#6,test_id#7] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/infoA], PartitionFilters: [isnotnull(day#5), (day#5 = 1990-01-01), site_id#6 IN (site_0,site_1), isnotnull(test_id#7), isno..., PushedFilters: [], ReadSchema: struct<>
         :  :        :- SubqueryBroadcast dynamicpruning#233, [0], [test_id#0, site_id#1], false, [id=#1950]
         :  :        :  +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, string, false], input[1, string, false]),false), [plan_id=1934]
         :  :        :     +- Filter ((isnotnull(test_id#0) AND isnotnull(site_id#1)) AND site_id#1 IN (site_0,site_1))
         :  :        :        +- FileScan parquet [test_id#0,site_id#1] Batched: true, DataFilters: [isnotnull(test_id#0), isnotnull(site_id#1), site_id#1 IN (site_0,site_1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/myth/PARQUET_DATA/tests], PartitionFilters: [], PushedFilters: [IsNotNull(test_id), IsNotNull(site_id), In(site_id, [site_0,site_1])], ReadSchema: struct<test_id:string,site_id:string>
         :  :        +- SubqueryBroadcast dynamicpruning#234, [1], [test_id#0, site_id#1], false, [id=#1951]
         :  :           +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, string, false], input[1, string, false]),false), [plan_id=1934]
         :  :              +- Filter ((isnotnull(test_id#0) AND isnotnull(site_id#1)) AND site_id#1 IN (site_0,site_1))
         :  :                 +- FileScan parquet [test_id#0,site_id#1] Batched: true, DataFilters: [isnotnull(test_id#0), isnotnull(site_id#1), site_id#1 IN (site_0,site_1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/myth/PARQUET_DATA/tests], PartitionFilters: [], PushedFilters: [IsNotNull(test_id), IsNotNull(site_id), In(site_id, [site_0,site_1])], ReadSchema: struct<test_id:string,site_id:string>
         :  +- GpuProject [CASE WHEN (site_id#15 = LONG_SITE_NAME_0) THEN site_0 WHEN (site_id#15 = LONG_SITE_NAME_1) THEN site_1 ELSE site_id#15 END AS site_id#210, day#14, test_spec#12 AS test_id#211], [loreId=16]
         :     +- GpuCoalesceBatches targetsize(1073741824), [loreId=15]
         :        +- GpuFilter gpuisnotnull(test_spec#12), [loreId=14]
         :           +- GpuFileGpuScan parquet [test_spec#12,day#14,site_id#15] Batched: true, DataFilters: [isnotnull(test_spec#12), dynamicpruningexpression(test_spec#12 IN dynamicpruning#233)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/infoB], PartitionFilters: [isnotnull(day#14), (day#14 = 1990-01-01), CASE WHEN (site_id#15 = LONG_SITE_NAME_0) THEN site_0 ..., PushedFilters: [IsNotNull(test_spec)], ReadSchema: struct<test_spec:string>
         :                 :- ReusedSubquery SubqueryBroadcast dynamicpruning#234, [1], [test_id#0, site_id#1], false, [id=#1951]
         :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#233, [0], [test_id#0, site_id#1], false, [id=#1950]
         +- GpuBroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, string, false], input[1, string, false]),false), [plan_id=2011], [loreId=21]
            +- GpuCoalesceBatches targetsize(1073741824), [loreId=20]
               +- GpuFilter ((gpuisnotnull(test_id#0) AND gpuisnotnull(site_id#1)) AND site_id#1 INSET site_0, site_1), [loreId=19]
                  +- GpuFileGpuScan parquet [test_id#0,site_id#1] Batched: true, DataFilters: [isnotnull(test_id#0), isnotnull(site_id#1), site_id#1 IN (site_0,site_1)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/tests], PartitionFilters: [], PushedFilters: [IsNotNull(test_id), IsNotNull(site_id), In(site_id, [site_0,site_1])], ReadSchema: struct<test_id:string,site_id:string>

Ok, it looks like we're missing the logic from https://github.com/NVIDIA/spark-rapids/pull/6919 in the 350db14.3 shim.
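The key shape in the 14.3 plan is a ShuffleQueryStage whose exchange runs with the EXECUTOR_BROADCAST origin, standing in for a BroadcastExchange on the build side. A toy sketch of that recognition (field and class names are illustrative stand-ins, not the real plugin API):

```python
from dataclasses import dataclass

@dataclass
class GpuColumnarExchange:
    shuffle_origin: str   # e.g. "EXECUTOR_BROADCAST" in the 14.3 plan above

@dataclass
class ShuffleQueryStage:
    plan: GpuColumnarExchange

def is_executor_broadcast(stage):
    # Sketch of the kind of recognition PR #6919 added for earlier
    # Databricks shims: a shuffle stage whose exchange uses the
    # EXECUTOR_BROADCAST origin is treated as a broadcast for join planning.
    return (isinstance(stage, ShuffleQueryStage)
            and stage.plan.shuffle_origin == "EXECUTOR_BROADCAST")
```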

mythrocks — Nov 20 '24 01:11