
[FEA] [Databricks 12.2] Support GpuMergeIntoCommand notMatchedBySourceClauses on GPU

Open · andygrove opened this issue 2 years ago • 2 comments

Is your feature request related to a problem? Please describe.
PR https://github.com/NVIDIA/spark-rapids/pull/8282 adds basic support for Databricks 12.2 but does not add support for notMatchedBySourceClauses in GpuMergeIntoCommand, which instead falls back to the CPU.
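For illustration, a MERGE of roughly the following shape (hypothetical table names and columns, not taken from this issue) carries a WHEN NOT MATCHED BY SOURCE clause; that clause is what ends up in notMatchedBySourceClauses and currently sends the whole command back to the CPU:

    import org.apache.spark.sql.SparkSession

    // Hypothetical example: table names and columns are made up for illustration.
    // Assumes a session with the Delta Lake / Databricks SQL extensions enabled
    // and existing Delta tables named `target` and `source`.
    val spark = SparkSession.builder()
      .appName("merge-not-matched-by-source")
      .getOrCreate()

    // The WHEN NOT MATCHED BY SOURCE clause populates notMatchedBySourceClauses
    // in the merge command and currently forces the CPU fallback.
    spark.sql(
      """MERGE INTO target t
        |USING source s
        |ON t.id = s.id
        |WHEN MATCHED THEN UPDATE SET t.value = s.value
        |WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)
        |WHEN NOT MATCHED BY SOURCE THEN DELETE
        |""".stripMargin)

Without the final clause, the command should be eligible for the GPU path added in #8282.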

Describe the solution you'd like
We should support notMatchedBySourceClauses on GPU.

Describe alternatives you've considered

Additional context
See the discussion at https://github.com/NVIDIA/spark-rapids/pull/8282#discussion_r1204462293 for more details.

andygrove · May 26 '23 17:05

Please ignore the struck-through text below; it was unrelated to this issue and is now resolved.

~One challenge in implementing this is that we replace JoinedRowProcessor with a RapidsProcessDeltaMergeJoin logical operator that involves a non-deterministic expression (monotonically_increasing_id). Spark has an analyzer rule that only allows non-deterministic expressions in a hard-coded set of operators, resulting in the following error:~

23/06/14 16:55:53 ERROR GpuMergeIntoCommand: Fatal error in MERGE with materialized source in attempt 1.
org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window, found:
a,b,c,_target_row_id_,_row_dropped_,_incr_row_count_,_change_type,(_source_row_present_ IS NULL),(_target_row_present_ IS NULL),true,a,b,c,_target_row_id_,true,(UDF() AND UDF()),CAST(NULL AS STRING),a,b,c,_target_row_id_,false,true,'delete',a,b,c,_target_row_id_,false,UDF(),CAST(NULL AS STRING),a,b,c,_target_row_id_,true,true,CAST(NULL AS STRING)
in operator RapidsProcessDeltaMergeJoin [a#304727, b#304728, c#304729, _target_row_id_#304730L, _row_dropped_#304731, _incr_row_count_#304732, _change_type#304733], isnull(_source_row_present_#304699), isnull(_target_row_present_#304702), [true], [[[a#304425, b#304426, c#304427, _target_row_id_#304707L, true, (UDF() AND UDF()), null], [a#304425, b#304426, c#304427, _target_row_id_#304707L, false, true, delete]]], [a#304425, b#304426, c#304427, _target_row_id_#304707L, false, UDF(), null], [a#304425, b#304426, c#304427, _target_row_id_#304707L, true, true, null].; line 1 pos 0;

Here is the Spark code from CheckAnalysis:

          case o if o.expressions.exists(!_.deterministic) &&
            !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
            !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] &&
            // Lateral join is checked in checkSubqueryExpression.
            !o.isInstanceOf[LateralJoin] =>
            // The rule above is used to check Aggregate operator.
            o.failAnalysis(
              errorClass = "_LEGACY_ERROR_TEMP_2439",
              messageParameters = Map(
                "sqlExprs" -> o.expressions.map(_.sql).mkString(","),
                "operator" -> operator.simpleString(SQLConf.get.maxToStringFields)))

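As a quick sanity check of the premise (illustrative only, not spark-rapids code, assuming Spark 3.x on the classpath), the Catalyst expression behind monotonically_increasing_id() reports itself as non-deterministic, which is exactly what this case keys on:

    import org.apache.spark.sql.functions.monotonically_increasing_id

    // Illustrative check: the expression backing monotonically_increasing_id()
    // is non-deterministic, so any operator outside the set allowed above
    // (Project, Filter, Aggregate, Window, LateralJoin) that embeds it fails
    // analysis with the error shown earlier.
    println(monotonically_increasing_id().expr.deterministic) // prints: false
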
For now, we avoid replacing the operator if notMatchedBySourceClauses is non-empty.
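A minimal sketch of that guard, with hypothetical names (this is not the actual spark-rapids code): the operator replacement only happens when there are no WHEN NOT MATCHED BY SOURCE clauses.

    // Hypothetical sketch only: mirror the current behavior of skipping the
    // RapidsProcessDeltaMergeJoin replacement whenever any
    // WHEN NOT MATCHED BY SOURCE clause is present.
    def tagMergeForGpu(notMatchedBySourceClauses: Seq[Any]): Option[String] =
      if (notMatchedBySourceClauses.nonEmpty) {
        Some("notMatchedBySourceClauses are not supported on GPU yet; falling back to CPU")
      } else {
        None // eligible for the RapidsProcessDeltaMergeJoin replacement
      }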

andygrove · Jun 14 '23 18:06

We have the same issue with OSS Delta as well.

jihoonson · Dec 02 '25 19:12