[FEA] [Databricks 12.2] Support GpuMergeIntoCommand notMatchedBySourceClauses on GPU
Is your feature request related to a problem? Please describe.
PR https://github.com/NVIDIA/spark-rapids/pull/8282 adds basic support for Databricks 12.2 but did not add support for notMatchedBySourceClauses in GpuMergeIntoCommand and instead falls back to CPU.
Describe the solution you'd like
We should support notMatchedBySourceClauses on GPU.
Describe alternatives you've considered
Additional context See the discussion at https://github.com/NVIDIA/spark-rapids/pull/8282#discussion_r1204462293 for more details.
Please ignore. This was unrelated to this issue and is now resolved.
~One challenge in implementing this is that we replace JoinedRowProcessor with a RapidsProcessDeltaMergeJoin logical operator that involves a non-deterministic expression (monotonically_increasing_id). Spark has an analyzer rule that only allows non-deterministic expressions in a hard-coded set of operators, resulting in the following error:~
23/06/14 16:55:53 ERROR GpuMergeIntoCommand: Fatal error in MERGE with materialized source in attempt 1.
org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window, found:
a,b,c,_target_row_id_,_row_dropped_,_incr_row_count_,_change_type,(_source_row_present_ IS NULL),(_target_row_present_ IS NULL),true,a,b,c,_target_row_id_,true,(UDF() AND UDF()),CAST(NULL AS STRING),a,b,c,_target_row_id_,false,true,'delete',a,b,c,_target_row_id_,false,UDF(),CAST(NULL AS STRING),a,b,c,_target_row_id_,true,true,CAST(NULL AS STRING)
in operator RapidsProcessDeltaMergeJoin [a#304727, b#304728, c#304729, _target_row_id_#304730L, _row_dropped_#304731, _incr_row_count_#304732, _change_type#304733], isnull(_source_row_present_#304699), isnull(_target_row_present_#304702), [true], [[[a#304425, b#304426, c#304427, _target_row_id_#304707L, true, (UDF() AND UDF()), null], [a#304425, b#304426, c#304427, _target_row_id_#304707L, false, true, delete]]], [a#304425, b#304426, c#304427, _target_row_id_#304707L, false, UDF(), null], [a#304425, b#304426, c#304427, _target_row_id_#304707L, true, true, null].; line 1 pos 0;
Here is the Spark code from CheckAnalysis:
case o if o.expressions.exists(!_.deterministic) &&
!o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
!o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] &&
// Lateral join is checked in checkSubqueryExpression.
!o.isInstanceOf[LateralJoin] =>
// The rule above is used to check Aggregate operator.
o.failAnalysis(
errorClass = "_LEGACY_ERROR_TEMP_2439",
messageParameters = Map(
"sqlExprs" -> o.expressions.map(_.sql).mkString(","),
"operator" -> operator.simpleString(SQLConf.get.maxToStringFields)))
For now, we avoid replacing the operator if notMatchedBySourceClauses is non-empty.
We have the same issue for OSS delta as well.