
Fix ConflictChecker for Read after Optimize

Open sezruby opened this issue 3 years ago • 4 comments

Description

A ConcurrentAppendException is thrown when a read runs concurrently with OPTIMIZE. This PR allows a concurrent read to succeed after an OPTIMIZE commit.

How was this patch tested?

Unit tests

Does this PR introduce any user-facing changes?

No. This is a bug fix for the case where an OPTIMIZE transaction is committed while concurrent reads are in progress.

sezruby avatar Aug 01 '22 07:08 sezruby

Can you elaborate on the issue? Which operation throws ConcurrentAppendException, Optimize or Append? And can you give an example error and stack trace? Also, can you explain the semantics of why this change is correct and maintains serializability?

tdas avatar Aug 09 '22 22:08 tdas

@tdas The Append operation throws ConcurrentAppendException.

io.delta.exceptions.ConcurrentAppendException: Files were added to partition [colC=1] by a concurrent update. Please try the operation again.
Conflicting commit: {"timestamp":1662599111087,"operation":"OPTIMIZE", ...

https://github.com/delta-io/delta/blob/2041c3b7138b9614529540613195d4afd39fde71/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala#L177-L188 This change makes files rewritten by OPTIMIZE no longer conflict with the read portion of an APPEND operation. Strictly speaking, the conflict is not with the APPEND itself but with the scan that precedes it.
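
To illustrate the idea, here is a minimal sketch (not the actual ConflictChecker code; AddFile and the function below are simplified stand-ins for the real Delta actions):

// Sketch only: OPTIMIZE rewrites existing rows into fewer, larger files, so its
// AddFile actions carry dataChange = false. Files whose data is unchanged cannot
// invalidate a concurrent scan, so the conflict check can skip them.
case class AddFile(path: String, partitionValues: Map[String, String], dataChange: Boolean)

// Only files that carry new or changed data participate in the
// read-vs-append conflict check.
def changedDataAddedFiles(winningCommitAdds: Seq[AddFile]): Seq[AddFile] =
  winningCommitAdds.filter(_.dataChange)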

This is the repro:

// session 1 - performing OPTIMIZE
(1 to 10).foreach { i =>
  println(spark.sql(s"OPTIMIZE delta.`$dataPath`").collect.toSeq)
}

// session 2 - performing APPEND
(1 to 10).foreach { i =>
  spark.read.format("delta").load(dataPath).limit(10)
    .write.mode("append").format("delta").partitionBy("colC").save(dataPath)
}

sezruby avatar Sep 08 '22 03:09 sezruby

@sezruby Thank you for the explanation and the repro. So it's a problem not with blind appends but with read-then-append.

I have to think about whether this change has other unintended consequences in other kinds of workloads. Can you provide a logical argument for why this change will not produce unintended consequences in other combinations of operations, like DELETE/UPDATE/MERGE + OPTIMIZE? I know that you have added some tests with DELETE, but tests can have coverage gaps. So it would be good to have a convincing logical argument that this change is safe no matter what the operation is.

tdas avatar Sep 12 '22 17:09 tdas

@tdas I found that there can still be a conflict if a concurrent transaction reads a file that OPTIMIZE removed (a RemoveFile).

io.delta.exceptions.ConcurrentDeleteReadException: This transaction attempted to read one or more files that were deleted (for example colC=0/part-00000-cf1e16ab-27b2-4c9c-b5e1-bccfb0e79a58.c000.snappy.parquet in partition [colC=0]) by a concurrent update. Please try the operation again. Conflicting commit: {"timestamp":1663052151972,"operation":"OPTIMIZE"

https://github.com/delta-io/delta/blob/d2785aa93e8f2f5c70a3351ddc4995e2c943897b/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala#L226-L235

I think we can add a changedDataRemovedFiles field, analogous to changedDataAddedFiles, and use it in the check function. We could allow reads of the version preceding OPTIMIZE without any serializability issue, as long as the transaction doesn't update existing data. With the fix, we could support a concurrent OPTIMIZE while performing append/insert-only operations that read existing data to append new rows.
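
A simplified sketch of what that check could look like (RemoveFile here is a stand-in for the real Delta action, and the real code compares file actions rather than bare paths):

// Sketch only: OPTIMIZE emits RemoveFile actions with dataChange = false for
// the files it compacted. Skipping those lets a transaction that merely read
// them commit without a ConcurrentDeleteReadException.
case class RemoveFile(path: String, dataChange: Boolean)

def changedDataRemovedFiles(winningCommitRemoves: Seq[RemoveFile]): Seq[RemoveFile] =
  winningCommitRemoves.filter(_.dataChange)

// Conflict only if a file this transaction read was removed *with* a data change.
def deleteReadConflict(readPaths: Set[String], removes: Seq[RemoveFile]): Boolean =
  changedDataRemovedFiles(removes).exists(r => readPaths.contains(r.path))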

Other types of operation that may emit RemoveFile actions (DELETE, UPDATE, MERGE, OPTIMIZE) will still fail with the following check:

https://github.com/delta-io/delta/blob/d2785aa93e8f2f5c70a3351ddc4995e2c943897b/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala#L250-L266

sezruby avatar Sep 13 '22 07:09 sezruby

Wouldn't this PR https://github.com/delta-io/delta/pull/1262 help with this?

scottsand-db avatar Sep 15 '22 19:09 scottsand-db

@scottsand-db It seems that PR allows switching the isolation level to WriteSerializable. I'm not sure whether any other changes will be delivered, but with the current code the issue still exists:

https://github.com/delta-io/delta/blob/d2785aa93e8f2f5c70a3351ddc4995e2c943897b/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala#L181-L188

It still checks changedDataAddedFiles under WriteSerializable.

Also, with this change we could allow concurrent read-append and OPTIMIZE at the Serializable level.
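
To summarize the intended net effect, a hedged sketch (FileAction and matchesReadPredicates are illustrative stand-ins, not Delta APIs):

// Sketch only: under the proposal, a read-then-append transaction conflicts with
// a winning commit only through files whose dataChange flag is true. Since every
// file OPTIMIZE adds or removes has dataChange = false, a concurrent
// read-append + OPTIMIZE would be safe even at the Serializable level.
case class FileAction(path: String, dataChange: Boolean)

def conflictsWithReadAppend(
    winningAdds: Seq[FileAction],
    winningRemoves: Seq[FileAction],
    matchesReadPredicates: FileAction => Boolean, // derived from the txn's scan
    readPaths: Set[String]): Boolean = {
  // Added files conflict only if they carry changed data and fall in the read range.
  val addConflict = winningAdds.exists(a => a.dataChange && matchesReadPredicates(a))
  // Removed files conflict only if they carried changed data and were read.
  val removeConflict = winningRemoves.exists(r => r.dataChange && readPaths.contains(r.path))
  addConflict || removeConflict
}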

sezruby avatar Sep 15 '22 21:09 sezruby

@tdas Ready-For-Review

sezruby avatar Sep 21 '22 04:09 sezruby

It seems this PR also fixes https://github.com/delta-io/delta/issues/326

sezruby avatar Sep 30 '22 17:09 sezruby

@tdas I've taken a first pass. Can you also please take a look?

scottsand-db avatar Sep 30 '22 17:09 scottsand-db

@sezruby Any news on this PR? We're running into this issue a lot, having a streaming job that writes to the same partition 24/7, and so we are having a hard time running OPTIMIZE on that partition.

Also, did you see https://github.com/delta-io/delta/pull/626 ?

zmeir avatar Aug 14 '23 11:08 zmeir

> @sezruby Any news on this PR? We're running into this issue a lot, having a streaming job that writes to the same partition 24/7, and so we are having a hard time running OPTIMIZE on that partition.
>
> Also, did you see #626 ?

Hi, you can cherry-pick this PR to support concurrent OPTIMIZE and APPEND. This PR contains two parts:

  • changedDataAddedFiles in the newly added file check
    • with this, the APPEND operation succeeds, as the files newly added by OPTIMIZE have dataChange=false
  • removedFiles check with readPredicates
    • the master version only checks whether there are newly removed files among the transaction's own readFiles
    • so a file newly deleted after OPTIMIZE cannot be detected by the APPEND: https://github.com/delta-io/delta/pull/1305#discussion_r986304404
    • To prevent this, I added an additional check using readPredicates, to see whether there are other newly removed files matching the read predicates (see the sketch after this list).
    • This may cause unnecessary conflicts between DELETE and APPEND, but I think that is very rare:
      • t0 txn0 APPEND: read file2 using a data filter
      • t1 txn1 APPEND: add file1 - commit
      • t2 txn2 DELETE: file1 - commit
      • t3 txn0 APPEND: commit => no conflict with the master version, but with this PR the removed file1 can cause a conflict
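
A simplified sketch of the predicate-based check from the second bullet (predicate matching is reduced to partition-value equality here; the real readPredicates are Spark expressions):

// Sketch only: besides the files the transaction actually read, also flag removed
// files that *match the transaction's read predicates*, since the transaction's
// outcome could have depended on them (the DELETE-vs-APPEND case in the timeline).
case class RemoveFile(path: String, partitionValues: Map[String, String], dataChange: Boolean)
case class ReadPredicate(column: String, value: String) // simplified partition predicate

def removedFilesMatchingReadPredicates(
    winningCommitRemoves: Seq[RemoveFile],
    readPredicates: Seq[ReadPredicate]): Seq[RemoveFile] =
  winningCommitRemoves
    .filter(_.dataChange) // OPTIMIZE's removes (dataChange = false) are exempt
    .filter(r => readPredicates.exists(p => r.partitionValues.get(p.column).contains(p.value)))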

sezruby avatar Aug 14 '23 18:08 sezruby

Thanks @sezruby !

Yeah, I figured I could do that, but I was hoping to simplify my build by using an existing release artifact.

*rolls up sleeves* Oh well...

zmeir avatar Aug 14 '23 20:08 zmeir