[Spark][1.0] Fix a data loss bug in MergeIntoCommand
Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)
Description
Fix a data loss bug in MergeIntoCommand. It's caused by PreprocessTableMerge and MergeIntoCommand reading the config from different Spark session config objects. In PreprocessTableMerge, the config value comes from the Spark session that was given to DeltaTable. In MergeIntoCommand, it comes from SQLConf.get, which reads the active session of the current thread. The two can differ when the user sets another Spark session as active, or when the active session simply isn't set properly before execution.
If the source DataFrame has more columns than the target table, the auto schema merge feature adds the additional columns to the target table schema as nullable columns. The updated output projection is built in PreprocessTableMerge, so matchedClauses and notMatchedClauses contain the additional columns, but the target table schema in MergeIntoCommand doesn't have them.
As a result, the following index doesn't point to the delete flag column, which is at numFields - 2:
def shouldDeleteRow(row: InternalRow): Boolean =
  row.getBoolean(outputRowEncoder.schema.fields.size)
row.getBoolean returns getByte() != 0, which causes rows to be dropped randomly:
- matched rows in the target table are lost
Also, because autoMerge doesn't take effect in MergeIntoCommand:
- data in the newly added source DataFrame columns is lost.
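To make the index mismatch concrete, here is a toy Python model. It is not Delta's code: the row layout (output columns followed by a delete flag), the names, and the "value != 0" boolean read are simplifying assumptions. The reader computes the flag position from a schema that is missing the evolved column, so it ends up reading a data column as the flag:

```python
from typing import List, Sequence

# Toy model (an assumption, not Delta's real layout): every projected row is
# [<output columns>..., <delete flag>], where a nonzero flag means "delete".

def flag_index(schema: Sequence[str]) -> int:
    # The reader derives the flag position from the schema it believes in.
    return len(schema)

def should_delete(row: Sequence[int], schema: Sequence[str]) -> bool:
    # Mimics a boolean read implemented as "value != 0".
    return row[flag_index(schema)] != 0

def kept_rows(rows: List[List[int]], schema: Sequence[str]) -> List[List[int]]:
    return [r for r in rows if not should_delete(r, schema)]

evolved_schema = ["id", "x", "y"]  # columns the evolved projection produced
stale_schema = ["id", "x"]         # target schema without the new column "y"

# Rows as produced by the evolved projection: id, x, y, delete flag (0 = keep).
rows = [[i, i, i, 0] for i in range(5)]

print(len(kept_rows(rows, evolved_schema)))  # 5: flag read at the right index
print(len(kept_rows(rows, stale_schema)))    # 1: "y" read as the flag, only y == 0 survives
```

In the real bug the misread slot holds arbitrary data from an unsafe read, which is why the dropped rows look random rather than following a clean pattern like this one.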
The fix makes sure the active session is set to the Spark session given for the target table. The PR applies the same fix to other commands to avoid any inconsistent config behavior.
Fixes #2104
How was this patch tested?
I confirmed that #2104 is fixed with this change. Without the change, I confirmed the following via debug log messages:
- matchedClauses has more columns after processRow
- row.getBoolean(outputRowEncoder.schema.fields.size) reads a random column value (it's an unsafe read)
- canMergeSchema is false in MergeIntoCommand, while it was true in PreprocessTableMerge
Does this PR introduce any user-facing changes?
Yes, it fixes the data loss issue.
It seems the fix is needed for all other versions, too.
In MergeIntoCommand:
override val canMergeSchema: Boolean = conf.getConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE)
conf is SQLConf.get, inherited through QueryPlan -> LogicalPlan -> Command -> RunnableCommand.
@johanl-db Could you please take a look into this?
Thanks @sezruby for contributing a fix. A few questions regarding the issue to help me understand better what's happening:
- Are you able to reproduce it on Delta versions other than 1.0?
- Is there a way to turn the repro you have into a test that can be added to this PR to validate the fix?
- Is the value of spark.databricks.delta.schema.autoMerge.enabled updated while the MERGE operation is running?
- Do you have a concrete example of what you mean by "multiple spark sessions running concurrently": "It's caused by using different spark session config object for PreprocessTableMerge and MergeIntoCommand, which is possible when multiple spark sessions are running concurrently."
This pattern of accessing the Spark conf is fairly widespread; I suspect a lot of things would break if there were an issue with it. You're seeing different values for DELTA_SCHEMA_AUTO_MIGRATE, but can you check whether PreprocessTableMerge and MergeIntoCommand are using two different conf objects, or whether it's the same object but the DELTA_SCHEMA_AUTO_MIGRATE value changed between the two?
1 => We have logs, and I can find the case with Spark 3.2 / Delta 1.2 too, using
| where (numTargetCopiedRows + numTargetUpdatedRows) > 0
and numTargetDeletedRows == 0 and numOutputRows != (numTargetCopiedRows + numTargetUpdatedRows + numTargetInsertedRows)
I think the issue exists in higher versions too, as the code is the same.
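The log query above can be read as a simple predicate over per-MERGE metrics. A minimal Python sketch, assuming the metrics are available as a dict keyed by the same field names the query uses (how those fields are collected depends on your logging; the values below are illustrative, not real log data):

```python
from typing import Dict

def looks_affected(m: Dict[str, int]) -> bool:
    """Heuristic mirroring the log query: no deletes, yet output rows don't add up."""
    copied_or_updated = m["numTargetCopiedRows"] + m["numTargetUpdatedRows"]
    expected_output = copied_or_updated + m["numTargetInsertedRows"]
    return (
        copied_or_updated > 0
        and m["numTargetDeletedRows"] == 0
        and m["numOutputRows"] != expected_output
    )

# Illustrative metric values, not real log data.
healthy = {
    "numTargetCopiedRows": 0,
    "numTargetUpdatedRows": 1,
    "numTargetInsertedRows": 19,
    "numTargetDeletedRows": 0,
    "numOutputRows": 20,
}
suspicious = dict(healthy, numOutputRows=7)

print(looks_affected(healthy))     # False
print(looks_affected(suspicious))  # True
```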
2, 4 => No repro test, but I guess the repro conditions are:
- triggering MERGE from multiple Spark sessions concurrently (5~10 of them)
- the first session has the config set, the others don't
- setActiveSession is called for the other sessions before SQLConf.get is evaluated for the first session.
3 => I need to confirm whether each MERGE request uses the same Spark session or different Spark sessions (I guess different ones). However, either case could be a problem. We should also throw an exception if the config is changed within the same Spark session.
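The "throw an exception if the config changed" idea can be sketched as a fail-fast check that compares the value seen during analysis with the value seen during execution. This is a hypothetical Python helper (the function and error names are invented for illustration, and confs are modeled as plain dicts), not an existing Delta API:

```python
from typing import Dict

KEY = "spark.databricks.delta.schema.autoMerge.enabled"

class ConfChangedError(RuntimeError):
    """Hypothetical error type for this sketch."""

def check_conf_unchanged(
    analysis_conf: Dict[str, str],
    execution_conf: Dict[str, str],
    key: str = KEY,
    default: str = "false",
) -> str:
    """Fail fast if a correctness-relevant setting differs between analysis and execution."""
    planned = analysis_conf.get(key, default)
    actual = execution_conf.get(key, default)
    if planned != actual:
        raise ConfChangedError(
            f"{key} was {planned!r} during analysis but {actual!r} during execution"
        )
    return actual

ok = check_conf_unchanged({KEY: "true"}, {KEY: "true"})
try:
    check_conf_unchanged({KEY: "true"}, {})  # execution side lacks the setting
    raised = False
except ConfChangedError:
    raised = True
```

Failing loudly trades a confusing error for silent data loss, which seems like the right direction for settings that affect correctness.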
The problem is that SQLConf.get refers to the cached conf, which comes from getActiveSession(): https://github.com/apache/spark/blob/cdbb301143de2e9a0ea525d20867948f49863842/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L117
But the active session can change before SQLConf is cached for the thread.
Yes, I know many places use the conf from the active session. If a config affects correctness, we should avoid that.
I confirmed they are using multiple threads (Python) to run MERGE commands on tables concurrently, but they are neither changing the config nor creating a new session for each thread. However, there could still be an issue from setting/resetting the active session while queries are running.
https://github.com/apache/spark/blob/4a418a448eab6e1007927db92ecacad6594397c8/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L1107
I suspect the default session is set when one of the threads exits withActive. https://github.com/apache/spark/blob/4a418a448eab6e1007927db92ecacad6594397c8/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L769
Here's a PySpark repro:
# repro.py
import tempfile
from multiprocessing.pool import ThreadPool
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta.tables import DeltaTable

spark = SparkSession.builder.appName("Test").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

with tempfile.TemporaryDirectory() as temp_dir:
    source_path = f"{temp_dir}/source"
    target_path = f"{temp_dir}/target"

    def f(spark):
        print(f"spark = {spark}")
        print(f"active session = {SparkSession.getActiveSession()}")
        print(f"autoMerge from SQLConf.get = {spark._jvm.org.apache.spark.sql.internal.SQLConf.get().getConfString('spark.databricks.delta.schema.autoMerge.enabled', '<not set>')}")
        spark.range(20) \
            .withColumn("x", col("id")) \
            .withColumn("y", col("id")) \
            .write.mode("overwrite").format("delta").save(source_path)
        spark.range(1) \
            .withColumn("x", col("id")) \
            .write.mode("overwrite").format("delta").save(target_path)
        # Idk why but DeltaTable.forPath(spark, target_path) fails
        target = DeltaTable(spark, spark._jvm.io.delta.tables.DeltaTable.forPath(spark._jsparkSession, target_path))
        source = spark.read.format("delta").load(source_path).alias("s")
        target.alias("t") \
            .merge(source, "t.id = s.id") \
            .whenMatchedUpdate(set = {"t.x": "t.x + 1"}) \
            .whenNotMatchedInsertAll() \
            .execute()
        spark.read.format("delta").load(target_path).show()
        assert spark.read.format("delta").load(target_path).count() == 20

    pool = ThreadPool(1)
    pool.starmap(f, [(spark,)])
Run as:
./bin/spark-submit --packages io.delta:delta-core_2.12:2.2.0 --conf spark.databricks.delta.schema.autoMerge.enabled=true repro.py
This is because a new thread is created in the JVM process, and that thread doesn't inherit any ThreadLocal variables, including the active session reference.
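Python's `threading.local` shows the same effect in miniature: a value set on one thread is invisible to a newly started thread. This is only an analogy for the JVM-side behavior described above (it does not exercise py4j or Spark), with a string standing in for the session:

```python
import threading

# Stand-in for a per-thread "active session" slot.
active = threading.local()
active.session = "spark"  # set on the main thread

seen = {}

def worker() -> None:
    # A newly started thread gets a fresh, empty threading.local namespace.
    seen["session"] = getattr(active, "session", None)

t = threading.Thread(target=worker)
t.start()
t.join()

print(active.session)   # spark
print(seen["session"])  # None
```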
I think a better fix (if we want to minimize changes) is wrapping DeltaMergeBuilder.execute() with targetTable.toDF.sparkSession.withActive, but withActive is private. We can make a similar util in Delta.
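The shape of such a util can be sketched in Python as a context manager that pins a session for the duration of a block and restores the previous one afterwards. Everything here is hypothetical (`with_active`, `execute_merge`, and string "sessions" are illustration only); the real helper would live in Delta's Scala code and wrap the operation's execution:

```python
import threading
from contextlib import contextmanager
from typing import Iterator, Optional

_local = threading.local()

def get_active() -> Optional[str]:
    return getattr(_local, "session", None)

def set_active(session: Optional[str]) -> None:
    _local.session = session

@contextmanager
def with_active(session: str) -> Iterator[str]:
    """Hypothetical analogue of withActive: pin `session`, restore the previous one afterwards."""
    old = get_active()
    set_active(session)
    try:
        yield session
    finally:
        set_active(old)

def execute_merge(target_session: str) -> Optional[str]:
    # Sketch of the proposed fix: run the whole operation with the target
    # table's session active, so every conf lookup inside sees the same session.
    with with_active(target_session):
        return get_active()

set_active("spark2")             # some other session is active, as in the Scala repro
inside = execute_merge("spark")  # session seen during execution
after = get_active()             # previous active session is restored
print(inside, after)             # spark spark2
```

Restoring in `finally` matters: without it, an exception inside the operation would leave the target table's session active on the caller's thread.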
A more fundamental fix would be to remove all references to the active session and ensure we only reference the same SparkSession throughout the operation.
Here's a Scala repro:
withTempDir { dir =>
  val r = dir.getCanonicalPath
  val sourcePath = s"$r/source"
  val targetPath = s"$r/target"
  val spark2 = spark.newSession
  spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
  spark.range(20)
    .withColumn("x", $"id")
    .withColumn("y", $"id")
    .write.mode("overwrite").format("delta").save(sourcePath)
  spark.range(1)
    .withColumn("x", $"id")
    .write.mode("overwrite").format("delta").save(targetPath)
  val target = io.delta.tables.DeltaTable.forPath(spark, targetPath)
  val source = spark.read.format("delta").load(sourcePath).alias("s")
  val merge = target.alias("t")
    .merge(source, "t.id = s.id")
    .whenMatched.updateExpr(Map("t.x" -> "t.x + 1"))
    .whenNotMatched.insertAll()
  SparkSession.setActiveSession(spark2)
  merge.execute()
  assert(spark.read.format("delta").load(targetPath).count() == 20)
}
cc @scottsand-db for 3.0
Rephrasing your findings, @clee704; let me know if you disagree:
So the issue is essentially that in DeltaMergeBuilder.execute(), we use the Spark session from the target table, which gets propagated to the analysis step and is ultimately the one used to get the schema evolution conf value. DeltaMergeBuilder.execute():
val sparkSession = targetTable.toDF.sparkSession
val resolvedMergeInto =
  DeltaMergeInto.resolveReferencesAndSchema(mergePlan, sparkSession.sessionState.conf)(
    tryResolveReferencesForExpressions(sparkSession))
but in MergeIntoCommand, we get the value from SQLConf, which requires the conf to be set in the thread running that code.
I agree with wrapping execution with targetTable.toDF.sparkSession.withActive (and replicating the existing Spark util to make it available in DeltaMergeBuilder). This is more robust than the proposed fix, as there may be other situations outside of schema evolution where the same inconsistencies appear.
Other commands may also suffer from the same issue; we should review whether UPDATE/INSERT, DELETE, or utility commands need the same fix.
@johanl-db Yes, that's a good summary. For other potential issues, we can make a follow up.
@johanl-db Is there any delivery plan from Databricks, by any chance? I can revert the changes for other commands if we need to handle them in follow-up PRs with additional tests. I think the issue exists in all versions, and users are being affected silently.
I created cherry-picks for 3.0, all 2.x branches, and master:
- 3.0: https://github.com/delta-io/delta/pull/2156
- 2.4: https://github.com/delta-io/delta/pull/2157
- 2.3: https://github.com/delta-io/delta/pull/2158
- 2.2: https://github.com/delta-io/delta/pull/2159
- 2.1: https://github.com/delta-io/delta/pull/2160
- 2.0: https://github.com/delta-io/delta/pull/2161
- master: https://github.com/delta-io/delta/pull/2162
@sezruby do you mind quickly going over them to check that I didn't mess anything up? @scottsand-db you can merge this PR against 1.0, then once the other cherry-picks are reviewed we can merge them too.
@sezruby I updated the changes against all branches except 1.0 to cover executeDetails, executeRestore, executeGenerate and optimize. Can you update this PR against 1.0 before we merge it?
I updated it for executeGenerate. None of the other commands exist in 1.0.
This is because a new thread is created in the JVM process and that thread doesn't inherit any ThreadLocal variables, including the active session ref.
@johanl-db, is #2154 related to this issue?
Same category of issue, but these are actually unrelated:
- https://github.com/delta-io/delta/pull/2154: whenever we start a new thread internally, we need to forward various thread-local properties, which DeltaThreadPool wasn't doing. I believe @fred-db found out about this while working on https://github.com/delta-io/delta/pull/2203.
- This PR: addresses a gap in the Delta table APIs that lets a user pass a Spark session when creating an operation that differs from the (thread-local) active session later used to execute it. Follows from a bug report.
Short update from my side on this PR, since it has been stalled for a while: it's still on my radar, but things have been a bit slow due to the 3.0 release. The fix was merged in 3.0 before the release; I still need to cover the other branches.