
Spark: MERGE INTO Statements with only WHEN NOT MATCHED Clauses are always executed at Snapshot Isolation

Open · hussein-awala opened this issue 9 months ago • 14 comments

Apache Iceberg version

1.8.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

When running two concurrent MERGE INTO operations on an Apache Iceberg table, I expect them to be idempotent -- meaning Iceberg should either detect conflicts and resolve them or fail one of the jobs to prevent data inconsistencies.

However, Iceberg determines the operation type dynamically based on the result of the join condition, which can lead to unexpected behavior:

  • If a match is found, Iceberg treats it as an overwrite operation and fails the second job due to conflicting commits.
  • If no match is found, Iceberg considers it an append operation and attempts to resolve conflicts by creating a new manifest for appended data, as explained in the Cost of Retries doc.

This behavior introduces a problem: if the dataset is large enough that the two jobs overlap and neither finds a match, both will proceed to append their data independently, producing duplicate records.

Reproduction Steps

Running the following query in concurrent jobs can result in duplicate data if no matching records exist in dest:

MERGE INTO dest  
USING src  
ON dest.id = src.id  
WHEN NOT MATCHED THEN  
  INSERT *
-- even with update action, we'll have the same issue
-- WHEN MATCHED THEN
--  UPDATE SET *

I initially expected the operation type to be determined by the query itself (i.e., always "append" for a query without an UPDATE action). However, through testing, I found that Iceberg decides the operation type at runtime, based on the actual join results. This makes MERGE INTO non-idempotent, leading to unintended duplicate inserts.
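
One way to observe this (a sketch; dest is the table from the repro above, and the metadata table may need to be qualified with the catalog and database) is to query the snapshots metadata table after each run and check which operation Iceberg recorded:

SELECT committed_at, snapshot_id, operation
FROM dest.snapshots
ORDER BY committed_at;
-- When no rows match, both concurrent merges commit with operation = 'append'
-- and both succeed; when rows match, the commit is an 'overwrite' and the
-- second job fails with a conflict.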

Expected Behavior

Iceberg should ensure idempotency for MERGE INTO, preventing duplicate data when no matches are found.

Additional Context

  • Iceberg version: 1.8.1
  • Iceberg catalog: Glue catalog (type glue) with S3 FileIO
  • Spark version: 3.5.5

Would love to hear if others have encountered this or if there's a recommended workaround.

Willingness to contribute

  • [ ] I can contribute a fix for this bug independently
  • [x] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • [ ] I cannot contribute a fix for this bug at this time

hussein-awala · Mar 26 '25 12:03

What Isolation level are you using? Duplication (double inserts) would be expected if the isolation level was snapshot, but forbidden if the isolation level was serializable? I'll check the operation setup as well

RussellSpitzer · Mar 26 '25 15:03

The write.merge.isolation-level, write.update.isolation-level, and write.delete.isolation-level properties are all set to serializable.
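
For reference, they were set with something like the following (db.dest stands in for the actual table name):

ALTER TABLE db.dest SET TBLPROPERTIES (
  'write.merge.isolation-level'  = 'serializable',
  'write.update.isolation-level' = 'serializable',
  'write.delete.isolation-level' = 'serializable'
);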

> but forbidden if the isolation level was serializable?

The problem is how Iceberg handles the merge operation if it only contains new data (append), and how Iceberg resolves the append operation conflicts.

I will let you check the operation setup, thanks!

hussein-awala · Mar 26 '25 16:03

I am a little confused about the "operation" setup, because the validation rules used by the Row Delta operations are unrelated to the Append paths as far as I know, although something may have changed. See CopyOnWriteOperation for an example.

RussellSpitzer · Mar 26 '25 16:03

Ok, I think I see the issue; it's inside the Spark code. Let me finish testing this.

RussellSpitzer · Mar 26 '25 16:03

The issue, I believe, is related to the optimizations here:

https://github.com/apache/spark/blob/597cafc05d9a4ecd5a111d1dc9c92fb37c77ce3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala#L47-L75

The code above takes a MergeIntoCommand that only does "NOT MATCHED INSERT" and rewrites the command into an AppendData.byPosition instead of a RowDelta, like:

https://github.com/apache/spark/blob/597cafc05d9a4ecd5a111d1dc9c92fb37c77ce3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala#L129-L138

AppendData is treated in Apache Iceberg as a batch write, which always operates under the equivalent of "Snapshot Isolation", regardless of what has been configured for RowDeltas. This means MERGE INTO commands that contain only NOT MATCHED INSERT clauses run at the wrong isolation level, leading to the issue above.

Repro - TestMerge.java

  @TestTemplate
  public void testMergeIntoEmptyTableSerializableIsolationNotMatchedOnly() throws InterruptedException {
    createAndInitTable("id INT, dep STRING");
    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");

    createOrReplaceView(
      "source",
      "id INT, dep STRING",
      "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
        + "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
        + "{ \"id\": 3, \"dep\": \"emp-id-3\" }");

    ExecutorService executorService =
      MoreExecutors.getExitingExecutorService(
        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));

    String query = "MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN NOT MATCHED THEN INSERT *";

    executorService.submit(() ->
      spark.sql(
        String.format(query, tableName)).collect());
    executorService.submit(() ->
      spark.sql(
        String.format(query, tableName)).collect());

    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);

    ImmutableList<Object[]> expectedRows =
      ImmutableList.of(
        row(1, "emp-id-1"), // new
        row(2, "emp-id-2"), // new
        row(3, "emp-id-3")  // new
      );

    // This assertion fails under the bug: both concurrent merges commit as
    // appends, so each source row lands in the table twice
    assertEquals(
      "Should have expected rows",
      expectedRows,
      sql("SELECT * FROM %s ORDER BY id", tableName));
  }

RussellSpitzer · Mar 26 '25 18:03

@hussein-awala not sure about your tests, but I find that everything works as expected if the query has a no-op MATCHED clause:

  @TestTemplate
  public void testMergeIntoEmptyTableSerializableIsolation() throws InterruptedException {
    createAndInitTable("id INT, dep STRING");
    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");

    createOrReplaceView(
      "source",
      "id INT, dep STRING",
      "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
        + "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
        + "{ \"id\": 3, \"dep\": \"emp-id-3\" }");

    ExecutorService executorService =
      MoreExecutors.getExitingExecutorService(
        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));

    String query = "MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED THEN UPDATE SET dep = 'foo' WHEN NOT MATCHED THEN INSERT *";

    executorService.submit(() ->
      spark.sql(
        String.format(query, tableName)).collect());
    executorService.submit(() ->
      spark.sql(
        String.format(query, tableName)).collect());

    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);

    ImmutableList<Object[]> expectedRows =
      ImmutableList.of(
        row(1, "emp-id-1"), // new
        row(2, "emp-id-2"), // new
        row(3, "emp-id-3")  // new
      );

    // With serializable isolation the losing merge detects the conflict and
    // fails, so only one job's inserts land (assumes the two jobs overlap)
    assertEquals(
      "Should have expected rows",
      expectedRows,
      sql("SELECT * FROM %s ORDER BY id", tableName));
  }

RussellSpitzer · Mar 26 '25 18:03

Thanks @RussellSpitzer for this investigation. All my tests only had NOT MATCHED clauses. For the query with a MATCHED clause, I had only checked the operation in the commit metrics and assumed I would hit the same issue. I will test it tomorrow and see whether adding a MATCHED clause resolves the issue.

hussein-awala · Mar 26 '25 20:03

@RussellSpitzer indeed, it detects a conflict when I add a WHEN MATCHED clause. I used WHEN MATCHED AND 1!=1 THEN UPDATE SET * to avoid updating the data, although it always rewrites the data file (the same data, without any updates).
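
In full, the workaround query looks like this (a sketch based on the repro above; the always-false MATCHED predicate keeps Spark from rewriting the merge into a plain append, so the configured isolation level is honored):

MERGE INTO dest
USING src
ON dest.id = src.id
-- always-false no-op clause: forces the row-level merge path
WHEN MATCHED AND 1 != 1 THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *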

For this issue, do you think we should add a note in the documentation and/or add your tests to detect a change in behavior?

hussein-awala · Mar 27 '25 12:03

This is a serious bug and it needs to be fixed. @aokolnychyi and I were discussing it yesterday and trying to figure out what the right place to fix this is.

RussellSpitzer · Mar 27 '25 13:03

@aokolnychyi had some preliminary ideas on how to fix this; it will require a Spark change, though.

RussellSpitzer · Apr 02 '25 15:04

@RussellSpitzer @aokolnychyi following up on this issue: do you know, by any chance, if it was fixed in the latest Iceberg version, or is it still a bug as of today? Should we still use the no-op MATCHED workaround?

maximethebault · Jul 23 '25 19:07

@szehon-ho promised me he would do this :P

sfc-gh-rspitzer · Jul 23 '25 19:07

😄 yeah, it sounds like it will be on the Spark side; I think @aokolnychyi did mention it to me. He is off, and I'll be off as well after, so I may not get to chat with him until September, but I can update if I get some ideas before then.

szehon-ho · Jul 28 '25 19:07

@RussellSpitzer Could you give an update on the progress of this issue? Do I still need to use a workaround?

wlf061 · Dec 12 '25 09:12