[Spark] Fix metadata cleanup by retaining files required for log reconstruction
Which Delta project/connector is this regarding?
- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)
Description
Delete eligible delta log files only if there's a checkpoint newer than them before the cutoff window.
Resolves #606
Unit tests based on https://github.com/delta-io/delta/pull/2673
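As a rough illustration of the rule (a hedged sketch with hypothetical names, not the actual change, which lives in the metadata-cleanup path around `BufferingLogDeletionIterator`):

```scala
// Sketch only: expired delta files are safe to delete only up to the newest
// expired checkpoint, because that checkpoint supersedes them for snapshot
// reconstruction. With no such checkpoint, nothing is deleted.
object CleanupRuleSketch {
  def deletableDeltaVersions(
      expiredDeltaVersions: Seq[Long],
      expiredCheckpointVersions: Seq[Long]): Seq[Long] =
    expiredCheckpointVersions.maxOption match {
      case Some(cp) => expiredDeltaVersions.filter(_ < cp)
      case None => Seq.empty // no newer checkpoint: retain all expired files
    }
}
```

For example, with expired deltas 0-5 and no expired checkpoint this returns nothing, so later versions that depend on those deltas stay reconstructible.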
How was this patch tested?
Unit Tests
Does this PR introduce any user-facing changes?
Yes. Tables with a low rate of commits/checkpoints will have increased log retention beyond the cutoff window.
@andreaschat-db, could you help with the `Metadata cleanup respects requireCheckpointProtectionBeforeVersion` test failure, please?
I couldn't find any info in the Delta protocol about the `checkpointProtection` table feature.
[info] - Metadata cleanup respects requireCheckpointProtectionBeforeVersion *** FAILED ***
[info] HashSet(10, 14, 6, 9, 13, 12, 7, 11, 8, 15) did not equal HashSet(10, 14, 9, 13, 12, 11, 8, 15) (CheckpointProtectionTestUtilsMixin.scala:91)
[info] Analysis:
[info] HashSet(missingInRight: [6, 7])
[info] org.scalatest.exceptions.TestFailedException:
[info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info] at org.apache.spark.sql.delta.CheckpointProtectionTestUtilsMixin.$anonfun$testRequireCheckpointProtectionBeforeVersion$1(CheckpointProtectionTestUtilsMixin.scala:91)
[info] at org.apache.spark.sql.delta.CheckpointProtectionTestUtilsMixin.$anonfun$testRequireCheckpointProtectionBeforeVersion$1$adapted(CheckpointProtectionTestUtilsMixin.scala:38)
[info] at org.apache.spark.sql.delta.test.DeltaSQLTestUtils.withTempDir(DeltaSQLTestUtils.scala:52)
[info] at org.apache.spark.sql.delta.test.DeltaSQLTestUtils.withTempDir$(DeltaSQLTestUtils.scala:50)
[info] at org.apache.spark.sql.delta.DeltaRetentionSuite.withTempDir(DeltaRetentionSuite.scala:38)
[info] at org.apache.spark.sql.delta.test.DeltaSQLTestUtils.withTempDir(DeltaSQLTestUtils.scala:35)
[info] at org.apache.spark.sql.delta.test.DeltaSQLTestUtils.withTempDir$(DeltaSQLTestUtils.scala:34)
[info] at org.apache.spark.sql.delta.DeltaRetentionSuite.withTempDir(DeltaRetentionSuite.scala:38)
[info] at org.apache.spark.sql.delta.CheckpointProtectionTestUtilsMixin.testRequireCheckpointProtectionBeforeVersion(CheckpointProtectionTestUtilsMixin.scala:38)
[info] at org.apache.spark.sql.delta.CheckpointProtectionTestUtilsMixin.testRequireCheckpointProtectionBeforeVersion$(CheckpointProtectionTestUtilsMixin.scala:30)
[info] at org.apache.spark.sql.delta.DeltaRetentionSuite.testRequireCheckpointProtectionBeforeVersion(DeltaRetentionSuite.scala:38)
[info] at org.apache.spark.sql.delta.DeltaRetentionSuite.$anonfun$new$94(DeltaRetentionSuite.scala:819)
Hi @felipepessoto, please check the doc in `TableFeature`. AFAIU, your PR changes which checkpoints are removed, and this changes the expectations of the test.
@andreaschat-db, I fixed it, could you please review it?
Hi @felipepessoto, thank you for looking at this. I left a couple of comments. However, I am a bit confused about what exactly the issue is that we are trying to fix. AFAIU, the `BufferingLogDeletionIterator` is called with a max version. Isn't it guaranteed that at maxVersion + 1 there is a checkpoint? If so, what is the scenario where this can go wrong? Is there a case where we may stop earlier than maxVersion?
@andreaschat-db currently we remove the expired files as long as they meet the maxVersion and maxTimestamp requirements. The problem arises if we have the following:
- Expired: 0.json-5.json
- Non-expired: 6.json-10.json + 10.checkpoint
The cleanup deletes 0-5, but versions 6-9, although not expired, can no longer be queried because they depend on the 0-5 json files to build the Snapshot. This is a known issue; the protocol suggests always keeping the last expired checkpoint: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#metadata-cleanup. That was my initial implementation, but I found it causes issues with checkpointProtection because it keeps log files that are not strictly required, so I made further changes to keep only the files we really need.
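To make the dependency concrete, here is a toy reconstructibility check (a hedged sketch with made-up names, not Delta internals):

```scala
// Toy model: a snapshot at version v can be built from a retained checkpoint
// at some c <= v plus every delta in (c, v], or from every delta in [0, v].
final case class RetainedLog(checkpoints: Set[Long], deltas: Set[Long])

def isReconstructible(log: RetainedLog, v: Long): Boolean = {
  def deltasCover(from: Long): Boolean = (from to v).forall(log.deltas.contains)
  log.checkpoints.exists(c => c <= v && deltasCover(c + 1)) || deltasCover(0L)
}

// In the scenario above, after deleting 0.json-5.json the retained log is
// RetainedLog(checkpoints = Set(10), deltas = Set(6, 7, 8, 9, 10)).
// isReconstructible(..., 9) is false: there is no checkpoint <= 9 and deltas
// 0-5 are gone, so versions 6-9 are lost even though they are not expired.
```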
Basically, I only flush the buffer when I find a checkpoint. Some examples (a sketch of this logic follows the list):
- Expired: 0.json-5.json
- Non-expired: 6.json-10.json + 10.checkpoint
- Result: Don't delete anything
- Expired: 0.json-15.json + 10.checkpoint
- Non-expired: 16.json-20.json + 20.checkpoint
- Result: Deletes 0.json to 9.json
- Expired: 0.json-19.json + 10.checkpoint
- Non-expired: 20.json + 20.checkpoint
- Result: Deletes 0.json to 19.json + 10.checkpoint. (in my initial implementation I was keeping versions starting from 10, the latest expired checkpoint)
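A hedged toy simulation of that buffering idea (illustrative only; the real logic lives in `BufferingLogDeletionIterator` and handles more cases):

```scala
final case class LogFile(version: Long, isCheckpoint: Boolean)

// Walk the expired files in version order, buffering them; flush the buffer
// for deletion only when a checkpoint supersedes it. If the first retained
// (non-expired) version is itself checkpointed, the leftover buffer is also
// safe to delete.
def filesToDelete(
    expired: Seq[LogFile],
    firstRetainedVersionHasCheckpoint: Boolean): Seq[LogFile] = {
  val deletable = Seq.newBuilder[LogFile]
  var buffer = Vector.empty[LogFile]
  expired.sortBy(_.version).foreach { f =>
    if (f.isCheckpoint) {
      // Everything strictly below this checkpoint is reconstructible from it.
      val (flush, keep) = buffer.partition(_.version < f.version)
      deletable ++= flush
      buffer = keep
    }
    buffer :+= f
  }
  if (firstRetainedVersionHasCheckpoint) deletable ++= buffer
  deletable.result()
}
```

Under these assumptions it returns nothing for the first example, deltas 0-9 for the second, and everything up to 19.json including 10.checkpoint for the third.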
@prakharjain09 could you also please have a look?
@prakharjain09 this is a relatively small change; it would be great to have it merged before the next cut, as it affects users' ability to time travel.
+ @allisonport-db, could we merge it before the 4.0 release?
@dhruvarya-db do you have some cycles to review this PR please?
Hi @dhruvarya-db, let me know if you have any questions about this change. Thanks
Thanks @dhruvarya-db. I'm not sure what the next steps are to get this merged to master, and whether I should open a PR against the 4.0 branch.
Looks like 4.0.0 has already been released. @allisonport-db does it make sense to backport this to the 4.0 branch? It might be useful in case there is another 4.0 maintenance release?
Hi @allisonport-db, could you help with the merge? And should I create a PR to cherry-pick it to 4.0?
Thanks
Gentle ping @allisonport-db
@scottsand-db could you help with the merge and help decide whether it makes sense to backport this to the 4.0 branch?
@dhruvarya-db and @felipepessoto -- is this considered a bug or correctness fix? That's the only reason we would want to backport it to the 4.0 branch.
Hard to judge, since the protocol is not strict about it; it only provides a recommendation. But the consequences can be severe, such as data loss, since users can't reconstruct old versions.
@vkorukanti, @scottsand-db tests are all green after the rebase
Thanks @vkorukanti
@scottsand-db, since it does not follow the protocol recommendation and can cause data loss, I believe it fits in the bug-fix category. I created a PR for the 4.0 branch: https://github.com/delta-io/delta/pull/4784