spark-rapids
Figure out why `MapFromArrays` appears in the tests for hive parquet write
Describe the bug
PR https://github.com/NVIDIA/spark-rapids/pull/10912 introduces parquet support for GpuInsertIntoHiveTable, along with the relevant tests. In some of the tests on Databricks, the ProjectExec falls back to the CPU because the GPU version of the MapFromArrays expression is missing.
We should find the root cause of why this expression appears only in these tests on Databricks.
This also happens on Spark 351. See https://github.com/NVIDIA/spark-rapids/issues/10956
Added back to needs-triage because we really need to understand what is happening. If we cannot do something simple like this with Databricks, it is either a bug in our code or theirs, and we need to know which.
@firestarman this needs to be investigated to figure out the root cause given we'll have an unneeded fallback with this feature on Databricks.
This MapFromArrays also appears in the tests on Spark 350+.
However, I do not think this is a plugin bug, because Spark 350+ generates a different plan for the Hive-style write (INSERT OVERWRITE TABLE a_new_created_table SELECT * FROM another_table) than Spark 34x does.
Spark 341
*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHiveTable> will run on GPU
  *Exec <WriteFilesExec> will run on GPU
    ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
      @Expression <AttributeReference> _c0#30 could run on GPU
Spark 350 and 351
*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHiveTable> will run on GPU
  *Exec <WriteFilesExec> will run on GPU
    !Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced
      @Expression <Alias> map_from_arrays(transform(map_keys(_c0#26), lambdafunction(lambda key#37, lambda key#37, false)), transform(map_values(_c0#26), lambdafunction(lambda value#39, lambda value#39, false))) AS _c0#41 could run on GPU
        ! <MapFromArrays> map_from_arrays(transform(map_keys(_c0#26), lambdafunction(lambda key#37, lambda key#37, false)), transform(map_values(_c0#26), lambdafunction(lambda value#39, lambda value#39, false))) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.MapFromArrays
          @Expression <ArrayTransform> transform(map_keys(_c0#26), lambdafunction(lambda key#37, lambda key#37, false)) could run on GPU
            @Expression <MapKeys> map_keys(_c0#26) could run on GPU
              @Expression <AttributeReference> _c0#26 could run on GPU
            @Expression <LambdaFunction> lambdafunction(lambda key#37, lambda key#37, false) could run on GPU
              @Expression <NamedLambdaVariable> lambda key#37 could run on GPU
              @Expression <NamedLambdaVariable> lambda key#37 could run on GPU
          @Expression <ArrayTransform> transform(map_values(_c0#26), lambdafunction(lambda value#39, lambda value#39, false)) could run on GPU
            @Expression <MapValues> map_values(_c0#26) could run on GPU
              @Expression <AttributeReference> _c0#26 could run on GPU
            @Expression <LambdaFunction> lambdafunction(lambda value#39, lambda value#39, false) could run on GPU
              @Expression <NamedLambdaVariable> lambda value#39 could run on GPU
              @Expression <NamedLambdaVariable> lambda value#39 could run on GPU
      ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
        @Expression <AttributeReference> _c0#26 could run on GPU
But the CTAS command (CREATE TABLE new_table STORED AS PARQUET AS SELECT * FROM a_existing_table) still produces the same plan tree as on Spark 34x.
*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHiveTable> will run on GPU
  *Exec <WriteFilesExec> will run on GPU
    ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
      @Expression <AttributeReference> _c0#0 could run on GPU
This appears to be coming from https://github.com/apache/spark/blob/fd86f85e181fc2dc0f50a096855acf83a6cc5d9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala#L381-L421
It appears that this behavior was introduced by https://issues.apache.org/jira/browse/SPARK-42151 via https://github.com/apache/spark/pull/40308
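To make the fallback concrete, here is a pure-Python model (a sketch, not Spark code) of the rewrite that TableOutputResolver applies to a map column in Spark 350+: the map is rebuilt from its keys and values through identity lambdas. The result is semantically the identity, but the plan now contains a MapFromArrays node, which is what the plugin cannot replace.

```python
# Pure-Python model of the Spark 350+ map-column rewrite:
# map_from_arrays(transform(map_keys(m), k -> k),
#                 transform(map_values(m), v -> v))
# The function names mirror the Spark SQL functions; this is illustrative only.
def map_keys(m):
    return list(m.keys())

def map_values(m):
    return list(m.values())

def transform(arr, f):
    return [f(x) for x in arr]

def map_from_arrays(keys, values):
    # Spark's map_from_arrays requires both arrays to have the same length.
    if len(keys) != len(values):
        raise ValueError("key and value arrays must have equal lengths")
    return dict(zip(keys, values))

def resolve_map_column(m):
    # Semantically a no-op: the output equals the input, but the plan now
    # contains a MapFromArrays expression that the plugin cannot replace.
    return map_from_arrays(transform(map_keys(m), lambda k: k),
                           transform(map_values(m), lambda v: v))
```

Since the rewrite is an identity, any CPU fallback it causes is pure overhead, which is why this reads as a performance regression rather than a correctness issue.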
So technically this is a regression, more accurately a performance regression: we could run the query fully on the GPU before, but now we cannot.
@sameerz and @mattahrens, we now know why the regression has happened and we need to decide what the next steps are. Implementing this is not too difficult: we mainly need to verify that the array lengths are the same everywhere, then pull out the data column from each of the arrays and turn them into a struct.
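The proposed implementation above can be sketched on the host side as follows. This is a plain-Python model of the required semantics, not the actual cudf-based GPU code: per row, validate that the key and value list children have equal lengths, then zip them into a list of key/value structs, which is Spark's physical layout for a map.

```python
# Hedged sketch of the semantics a GpuMapFromArrays would need
# (plain Python lists standing in for cudf list columns):
def gpu_map_from_arrays(keys_col, values_col):
    out = []
    for keys, values in zip(keys_col, values_col):
        # A null in either input array produces a null map for that row.
        if keys is None or values is None:
            out.append(None)
            continue
        # The validation step: array lengths must match in every row.
        if len(keys) != len(values):
            raise ValueError("map_from_arrays: mismatched array lengths")
        # Pull the data out of each array and pair it up as key/value
        # structs, i.e. Spark's list-of-struct map representation.
        out.append([{"key": k, "value": v} for k, v in zip(keys, values)])
    return out
```

In the real plugin the length check would be a column-wide comparison of the two lists' offsets and the zip an interleave of the child columns into a struct column, but the row-wise model above captures the contract to verify.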