
[Spark][1.0] Fix a data loss bug in MergeIntoCommand

Open sezruby opened this issue 1 year ago • 15 comments

Which Delta project/connector is this regarding?

  • [x] Spark
  • [ ] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Description

Fix a data loss bug in MergeIntoCommand. It's caused by using different Spark session config objects for PreprocessTableMerge and MergeIntoCommand. In PreprocessTableMerge, the config value comes from the Spark session given to the DeltaTable. In MergeIntoCommand it refers to SQLConf.get, which comes from the active session of the current thread. The two can differ when the user sets up another new Spark session, or when the active session simply isn't set properly before execution.

If the source DataFrame has more columns than the target table, the auto schema merge feature adds the additional columns to the target table schema as nullable. The updated output projection is built in PreprocessTableMerge, so matchedClauses and notMatchedClauses contain the additional columns, but the target table schema in MergeIntoCommand doesn't have them.

As a result, the following index no longer points at the delete-flag column index, which is numFields - 2:

      def shouldDeleteRow(row: InternalRow): Boolean =
        row.getBoolean(outputRowEncoder.schema.fields.size)

row.getBoolean effectively returns getByte() != 0 at that unchecked offset, which causes rows to be dropped at random (a schematic sketch follows the list below):

  • matched rows in the target table are lost.

Also, since autoMerge doesn't take effect:

  • data for newly added columns in the source DataFrame is lost.
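
To make the mismatch concrete, here is a schematic sketch (illustrative field counts only, not the actual Delta code) of how the extra schema-evolution column shifts the delete-flag ordinal past where shouldDeleteRow reads:

// Schematic: the joined output row is [projected columns..., delete flag, ...].
val targetOutputFields = 2   // e.g. (id, x) as seen by MergeIntoCommand
val evolvedOutputFields = 3  // e.g. (id, x, y) as built by PreprocessTableMerge
val readIndex = targetOutputFields        // where shouldDeleteRow looks
val actualFlagIndex = evolvedOutputFields // where the flag really is
// getBoolean(readIndex) lands on a data column (an unsafe byte read),
// so rows are kept or dropped based on arbitrary bytes.
assert(readIndex != actualFlagIndex)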

The fix makes sure the active session is set to the given Spark session for the target table. The PR applies the fix to other commands as well, to avoid any inconsistent config behavior.

Fixes #2104

How was this patch tested?

I confirmed that #2104 is fixed with the change. Without the change, I confirmed the following via debug log messages:

  1. matchedClauses has more columns after processRow.
  2. row.getBoolean(outputRowEncoder.schema.fields.size) refers to a random column value (it's an unsafe read).
  3. canMergeSchema in MergeIntoCommand is false, while it was true in PreprocessTableMerge.

Does this PR introduce any user-facing changes?

Yes, it fixes the data loss issue.

sezruby avatar Sep 30 '23 18:09 sezruby

It seems the fix is needed for all other versions as well.

In MergeIntoCommand:

override val canMergeSchema: Boolean = conf.getConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE)

Here conf is SQLConf.get, inherited via QueryPlan → LogicalPlan → Command → RunnableCommand.
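
As a quick illustration (not from the PR; assuming an existing spark session), SQLConf.get follows the thread's active session rather than the session the command was built with:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

// Set the conf on the original session, then switch the active session.
val spark2 = spark.newSession()
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
SparkSession.setActiveSession(spark2)

// SQLConf.get now resolves against spark2, where the conf was never set,
// so this prints "<not set>" even though `spark` has it enabled.
println(SQLConf.get.getConfString(
  "spark.databricks.delta.schema.autoMerge.enabled", "<not set>"))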

sezruby avatar Sep 30 '23 19:09 sezruby

@johanl-db Could you please take a look into this?

vkorukanti avatar Sep 30 '23 19:09 vkorukanti

Thanks @sezruby for contributing a fix. A few questions regarding the issue to help me understand better what's happening:

  1. Are you able to reproduce it on other Delta versions than 1.0?

  2. Is there a way to turn the repro you have into a test that can be added to this PR to validate the fix?

  3. Is the value of spark.databricks.delta.schema.autoMerge.enabled updated while the MERGE operation is running?

  4. Do you have a concrete example of what you mean by "multiple spark sessions running concurrently"?

It's caused by using different spark session config object for PreprocessTableMerge and MergeIntoCommand, which is possible when multiple spark sessions are running concurrently.

This pattern of accessing the Spark conf is fairly widespread; I suspect a lot of things would break if there were an issue with it. You're seeing different values for DELTA_SCHEMA_AUTO_MIGRATE, but can you check whether PreprocessTableMerge and MergeIntoCommand are using two different conf objects, or whether it's the same object but the DELTA_SCHEMA_AUTO_MIGRATE value changed between the two?

johanl-db avatar Oct 02 '23 07:10 johanl-db

1 => We have logs, and I can find the case in Spark 3.2 / 1.2 too, using:

| where (numTargetCopiedRows + numTargetUpdatedRows) > 0 
 and numTargetDeletedRows == 0 and numOutputRows != (numTargetCopiedRows + numTargetUpdatedRows + numTargetInsertedRows)

I think the issue exists in higher versions too, as it's the same code.

2, 4 => No repro test, but I guess the repro conditions are:

  • MERGE is triggered from multiple Spark sessions concurrently (5~10 of them).
  • The first session has the config set; the others don't.
  • setActiveSession is called for the other sessions before SQLConf.get runs for the first session.

3 => I need to confirm whether each MERGE request uses the same Spark session or different Spark sessions (I guess it's different sessions). However, both cases could be a problem. We should also throw an exception if the config is changed within the same Spark session.

The problem is that SQLConf.get refers to a cached conf, which comes from getActiveSession(): https://github.com/apache/spark/blob/cdbb301143de2e9a0ea525d20867948f49863842/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L117

But the active session can be changed before SQLConf is cached for the thread.

Yes, I know many places use the conf from the active session. If a config affects correctness, we should avoid that.

sezruby avatar Oct 02 '23 17:10 sezruby

I confirmed they are using multiple threads (Python) to run MERGE commands against tables concurrently, but they are neither changing the config nor creating a new session per thread. However, there could still be an issue from setting/resetting the active session while queries are running.

https://github.com/apache/spark/blob/4a418a448eab6e1007927db92ecacad6594397c8/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L1107

I suspect the default session gets set as the active session when one of the threads exits withActive. https://github.com/apache/spark/blob/4a418a448eab6e1007927db92ecacad6594397c8/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L769

sezruby avatar Oct 02 '23 22:10 sezruby

Here's pyspark repro:

# repro.py
import tempfile
from multiprocessing.pool import ThreadPool

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta.tables import DeltaTable

spark = SparkSession.builder.appName("Test").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

with tempfile.TemporaryDirectory() as temp_dir:
    source_path = f"{temp_dir}/source"
    target_path = f"{temp_dir}/target"

    def f(spark):
        print(f"spark = {spark}")
        print(f"active session = {SparkSession.getActiveSession()}")
        print(f"autoMerge from SQLConf.get = {spark._jvm.org.apache.spark.sql.internal.SQLConf.get().getConfString('spark.databricks.delta.schema.autoMerge.enabled', '<not set>')}")
        spark.range(20) \
            .withColumn("x", col("id")) \
            .withColumn("y", col("id")) \
            .write.mode("overwrite").format("delta").save(source_path)
        spark.range(1) \
            .withColumn("x", col("id")) \
            .write.mode("overwrite").format("delta").save(target_path)
        # Idk why but DeltaTable.forPath(spark, target_path) fails
        target = DeltaTable(spark, spark._jvm.io.delta.tables.DeltaTable.forPath(spark._jsparkSession, target_path))
        source = spark.read.format("delta").load(source_path).alias("s")
        target.alias("t") \
            .merge(source, "t.id = s.id") \
            .whenMatchedUpdate(set = {"t.x": "t.x + 1"}) \
            .whenNotMatchedInsertAll() \
            .execute()
        spark.read.format("delta").load(target_path).show()
        assert spark.read.format("delta").load(target_path).count() == 20

    pool = ThreadPool(1)
    pool.starmap(f, [(spark,)])

Run as:

./bin/spark-submit --packages io.delta:delta-core_2.12:2.2.0 --conf spark.databricks.delta.schema.autoMerge.enabled=true repro.py

This is because a new thread is created in the JVM process and that thread doesn't inherit any ThreadLocal variables, including the active session ref.

I think a better fix (if we want to minimize changes) is wrapping DeltaMergeBuilder.execute() with targetTable.toDF.sparkSession.withActive, but withActive is private. We can make a similar util in Delta.
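
For reference, a minimal sketch of such a util (hypothetical name DeltaUtils.withActiveSession; mirrors what Spark's private SparkSession.withActive does):

import org.apache.spark.sql.SparkSession

object DeltaUtils {
  // Pin `spark` as the thread's active session for the duration of `block`,
  // then restore whatever was active before (or clear it).
  def withActiveSession[T](spark: SparkSession)(block: => T): T = {
    val previous = SparkSession.getActiveSession
    SparkSession.setActiveSession(spark)
    try block finally {
      previous match {
        case Some(s) => SparkSession.setActiveSession(s)
        case None => SparkSession.clearActiveSession()
      }
    }
  }
}

DeltaMergeBuilder.execute() could then wrap its body in DeltaUtils.withActiveSession(targetTable.toDF.sparkSession) { ... } so that any SQLConf.get downstream resolves against the target table's session.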

A more fundamental fix would be to remove all references to the active session and ensure we only reference the same SparkSession throughout the operation.

Here's Scala repro:

withTempDir { dir =>
  val r = dir.getCanonicalPath
  val sourcePath = s"$r/source"
  val targetPath = s"$r/target"
  val spark2 = spark.newSession
  spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
  spark.range(20)
    .withColumn("x", $"id")
    .withColumn("y", $"id")
    .write.mode("overwrite").format("delta").save(sourcePath)
  spark.range(1)
    .withColumn("x", $"id")
    .write.mode("overwrite").format("delta").save(targetPath)
  val target = io.delta.tables.DeltaTable.forPath(spark, targetPath)
  val source = spark.read.format("delta").load(sourcePath).alias("s")
  val merge = target.alias("t")
    .merge(source, "t.id = s.id")
    .whenMatched.updateExpr(Map("t.x" -> "t.x + 1"))
    .whenNotMatched.insertAll()
  SparkSession.setActiveSession(spark2)
  merge.execute()
  assert(spark.read.format("delta").load(targetPath).count() == 20)
}

clee704 avatar Oct 03 '23 22:10 clee704

cc @scottsand-db for 3.0

vkorukanti avatar Oct 03 '23 22:10 vkorukanti

Rephrasing your findings @clee704, let me know if you disagree:

So the issue is essentially that in DeltaMergeBuilder.execute(), we use the Spark session from the target table; it gets propagated to the analysis step and is ultimately the one used to get the schema evolution conf value. From DeltaMergeBuilder.execute():

    val sparkSession = targetTable.toDF.sparkSession
    val resolvedMergeInto =
      DeltaMergeInto.resolveReferencesAndSchema(mergePlan, sparkSession.sessionState.conf)(
        tryResolveReferencesForExpressions(sparkSession))

but in MergeIntoCommand, we get the value from SQLConf.get, which requires the active session to be set correctly in the thread running that code.

I agree with wrapping execution with targetTable.toDF.sparkSession.withActive (and replicating the existing Spark util to make it available in DeltaMergeBuilder). This is more robust than the proposed fix, as there may be other situations outside of schema evolution where the same inconsistencies appear.

Other commands may also suffer from the same issue; we should review whether UPDATE/INSERT, DELETE, or utility commands need the same fix.

johanl-db avatar Oct 04 '23 09:10 johanl-db

@johanl-db Yes, that's a good summary. For the other potential issues, we can do a follow-up.

clee704 avatar Oct 04 '23 22:10 clee704

@johanl-db Any delivery plan from Databricks, by any chance? I can revert the changes for the other commands if we need to do them in follow-up PRs with additional tests. I think the issue exists in all versions, and users are being affected silently.

sezruby avatar Oct 05 '23 02:10 sezruby

I created cherry-picks to all 2.x branches and master:

  • 3.0: https://github.com/delta-io/delta/pull/2156
  • 2.4: https://github.com/delta-io/delta/pull/2157
  • 2.3: https://github.com/delta-io/delta/pull/2158
  • 2.2: https://github.com/delta-io/delta/pull/2159
  • 2.1: https://github.com/delta-io/delta/pull/2160
  • 2.0: https://github.com/delta-io/delta/pull/2161
  • master: https://github.com/delta-io/delta/pull/2162

@sezruby do you mind quickly going over them to check that I didn't mess anything up? @scottsand-db you can merge this PR against 1.0; then, once the other cherry-picks are reviewed, we can merge them as well.

johanl-db avatar Oct 10 '23 16:10 johanl-db

@sezruby I updated the changes against all branches except 1.0 to cover executeDetails, executeRestore, executeGenerate and optimize. Can you update this PR against 1.0 before we merge it?

johanl-db avatar Oct 12 '23 15:10 johanl-db

@sezruby I updated the changes against all branches except 1.0 to cover executeDetails, executeRestore, executeGenerate and optimize. Can you update this PR against 1.0 before we merge it?

I updated it for executeGenerate. No other commands are available in 1.0.

sezruby avatar Oct 12 '23 17:10 sezruby

This is because a new thread is created in the JVM process and that thread doesn't inherit any ThreadLocal variables, including the active session ref.

@johanl-db, is this PR, #2154, related to this issue?

felipepessoto avatar Oct 26 '23 23:10 felipepessoto

This is because a new thread is created in the JVM process and that thread doesn't inherit any ThreadLocal variables, including the active session ref.

@johanl-db, is this PR, #2154, related to this issue?

Same category of issues but these are actually unrelated:

  • https://github.com/delta-io/delta/pull/2154: whenever we start a new thread internally, we need to forward various thread-local properties, which DeltaThreadPool wasn't doing. I believe @fred-db found out about this while working on https://github.com/delta-io/delta/pull/2203.
  • This PR: addresses a gap in the Delta table APIs that lets a user create an operation with one Spark session but execute it under a different (thread-local) active session. Follows from a bug report.

Short update from my side on this PR, since it has been stalled for a while: it's still on my radar, but things have been a bit slow due to the 3.0 release. The fix was merged into 3.0 before the release; I still need to cover the other branches.

johanl-db avatar Oct 27 '23 08:10 johanl-db