Fix ConflictChecker for Read after Optimize
Description
ConcurrentAppendException is thrown when an OPTIMIZE and a read-then-append transaction run concurrently. This PR allows the concurrent read to succeed after the OPTIMIZE commit.
How was this patch tested?
Unit tests
Does this PR introduce any user-facing changes?
No, this is a bug fix for the case where an OPTIMIZE transaction is committed while concurrent reads are in progress.
Can you elaborate on the issue? Which operation throws ConcurrentAppendException, Optimize or Append? Can you give an example error and stack trace? Also, can you explain the semantics of why this change is correct and maintains serializability?
@tdas Append operation throws ConcurrentAppendException.
io.delta.exceptions.ConcurrentAppendException: Files were added to partition [colC=1] by a concurrent update. Please try the operation again.
Conflicting commit: {"timestamp":1662599111087,"operation":"OPTIMIZE", ...
https://github.com/delta-io/delta/blob/2041c3b7138b9614529540613195d4afd39fde71/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala#L177-L188 This change makes files added by OPTIMIZE not conflict with the read part of an Append operation. Strictly speaking, the conflict is not with the APPEND itself but with the scan that precedes it.
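A minimal sketch of that idea, with illustrative names rather than the actual ConflictChecker fields:
// Illustrative sketch (not the real Delta Lake classes): files rewritten by
// OPTIMIZE carry dataChange = false, so they need not conflict with a
// transaction that only scanned the table before appending.
case class AddFileLite(path: String, partitionValues: Map[String, String], dataChange: Boolean)

def addedFilesToCheck(winningCommitAdds: Seq[AddFileLite]): Seq[AddFileLite] =
  winningCommitAdds.filter(_.dataChange) // ignore pure rewrites from OPTIMIZE

// An OPTIMIZE commit only rewrites existing data, so nothing remains to conflict on.
val optimizeAdds = Seq(AddFileLite("part-00000.parquet", Map("colC" -> "1"), dataChange = false))
assert(addedFilesToCheck(optimizeAdds).isEmpty)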
This is the repro:
// session 1 - repeatedly running OPTIMIZE
(1 to 10).foreach { i =>
  println(spark.sql(s"OPTIMIZE delta.`$dataPath`").collect.toSeq)
}
// session 2 - concurrently reading the table and appending the result back (read-then-append)
(1 to 10).foreach { i =>
  spark.read.format("delta").load(dataPath).limit(10)
    .write.mode("append").format("delta").partitionBy("colC").save(dataPath)
}
@sezruby Thank you for the explanation and the repro. So it's a problem not with blind appends but with read-then-append.
I have been thinking about whether this change has unintended consequences in other kinds of workloads. Can you provide a logical argument for why this change will not produce unintended consequences in other combinations of operations, like DELETE/UPDATE/MERGE + OPTIMIZE? I know that you have added some tests with delete, but tests can have coverage gaps. So it would be good to have a convincing logical argument that this change is safe no matter what the operation is.
@tdas I found that there can still be a conflict if a concurrent transaction reads a file that OPTIMIZE removed (a RemoveFile):
io.delta.exceptions.ConcurrentDeleteReadException: This transaction attempted to read one or more files that were deleted (for example colC=0/part-00000-cf1e16ab-27b2-4c9c-b5e1-bccfb0e79a58.c000.snappy.parquet in partition [colC=0]) by a concurrent update. Please try the operation again. Conflicting commit: {"timestamp":1663052151972,"operation":"OPTIMIZE"
https://github.com/delta-io/delta/blob/d2785aa93e8f2f5c70a3351ddc4995e2c943897b/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala#L226-L235
I think we can add changedDataRemovedFiles, analogous to changedDataAddedFiles, and use it in the check function.
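A rough sketch of that proposal, again with illustrative names rather than the actual ConflictChecker code: RemoveFile actions written by OPTIMIZE carry dataChange = false, so only removes that actually changed data would be checked against the files this transaction read.
case class RemoveFileLite(path: String, dataChange: Boolean)

// Only removes that actually changed data are candidates for a
// ConcurrentDeleteReadException, mirroring changedDataAddedFiles.
def changedDataRemovedFiles(winningCommitRemoves: Seq[RemoveFileLite]): Seq[RemoveFileLite] =
  winningCommitRemoves.filter(_.dataChange)

def conflictsWithReadFiles(readFilePaths: Set[String], winningCommitRemoves: Seq[RemoveFileLite]): Boolean =
  changedDataRemovedFiles(winningCommitRemoves).exists(r => readFilePaths.contains(r.path))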
We could allow a read of the previous version (before the OPTIMIZE) without any serializability issue, as long as the transaction doesn't update existing data. With the fix, we could support concurrent OPTIMIZE while performing append/insert-only operations that use existing data to append new rows.
For other types of operations that may produce RemoveFile actions (DELETE, UPDATE, MERGE, OPTIMIZE), the transaction will still fail with the following check:
https://github.com/delta-io/delta/blob/d2785aa93e8f2f5c70a3351ddc4995e2c943897b/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala#L250-L266
Wouldn't this PR https://github.com/delta-io/delta/pull/1262 help with this?
@scottsand-db That PR seems to allow switching the isolation level to WriteSerializable. I'm not sure whether any other changes will be delivered, but with the current code the issue still exists:
https://github.com/delta-io/delta/blob/d2785aa93e8f2f5c70a3351ddc4995e2c943897b/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala#L181-L188
It still checks changedDataAddedFiles even with WriteSerializable.
Also, with this change we could allow concurrent read-then-append and OPTIMIZE at the Serializable level.
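For reference, once that PR lands, switching a table to WriteSerializable would presumably look like this (assuming the standard delta.isolationLevel table property is the knob it exposes; dataPath is the table path from the repro above):
// Assumption: delta.isolationLevel is the property exposed by that PR.
spark.sql(s"ALTER TABLE delta.`$dataPath` SET TBLPROPERTIES ('delta.isolationLevel' = 'WriteSerializable')")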
@tdas Ready-For-Review
Seems this PR fixes https://github.com/delta-io/delta/issues/326
@tdas I've taken a first pass. Can you also please take a look?
@sezruby Any news on this PR? We're running into this issue a lot, having a streaming job that writes to the same partition 24/7, and so we are having a hard time running OPTIMIZE on that partition.
Also, did you see https://github.com/delta-io/delta/pull/626 ?
Hi, you can cherry-pick this PR to support concurrent OPTIMIZE and APPEND. This PR contains two parts:
- Use changedAddedFiles in the newly-added-file check.
  - With this, the APPEND operation succeeds, because files newly added by OPTIMIZE have "dataChange=false".
- Check removedFiles against readPredicates.
  - The master version only checks whether there are newly removed files among the "readFiles" of the transaction.
  - So, if a file is deleted after the OPTIMIZE, the APPEND cannot detect it: https://github.com/delta-io/delta/pull/1305#discussion_r986304404
  - To prevent this, I added an additional check using readPredicates, to see whether any other "newly removed files" match the read predicates.
  - This may cause unnecessary conflicts between DELETE and APPEND, but I think that is very rare (see the sketch after this list):
    - t0 txn0 APPEND: reads file2 using a data filter
    - t1 txn1 APPEND: adds file1 - commit
    - t2 txn2 DELETE: deletes file1 - commit
    - t3 txn0 APPEND: commit => no conflict with the master version, but with this PR the removed file1 can cause a conflict
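A rough sketch of that second part, with illustrative names (not the actual ConflictChecker fields) and the read predicate reduced to partition values for brevity:
// Besides the files this transaction actually read, also check whether any
// removed file that changed data matches the transaction's read predicates.
case class RemoveFileLite(path: String, partitionValues: Map[String, String], dataChange: Boolean)

def removedFileMatchesReadPredicates(
    readPredicates: Seq[Map[String, String] => Boolean],
    winningCommitRemoves: Seq[RemoveFileLite]): Boolean =
  winningCommitRemoves
    .filter(_.dataChange) // removes written by OPTIMIZE (dataChange = false) are ignored
    .exists(r => readPredicates.exists(pred => pred(r.partitionValues)))

// t0-t3 scenario above, with illustrative values: txn0 read with a predicate
// matching colC=1, and txn2 deleted file1 in that partition, so txn0's commit
// now conflicts even though it never read file1 (unnecessary but safe).
val readPredicates: Seq[Map[String, String] => Boolean] = Seq(pv => pv.get("colC").contains("1"))
val removes = Seq(RemoveFileLite("file1", Map("colC" -> "1"), dataChange = true))
assert(removedFileMatchesReadPredicates(readPredicates, removes))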
Thanks @sezruby !
Yeah, I figured I could do that, but I was hoping to simplify my build by using an existing released artifact.
*rolls up sleeves* Oh well...