
Figure out why `MapFromArrays` appears in the tests for Hive Parquet write

firestarman opened this issue 1 year ago

Describe the bug

PR https://github.com/NVIDIA/spark-rapids/pull/10912 introduces Parquet support for GpuInsertIntoHiveTable, along with the relevant tests. In some of these tests on Databricks, the ProjectExec falls back to the CPU because the GPU version of the MapFromArrays expression is missing.

We should find the root cause of why this expression appears only in these tests on Databricks.

firestarman avatar May 30 '24 02:05 firestarman

This also happens on Spark 351. See https://github.com/NVIDIA/spark-rapids/issues/10956

firestarman avatar Jun 03 '24 01:06 firestarman

Added this back to needs-triage because we really need to understand what is happening. If we cannot do something this simple on Databricks, it is either a bug in our code or in theirs, and we need to know which.

revans2 avatar Jun 17 '24 16:06 revans2

@firestarman this needs to be investigated to figure out the root cause given we'll have an unneeded fallback with this feature on Databricks.

mattahrens avatar Jun 18 '24 20:06 mattahrens

This MapFromArrays also appears in the tests on Spark 350+.

However, I do not think this is a Plugin bug, because Spark 350+ generates a different plan for the Hive-style write (INSERT OVERWRITE TABLE a_new_created_table SELECT * FROM another_table) than Spark 34x does.

Spark 341

*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHiveTable> will run on GPU
  *Exec <WriteFilesExec> will run on GPU
    ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
      @Expression <AttributeReference> _c0#30 could run on GPU

Spark 350 and 351

*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHiveTable> will run on GPU
  *Exec <WriteFilesExec> will run on GPU
    !Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced
      @Expression <Alias> map_from_arrays(transform(map_keys(_c0#26), lambdafunction(lambda key#37, lambda key#37, false)), transform(map_values(_c0#26), lambdafunction(lambda value#39, lambda value#39, false))) AS _c0#41 could run on GPU
        ! <MapFromArrays> map_from_arrays(transform(map_keys(_c0#26), lambdafunction(lambda key#37, lambda key#37, false)), transform(map_values(_c0#26), lambdafunction(lambda value#39, lambda value#39, false))) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.MapFromArrays
          @Expression <ArrayTransform> transform(map_keys(_c0#26), lambdafunction(lambda key#37, lambda key#37, false)) could run on GPU
            @Expression <MapKeys> map_keys(_c0#26) could run on GPU
              @Expression <AttributeReference> _c0#26 could run on GPU
            @Expression <LambdaFunction> lambdafunction(lambda key#37, lambda key#37, false) could run on GPU
              @Expression <NamedLambdaVariable> lambda key#37 could run on GPU
              @Expression <NamedLambdaVariable> lambda key#37 could run on GPU
          @Expression <ArrayTransform> transform(map_values(_c0#26), lambdafunction(lambda value#39, lambda value#39, false)) could run on GPU
            @Expression <MapValues> map_values(_c0#26) could run on GPU
              @Expression <AttributeReference> _c0#26 could run on GPU
            @Expression <LambdaFunction> lambdafunction(lambda value#39, lambda value#39, false) could run on GPU
              @Expression <NamedLambdaVariable> lambda value#39 could run on GPU
              @Expression <NamedLambdaVariable> lambda value#39 could run on GPU
      ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
        @Expression <AttributeReference> _c0#26 could run on GPU

But the CTAS command (CREATE TABLE new_table STORED AS PARQUET AS SELECT * FROM a_existing_table) still produces the same plan tree as before.

*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHiveTable> will run on GPU
  *Exec <WriteFilesExec> will run on GPU
    ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
      @Expression <AttributeReference> _c0#0 could run on GPU
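The extra projection in the Spark 350+ plan is semantically an identity rewrite of the map column: it takes the map apart with map_keys/map_values, runs an identity lambda over each array, and reassembles the map with map_from_arrays. A pure-Python sketch of these semantics (not the Plugin's actual Scala code, just an illustration using hypothetical stand-in functions) shows the round-trip is a no-op:

```python
# Pure-Python stand-ins for the Spark SQL functions in the plan above.
def map_keys(m):
    """Like Spark's map_keys: the map's keys as an array."""
    return list(m.keys())

def map_values(m):
    """Like Spark's map_values: the map's values as an array."""
    return list(m.values())

def transform(arr, f):
    """Like Spark's transform: apply a lambda to each element."""
    return [f(x) for x in arr]

def map_from_arrays(keys, values):
    """Like Spark's map_from_arrays: zip two equal-length arrays into a map."""
    if len(keys) != len(values):
        raise ValueError("keys and values must have the same length")
    return dict(zip(keys, values))

m = {"a": 1, "b": 2}
# The exact shape of the inserted projection, with identity lambdas:
rebuilt = map_from_arrays(
    transform(map_keys(m), lambda key: key),
    transform(map_values(m), lambda value: value),
)
assert rebuilt == m  # the rewrite does not change the data
```

This is why falling back to the CPU for this ProjectExec is pure overhead: the projection computes nothing new.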

firestarman avatar Jun 19 '24 07:06 firestarman

This appears to be coming from https://github.com/apache/spark/blob/fd86f85e181fc2dc0f50a096855acf83a6cc5d9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala#L381-L421

It appears that this behavior was introduced by https://issues.apache.org/jira/browse/SPARK-42151 (https://github.com/apache/spark/pull/40308).

So technically this is a regression, more accurately a performance regression: we could previously run the query fully on the GPU, but now we cannot.

revans2 avatar Jun 24 '24 15:06 revans2

@sameerz and @mattahrens we now know why the regression has happened, and we need to decide what the next steps are. Implementing this is not too difficult. We mainly need to verify that the key and value arrays have the same length everywhere, then pull the data column out of each array and combine them into a struct column.
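The proposed implementation outline above can be sketched in pure Python over row-wise data (the real implementation would be columnar Scala/cuDF in the Plugin; the function name and the key/value struct field names here are hypothetical):

```python
def gpu_map_from_arrays_sketch(key_arrays, value_arrays):
    """Sketch of MapFromArrays semantics per the outline above:
    validate that the key and value arrays match in length for every
    row, then pair the elements into (key, value) structs that form
    each map row."""
    out = []
    for keys, values in zip(key_arrays, value_arrays):
        # A null on either side yields a null map row.
        if keys is None or values is None:
            out.append(None)
            continue
        # The length check the GPU version would need to do everywhere.
        if len(keys) != len(values):
            raise ValueError(
                "MapFromArrays: key and value arrays differ in length")
        # Zip the two arrays into an array of key/value structs,
        # which is how a map column is represented.
        out.append([{"key": k, "value": v} for k, v in zip(keys, values)])
    return out

rows = gpu_map_from_arrays_sketch([["a", "b"]], [[1, 2]])
assert rows == [[{"key": "a", "value": 1}, {"key": "b", "value": 2}]]
```

The mismatched-length case must raise rather than truncate, to match Spark's CPU behavior of failing on inconsistent input.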

revans2 avatar Jun 24 '24 15:06 revans2