Spark: MERGE INTO Statements with only WHEN NOT MATCHED Clauses are always executed at Snapshot Isolation
Apache Iceberg version
1.8.1 (latest release)
Query engine
Spark
Please describe the bug 🐞
When running two concurrent MERGE INTO operations on an Apache Iceberg table, I expect them to be idempotent -- meaning Iceberg should either detect conflicts and resolve them or fail one of the jobs to prevent data inconsistencies.
However, Iceberg determines the operation type dynamically based on the result of the join condition, which can lead to unexpected behavior:
- If a match is found, Iceberg treats it as an overwrite operation and fails the second job due to conflicting commits.
- If no match is found, Iceberg considers it an append operation and attempts to resolve conflicts by creating a new manifest for appended data, as explained in the Cost of Retries doc.
This behavior introduces a problem:
If the dataset is large enough and neither job finds a match, both will proceed with appending data independently, causing duplicate records.
Reproduction Steps
Running the following query in concurrent jobs can result in duplicate data if no matching records exist in dest:
MERGE INTO dest
USING src
ON dest.id = src.id
WHEN NOT MATCHED THEN
INSERT *
-- even with update action, we'll have the same issue
-- WHEN MATCHED THEN
-- UPDATE SET *
I initially expected the operation type to be determined by the query itself (i.e., always "append" for a query without an UPDATE action). However, through testing, I found that Iceberg decides the operation type at runtime, based on the actual join results. This makes MERGE INTO non-idempotent, leading to unintended duplicate inserts.
Expected Behavior
Iceberg should ensure idempotency for MERGE INTO, preventing duplicate data when no matches are found.
Additional Context
- Iceberg version: 1.8.1
- Iceberg catalog: Glue catalog (type glue) with S3 FileIO
- Spark version: 3.5.5
Would love to hear if others have encountered this or if there's a recommended workaround.
Willingness to contribute
- [ ] I can contribute a fix for this bug independently
- [x] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
What isolation level are you using? Duplication (double inserts) would be expected if the isolation level was snapshot, but forbidden if the isolation level was serializable. I'll check the operation setup as well.
write.merge.isolation-level, write.update.isolation-level, and write.delete.isolation-level properties are set to serializable.
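For reference, a sketch of how these were set (the table name here is illustrative):

ALTER TABLE db.dest SET TBLPROPERTIES (
  'write.merge.isolation-level' = 'serializable',
  'write.update.isolation-level' = 'serializable',
  'write.delete.isolation-level' = 'serializable'
);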
> but forbidden if the isolation level was serializable
The problem is how Iceberg handles the merge operation when it only contains new data (an append), and how Iceberg resolves conflicts for that append operation.
I will let you check the operation setup, thanks!
I am a little confused about the "operation" setup because the validation rules used by the Row Delta operations are unrelated to the Append paths as far as I know, although something may have changed. See CopyOnWriteOperation for an example.
OK, I think I see the issue; it's inside the Spark code. Let me finish testing this.
I believe the issue is related to the optimizations here:
https://github.com/apache/spark/blob/597cafc05d9a4ecd5a111d1dc9c92fb37c77ce3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala#L47-L75
The code above takes a MergeIntoCommand which only has "NOT MATCHED INSERT" actions and rewrites the command into an AppendData.byPosition instead of a RowDelta like
https://github.com/apache/spark/blob/597cafc05d9a4ecd5a111d1dc9c92fb37c77ce3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala#L129-L138
AppendData is treated in Apache Iceberg as a batch write, which always operates under what is equivalent to "Snapshot Isolation", regardless of what has been configured for RowDeltas. This means MERGE INTO commands with only NOT MATCHED INSERT clauses run at the wrong isolation level, leading to the above issue.
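To make that concrete, here is a rough sketch of the two query shapes and how they end up being committed, based on the rewrite rule linked above (not exact plan output):

-- Rewritten to AppendData.byPosition: committed as a plain Iceberg append,
-- which effectively runs at snapshot isolation, so two concurrent runs can
-- both insert the same source rows.
MERGE INTO dest USING src ON dest.id = src.id
WHEN NOT MATCHED THEN INSERT *;

-- Kept as a row-level operation (RowDelta / copy-on-write): honors
-- write.merge.isolation-level, so one of two concurrent runs fails with a
-- conflicting-commit error.
MERGE INTO dest USING src ON dest.id = src.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;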
Repro - TestMerge.java
@TestTemplate
public void testMergeIntoEmptyTableSerializableIsolationNotMatchedOnly() throws InterruptedException {
  createAndInitTable("id INT, dep STRING");
  sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");

  createOrReplaceView(
      "source",
      "id INT, dep STRING",
      "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
          + "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
          + "{ \"id\": 3, \"dep\": \"emp-id-3\" }");

  ExecutorService executorService =
      MoreExecutors.getExitingExecutorService(
          (ThreadPoolExecutor) Executors.newFixedThreadPool(2));

  String query = "MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN NOT MATCHED THEN INSERT *";

  executorService.submit(() -> spark.sql(String.format(query, tableName)).collect());
  executorService.submit(() -> spark.sql(String.format(query, tableName)).collect());

  executorService.shutdown();
  executorService.awaitTermination(1, TimeUnit.MINUTES);

  ImmutableList<Object[]> expectedRows =
      ImmutableList.of(
          row(1, "emp-id-1"), // new
          row(2, "emp-id-2"), // new
          row(3, "emp-id-3") // new
          );

  assertEquals(
      "Should have expected rows",
      expectedRows,
      sql("SELECT * FROM %s ORDER BY id", tableName));
}
@hussein-awala not sure about your tests, but I find that everything works as expected if the query has a no-op MATCHED clause:
@TestTemplate
public void testMergeIntoEmptyTableSerializableIsolation() throws InterruptedException {
  createAndInitTable("id INT, dep STRING");
  sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");

  createOrReplaceView(
      "source",
      "id INT, dep STRING",
      "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
          + "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
          + "{ \"id\": 3, \"dep\": \"emp-id-3\" }");

  ExecutorService executorService =
      MoreExecutors.getExitingExecutorService(
          (ThreadPoolExecutor) Executors.newFixedThreadPool(2));

  String query = "MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED THEN UPDATE SET dep = 'foo' WHEN NOT MATCHED THEN INSERT *";

  executorService.submit(() -> spark.sql(String.format(query, tableName)).collect());
  executorService.submit(() -> spark.sql(String.format(query, tableName)).collect());

  executorService.shutdown();
  executorService.awaitTermination(1, TimeUnit.MINUTES);

  ImmutableList<Object[]> expectedRows =
      ImmutableList.of(
          row(1, "emp-id-1"), // new
          row(2, "emp-id-2"), // new
          row(3, "emp-id-3") // new
          );

  assertEquals(
      "Should have expected rows",
      expectedRows,
      sql("SELECT * FROM %s ORDER BY id", tableName));
}
Thanks @RussellSpitzer for this investigation. All my tests had only a NOT MATCHED clause. For the query with a MATCHED clause, I just checked the operation in the commit metrics and assumed I would have the same issue. I will test it tomorrow and see whether the issue is resolved by adding a MATCHED condition.
@RussellSpitzer indeed, it detects a conflict when I add a WHEN MATCHED clause. I used WHEN MATCHED AND 1!=1 THEN UPDATE SET * to avoid updating the data, although it always rewrites the data files (the same data without any updates).
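For reference, the full workaround query with the no-op clause, using the table and view names from the reproduction above:

MERGE INTO dest
USING src
ON dest.id = src.id
WHEN MATCHED AND 1!=1 THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *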
For this issue, do you think we should add a note in the documentation and/or add your tests to detect a change in behavior?
This is a serious bug and it needs to be fixed. @aokolnychyi and I were discussing it yesterday and trying to figure out what the right place to fix this is.
@aokolnychyi had some preliminary ideas on how to fix this; it will require a Spark change, though.
@RussellSpitzer @aokolnychyi following up on this issue, do you know by any chance whether this was fixed in the latest Iceberg version, or is it still a bug as of today? Should we still use the no-op MATCHED workaround?
@szehon-ho promised me he would do this :P
😄 yeah, it sounds like it will be on the Spark side. If so, I think @aokolnychyi did mention it to me. He is off, and I'll be off as well after, so I may not get to chat with him until September, but I can update if I get some ideas before that.
@RussellSpitzer could you give an update on the progress of this issue? Do I need to apply any workarounds?