Delta merge doesn't update schema (automatic schema evolution enabled)
Hi,
I am having problems with automatic schema evolution for merges with Delta tables.
I have a Delta table in my data lake with around 330 columns (the target table), and I want to upsert some new records into it. The thing is that the source DataFrame has some extra columns that aren't present in the target Delta table. I use the following code for the merge in Databricks:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true")
from delta.tables import *
deltaTarget = DeltaTable.forPath(spark, pathInDataLake)
deltaTarget.alias('target').merge(df.alias('source'), mergeStatement).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
While the documentation on automatic schema evolution indicates that the schema will be updated when using .whenMatchedUpdateAll() and .whenNotMatchedInsertAll(), this piece of code gives the following error:
AnalysisException: cannot resolve new_column in UPDATE clause given columns [list of columns in the target table].
I have the impression that I ran into this issue in the past but was able to solve it then with the spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true") setting.
Am I missing something to make the automatic schema evolution work?
Looks like you have a trailing space at the end of "spark.databricks.delta.schema.autoMerge.enabled ". Can you check if it works after removing that? (It's unfortunate that there's no Spark handle to catch this and ensure that only valid confs are specified.)
Thanks for your comment; you are right, but unfortunately this doesn't solve the problem. The autoMerge option was also enabled in the Spark config of the Databricks cluster, so I think it was enabled either way, but it still gives the same error.
I've come across the same issue when merging a DataFrame with 4 new fields into a Delta table with >900 fields.
I had the same problem. Use spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True) and not spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true") and it should work.
We are having this same issue for our pipelines running on Azure Data Factory. Is there an ETA for the fix? TIA!
Schema autoMerge worked in Scala but not in Python, using the same merge against the same Delta table. Is there an ETA for the fix? Thanks.
@thonsinger-rseg Do you have a reproduction? I tried the following code and it worked for me:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")
from delta.tables import *
path = "/tmp/mergetest"
df = spark.range(20).selectExpr("id", "id as id2")
spark.range(10).write.format("delta").save(path)
target = DeltaTable.forPath(spark, path)
target.alias('target').merge(df.alias('source'), "target.id = source.id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
DeltaTable.forPath(spark, path).toDF().show()
@zsxwing I have a whenMatched() condition that comes before my .whenMatchedUpdateAll() and .whenNotMatchedInsertAll(); that's the only real difference. The exact same logic on the exact same Delta table worked in Scala, but not in Python.
(silver_table.alias("t") .merge(bronze_cdc_df.alias("s"), merge_match) .whenMatchedUpdate("s.DeletedDate IS NOT NULL", soft_deleted_cols_map) .whenMatchedUpdateAll() .whenNotMatchedInsertAll() .execute())
I have the same issue: a table with about 20 or so columns, and I am merging a DataFrame that has 1 additional column and a smaller number of records. The new column contains integers and has no null values.
In Scala, this works with no problem.
In Python it fails with an error similar to:
AnalysisException: cannot resolve 'struct(ID, oldField, newField)' due to data type mismatch: cannot cast struct<ID:int, oldField:int, newField:void> to struct<ID:int, oldField:int, newField:int>;
It seems like the schema is being evolved with a void type for the new field, and then the merge fails due to the data type mismatch.
zsxwing's example works, but a real table in our warehouse does not.
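As a general illustration of where void columns come from (a sketch, not necessarily the root cause in this warehouse): a bare NULL literal in Spark is inferred as NullType, which prints as void, and an explicit cast avoids that.

from pyspark.sql.functions import lit

df = spark.range(5)
# A bare NULL literal is inferred as NullType and prints as "void".
df.withColumn("newField", lit(None)).printSchema()
# An explicit cast gives the new column a concrete type instead.
df.withColumn("newField", lit(None).cast("int")).printSchema()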
There is a workaround for this. Do an empty DataFrame append with mergeSchema before doing the Delta merge:
df.limit(0).write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(tableName)
Then perform the normal merge using DeltaTable, but don't enable spark.databricks.delta.schema.autoMerge.enabled. For some reason mergeSchema on append works, but Delta's autoMerge does not.
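Wrapped up, the workaround might look like the sketch below (table_name and merge_condition are placeholders):

from delta.tables import DeltaTable

def merge_with_schema_evolution(df, table_name, merge_condition):
    # Step 1: zero-row append with mergeSchema to evolve the target schema.
    (df.limit(0).write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(table_name))
    # Step 2: normal merge; autoMerge stays disabled per the workaround.
    target = DeltaTable.forName(spark, table_name)
    (target.alias("target")
        .merge(df.alias("source"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())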
@AlexWRZ01 could you provide the table schema and the merged DataFrame schema if possible, so that we can try to create a reproduction? You can just call schema.json() on a DataFrame to get the schema, such as spark.range(10).schema.json().
I am having a similar problem when I try to use multi-threading to speed up the merge operations of different, independent Delta tables with PySpark. When submitting multiple merge operations with parallel threads to Spark, the new columns are not added to the resulting Delta tables. I am using the same SparkSession object across all threads (which should be thread-safe).
@sdaberdaku did you hit the same error? Do you have a quick reproduction we can try?
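For context, the multi-threaded pattern being described is roughly the following sketch (the table paths and source DataFrames are hypothetical; it assumes one shared SparkSession):

from concurrent.futures import ThreadPoolExecutor
from delta.tables import DeltaTable

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Hypothetical independent targets and sources, each source with one new column.
spark.range(10).write.format("delta").mode("overwrite").save("/tmp/table_a")
spark.range(10).write.format("delta").mode("overwrite").save("/tmp/table_b")
df_a = spark.range(20).selectExpr("id", "id AS extra_a")
df_b = spark.range(20).selectExpr("id", "id AS extra_b")

def merge_one(path, source_df):
    # Each thread merges into its own, independent Delta table.
    target = DeltaTable.forPath(spark, path)
    (target.alias("t")
        .merge(source_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(merge_one, p, d)
               for p, d in [("/tmp/table_a", df_a), ("/tmp/table_b", df_b)]]
    for f in futures:
        f.result()  # surface any per-thread exceptions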
@zsxwing I have encountered a similar issue while using Delta with Java Spark. I managed to find a workaround by configuring the following properties in the spark-defaults file:
"spark.databricks.delta.schema.autoMerge.enabled": "True",
"spark.databricks.delta.schema.autoMerge.enabledOnWrite": "True"
It's worth noting that I observed Spark being case-sensitive when specifying these properties: if I switch them back to lowercase, or use "true" instead of "True", the problem resurfaces. It's important to maintain the correct case and use "True" for the properties to be recognized properly.
Furthermore, I should mention that this issue seems to be specific to AWS EMR servers, in my case emr-6.9.0 with delta 2.1.0, as I haven't encountered it in other environments. Therefore, this workaround may be applicable specifically to AWS EMR setups.
I hope this explanation helps in debugging.
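If setting these at session creation rather than in spark-defaults, the equivalent would look something like the sketch below (the enabledOnWrite key and the "True" casing come from the report above, not from official documentation):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("delta-merge")
    # Casing and the enabledOnWrite key are as reported for EMR above.
    .config("spark.databricks.delta.schema.autoMerge.enabled", "True")
    .config("spark.databricks.delta.schema.autoMerge.enabledOnWrite", "True")
    .getOrCreate())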
I am encountering this issue using Scala on a standalone cluster with client deployment mode. Going to test whether "spark.databricks.delta.schema.autoMerge.enabled": "True" instead of "spark.databricks.delta.schema.autoMerge.enabled": "true" fixes the issue.
Having the same issue here. Did you solve the problem, @sdaberdaku?