delta
delta copied to clipboard
[BUG] set userMetadata and get lastCommitVersionInSession not work with cloned session
Bug
Describe the problem
Trying to get lastCommitVersionInSession in a thread safe way. So cloned a new session in thread.
But it only works in databricks, not with open source delta 1.2.1
Steps to reproduce
Test code is copied from workaround in issue https://github.com/delta-io/delta/issues/857
session_clone = spark.newSession()
session_clone.conf.set("spark.databricks.delta.commitInfo.userMetadata", "metadata message")
val deltaTable = DeltaTable.forPath(session_clone, somePath)
deltaTable.merge(...).update(...).insert(...).execute()
commit_version = session_clone.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Observed results
Run it in databricks, see the userMetadata in history table, and get lastCommitVersionInSession from session_clone
Run it with open source delta 1.2.1, no userMetadata in history table, get null from session_clone, and get lastCommitVersionInSession from default spark session.
Expected results
userMetadata and lastCommitVersionInSession should works within session_clone
Further details
Environment information
- Delta Lake version: 1.2.1
- Spark version: 3.2.1
- Scala version: 2.12
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [x] No. I cannot contribute a bug fix at this time.
Hi @bugsbunny1101 - could you try this using Delta Lake 2.0.0? I did that using Pyspark and I was able to get all of the information just fine.
>>> from delta.tables import DeltaTable
>>> from pyspark.sql.functions import lit, expr
>>> path = "/tmp/scott_sandre_2022-08_25_001"
>>> session_clone = spark.newSession()
>>> session_clone.conf.set("spark.databricks.delta.commitInfo.userMetadata", "metadata message")
>>> spark.range(100).write.format("delta").mode("append").save(path)
>>> deltaTable = DeltaTable.forPath(session_clone, path)
>>> deltaTable.update(
... condition=expr("id == 0"),
... set={"id": expr("id + 100")})
>>> session_clone.conf.get("spark.databricks.delta.lastCommitVersionInSession")
'1'
>>> deltaTable.history()
// showed metadata message in the output
Closing this as we cannot produce the issue.