
[BUG] Merge fails with ambiguous errors on some types when using the Python API

Open PadenZach opened this issue 2 years ago • 1 comment

Bug

Describe the problem

When trying to merge with a condition using the Python API, a pyspark.sql.utils.AnalysisException: Failed to resolve is sometimes raised.

Steps to reproduce

from pyspark.sql import functions as F
from delta.tables import DeltaTable

df = spark.createDataFrame([(1, 2, 3), (4, 5, 6), (7, 8, 9)]).select("*", F.create_map("_1", "_2").alias("mapCol"))
df.write.format("delta").save("/tmp/delta/test_table")
dt = DeltaTable.forPath(spark, "/tmp/delta/test_table")
dt.alias("old").merge(
    df.alias("new"),
    condition=F.expr("old._1 == new._1"),
).whenMatchedUpdate(
    condition=F.expr("old.mapCol != new.mapCol"),  # This is where the issue is.
    set={"mapCol": F.col("new.mapCol")},
).whenNotMatchedInsertAll().execute()

Observed results

Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "<snip>io.delta_delta-core_2.12-2.0.0.jar/delta/tables.py", line 897, in execute
  File "/usr/lib/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Failed to resolve
;
'DeltaMergeInto (_1#1150L = _1#785L), [Update [condition: NOT (mapCol#1153 = mapCol#791), actions: [`mapCol` = mapCol#791]]], [Insert [actions: [`_1` = _1#785L, `_2` = _2#786L, `_3` = _3#787L, `mapCol` = mapCol#791]]], false, StructType(StructField(_1,LongType,true), StructField(_2,LongType,true), StructField(_3,LongType,true), StructField(mapCol,MapType(LongType,LongType,true),true))
:- SubqueryAlias old
:  +- Relation [_1#1150L,_2#1151L,_3#1152L,mapCol#1153] parquet
+- SubqueryAlias new
   +- Project [_1#785L, _2#786L, _3#787L, map(_1#785L, _2#786L) AS mapCol#791]
      +- LogicalRDD [_1#785L, _2#786L, _3#787L], false

Expected results

Ideally, it would "just work". However, since I believe this to be an issue with the type I was comparing, propagating the actual error would be the more realistic fix :)

Further details

The type this happened with was a column of MapType. When I tried to compare the column against itself, I got the following error:

pyspark.sql.utils.AnalysisException: cannot resolve '(spark_catalog.my_db.my_table.data = spark_catalog.my_db.my_table.data)' due to data type mismatch: EqualTo does not support ordering on type map<string,double>; line 1 pos 0;
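
For context, the underlying Spark error can be reproduced outside of a merge entirely. A minimal sketch, assuming the spark session and the df from the steps above:

# Comparing a MapType column to itself raises the same AnalysisException,
# since Spark's EqualTo does not support ordering on map types.
df.filter(F.expr("mapCol = mapCol")).show()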

I'd strongly prefer to have some of this information in the error reported by Delta Lake, rather than just "Failed to resolve".
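
In the meantime, one possible workaround is a sketch along these lines (map_neq is just an illustrative name, and I haven't verified this across Spark versions): compare a canonical, orderable form of the maps instead of the maps themselves, since arrays of structs do support equality.

# Compare sorted entry arrays instead of the raw maps; map_entries and
# array_sort are standard Spark 3.x functions.
map_neq = (
    F.array_sort(F.map_entries(F.col("old.mapCol")))
    != F.array_sort(F.map_entries(F.col("new.mapCol")))
)

dt.alias("old").merge(
    df.alias("new"),
    condition=F.expr("old._1 == new._1"),
).whenMatchedUpdate(
    condition=map_neq,  # orderable comparison instead of raw map equality
    set={"mapCol": F.col("new.mapCol")},
).whenNotMatchedInsertAll().execute()

Sorting the entries first makes the comparison insensitive to map key ordering, which raw map equality could not guarantee anyway.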

Environment information

  • Delta Lake version: 2.0.0
  • Spark version: 3.2
  • Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • [ ] Yes. I can contribute a fix for this bug independently.
  • [X] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • [X] No. I cannot contribute a bug fix at this time.

(I'm not well versed in the Scala/Java side of the project -- if this is something that could be resolved by editing the Python bindings, I may be able to help out.)

PadenZach avatar Aug 11 '22 15:08 PadenZach

Thanks @PadenZach. Acknowledging that the repro is successful and this is a legitimate issue. The command should not swallow the specific error, in this case that map is not a comparable type.

We'll leave this open for now; there is no corruption since an error is still thrown. We'll prioritize it accordingly to improve usability. Thank you!

nkarpov avatar Aug 11 '22 20:08 nkarpov