[BUG] Merge fails with ambiguous errors on some types using Python API
Bug
Describe the problem
When trying to merge with a condition using the Python API, a pyspark.sql.utils.AnalysisException: Failed to resolve
is sometimes raised, with no detail about the underlying cause.
Steps to reproduce
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Assumes an active SparkSession `spark` with the Delta Lake extensions configured.
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6), (7, 8, 9)]).select(
    "*", F.create_map("_1", "_2").alias("mapCol")
)
df.write.format("delta").save("/tmp/delta/test_table")

dt = DeltaTable.forPath(spark, "/tmp/delta/test_table")
dt.alias("old").merge(
    df.alias("new"),
    condition=F.expr("old._1 == new._1"),
).whenMatchedUpdate(
    condition=F.expr("old.mapCol != new.mapCol"),  # This is where the issue is.
    set={"mapCol": F.col("new.mapCol")},
).whenNotMatchedInsertAll().execute()
Observed results
Traceback (most recent call last):
File "<stdin>", line 6, in <module>
File "<snip>io.delta_delta-core_2.12-2.0.0.jar/delta/tables.py", line 897, in execute
File "/usr/lib/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 117, in deco
raise converted from None
pyspark.sql.utils.AnalysisException: Failed to resolve
;
'DeltaMergeInto (_1#1150L = _1#785L), [Update [condition: NOT (mapCol#1153 = mapCol#791), actions: [`mapCol` = mapCol#791]]], [Insert [actions: [`_1` = _1#785L, `_2` = _2#786L, `_3` = _3#787L, `mapCol` = mapCol#791]]], false, StructType(StructField(_1,LongType,true), StructField(_2,LongType,true), StructField(_3,LongType,true), StructField(mapCol,MapType(LongType,LongType,true),true))
:- SubqueryAlias old
: +- Relation [_1#1150L,_2#1151L,_3#1152L,mapCol#1153] parquet
+- SubqueryAlias new
+- Project [_1#785L, _2#786L, _3#787L, map(_1#785L, _2#786L) AS mapCol#791]
+- LogicalRDD [_1#785L, _2#786L, _3#787L], false
Expected results
Ideally, it would "just work"; however, since I believe this to be an issue with the type I was comparing, propagating the actual error would be the more realistic fix :)
Further details
The type this happened to me with was a MapType column; when trying to compare the column against itself, I got the following error:
pyspark.sql.utils.AnalysisException: cannot resolve '(spark_catalog.my_db.my_table.data = spark_catalog.my_db.my_table.data)' due to data type mismatch: EqualTo does not support ordering on type map<string,double>; line 1 pos 0;
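For reference, the same underlying error can be reproduced on the sample DataFrame from the steps above with a plain comparison (a sketch; with the repro data the map type in the message would be map<bigint,bigint> rather than my table's map<string,double>):

# PySpark analyzes eagerly, so the detailed error is raised at the filter() call,
# with a message roughly like the one quoted above:
df.filter(F.col("mapCol") == F.col("mapCol"))
# pyspark.sql.utils.AnalysisException: cannot resolve '(mapCol = mapCol)' due to
# data type mismatch: EqualTo does not support ordering on type map<bigint,bigint>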
I'd strongly prefer to have some of this information in the error reported by Delta Lake, rather than just "Failed to resolve".
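In the meantime, one possible workaround (a sketch only; map_col_differs is a hypothetical helper, and dt, df, and the old/new aliases come from the repro above) is to avoid direct map equality in the update condition by comparing a sorted entry array instead, since Spark can order arrays of structs:

from pyspark.sql import functions as F

def map_col_differs(left, right):
    # sort_array on an array<struct<key,value>> orders entries by key, giving a
    # deterministic, comparable representation of each map.
    return F.sort_array(F.map_entries(F.expr(left))) != F.sort_array(F.map_entries(F.expr(right)))

dt.alias("old").merge(
    df.alias("new"),
    condition=F.expr("old._1 == new._1"),
).whenMatchedUpdate(
    condition=map_col_differs("old.mapCol", "new.mapCol"),
    set={"mapCol": F.col("new.mapCol")},
).whenNotMatchedInsertAll().execute()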
Environment information
- Delta Lake version: 2.0.0
- Spark version: 3.2
- Scala version: 2.12
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [X] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [X] No. I cannot contribute a bug fix at this time.
(I'm not well versed in the Scala/Java side of the project -- if this is something that could be resolved by editing the Python bindings, I may be able to help out.)
Thanks @PadenZach. Acknowledging that the repro is successful and that this is a legitimate issue: the command should not swallow the specific error, in this case that map is not a comparable type.
We'll leave this open for now; there is no corruption since an error is still thrown. We'll prioritize it accordingly to improve usability. Thank you!
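Until a fix lands, one way to surface the specific analyzer error today is a user-side pre-check before calling execute() (a sketch; check_merge_conditions is a hypothetical helper, not a Delta API, and dt/df come from the repro above):

from pyspark.sql import functions as F

def check_merge_conditions(target_df, source_df, *conditions):
    # Analyze each merge condition against a plain join of the two sides.
    # PySpark analyzes eagerly, so an unsupported expression (e.g. map
    # equality) raises Spark's detailed AnalysisException here instead of
    # the merge's generic "Failed to resolve".
    joined = target_df.alias("old").crossJoin(source_df.alias("new"))
    for cond in conditions:
        joined.filter(cond)

check_merge_conditions(
    dt.toDF(), df,
    F.expr("old._1 == new._1"),
    F.expr("old.mapCol != new.mapCol"),  # raises with the map-specific error
)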