[Spark] Fix metadata cleanup by retaining files required for log reconstruction

Open felipepessoto opened this issue 9 months ago • 8 comments

Which Delta project/connector is this regarding?

  • [X] Spark
  • [ ] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Description

Delete eligible delta log files only if there's a checkpoint newer than them before the cutoff window.

Resolves #606

Unit tests based on https://github.com/delta-io/delta/pull/2673

How was this patch tested?

Unit Tests

Does this PR introduce any user-facing changes?

Yes. Tables with a low rate of commits/checkpoints will retain log files beyond the cutoff window.

felipepessoto avatar Feb 11 '25 20:02 felipepessoto

@andreaschat-db, could you help with the "Metadata cleanup respects requireCheckpointProtectionBeforeVersion" test failure, please?

I couldn't find any information in the Delta protocol about the checkpointProtection table feature.

[info] - Metadata cleanup respects requireCheckpointProtectionBeforeVersion *** FAILED ***
[info]   HashSet(10, 14, 6, 9, 13, 12, 7, 11, 8, 15) did not equal HashSet(10, 14, 9, 13, 12, 11, 8, 15) (CheckpointProtectionTestUtilsMixin.scala:91)
[info]   Analysis:
[info]   HashSet(missingInRight: [6, 7])
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.delta.CheckpointProtectionTestUtilsMixin.$anonfun$testRequireCheckpointProtectionBeforeVersion$1(CheckpointProtectionTestUtilsMixin.scala:91)
[info]   at org.apache.spark.sql.delta.CheckpointProtectionTestUtilsMixin.$anonfun$testRequireCheckpointProtectionBeforeVersion$1$adapted(CheckpointProtectionTestUtilsMixin.scala:38)
[info]   at org.apache.spark.sql.delta.test.DeltaSQLTestUtils.withTempDir(DeltaSQLTestUtils.scala:52)
[info]   at org.apache.spark.sql.delta.test.DeltaSQLTestUtils.withTempDir$(DeltaSQLTestUtils.scala:50)
[info]   at org.apache.spark.sql.delta.DeltaRetentionSuite.withTempDir(DeltaRetentionSuite.scala:38)
[info]   at org.apache.spark.sql.delta.test.DeltaSQLTestUtils.withTempDir(DeltaSQLTestUtils.scala:35)
[info]   at org.apache.spark.sql.delta.test.DeltaSQLTestUtils.withTempDir$(DeltaSQLTestUtils.scala:34)
[info]   at org.apache.spark.sql.delta.DeltaRetentionSuite.withTempDir(DeltaRetentionSuite.scala:38)
[info]   at org.apache.spark.sql.delta.CheckpointProtectionTestUtilsMixin.testRequireCheckpointProtectionBeforeVersion(CheckpointProtectionTestUtilsMixin.scala:38)
[info]   at org.apache.spark.sql.delta.CheckpointProtectionTestUtilsMixin.testRequireCheckpointProtectionBeforeVersion$(CheckpointProtectionTestUtilsMixin.scala:30)
[info]   at org.apache.spark.sql.delta.DeltaRetentionSuite.testRequireCheckpointProtectionBeforeVersion(DeltaRetentionSuite.scala:38)
[info]   at org.apache.spark.sql.delta.DeltaRetentionSuite.$anonfun$new$94(DeltaRetentionSuite.scala:819)

felipepessoto avatar Feb 21 '25 02:02 felipepessoto

Hi @felipepessoto, please check the doc in TableFeature. AFAIU, your PR is changing which checkpoints are removed and this changes the expectations of the test.

andreaschat-db avatar Feb 21 '25 08:02 andreaschat-db

@andreaschat-db, I fixed it, could you please review it?

felipepessoto avatar May 20 '25 08:05 felipepessoto

Hi @felipepessoto, thank you for looking at this. I left a couple of comments. However, I am a bit confused about what exactly the issue is that we are trying to fix. AFAIU, the BufferingLogDeletionIterator is called with a max version. Isn't it guaranteed that there is a checkpoint at maxVersion + 1? If so, in which scenario can this go wrong? Is there a case where we may stop earlier than maxVersion?

@andreaschat-db currently we remove the expired files as long as they meet the maxVersion and maxTimestamp requirements. The problem arises when we have the following:

  • Expired: 0.json-5.json
  • Non-expired: 6.json-10.json + 10.checkpoint

The cleanup deletes 0-5. Versions 6-9 are not expired, but they can no longer be queried because building their Snapshot depends on the 0-5 JSON files. This is a known issue; the protocol suggests always keeping the last expired checkpoint: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#metadata-cleanup. That was my initial implementation, but I found it causes issues with checkpointProtection because it keeps log files that are not strictly required, so I made more changes to keep only the files we really need.
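To make the failure mode concrete, here is a minimal sketch of snapshot reconstruction. It is a simplified model and not Delta's actual code: the function name `can_reconstruct` and the plain sets of version numbers are assumptions for illustration. A version is taken to be readable if every commit after the latest usable checkpoint (or from version 0 when no checkpoint exists) is still present.

```python
def can_reconstruct(version, commits, checkpoints):
    """Return True if the snapshot at `version` can be rebuilt.

    `commits` and `checkpoints` are sets of version numbers whose
    N.json / N.checkpoint files still exist (hypothetical model).
    """
    # Only checkpoints at or before the requested version can be used.
    usable = [c for c in checkpoints if c <= version]
    # Replay starts right after the latest usable checkpoint, or at 0.
    start = max(usable) + 1 if usable else 0
    return all(v in commits for v in range(start, version + 1))


# State after the current cleanup deleted 0.json-5.json.
commits = set(range(6, 11))
checkpoints = {10}
queryable = [v for v in range(6, 11) if can_reconstruct(v, commits, checkpoints)]
print(queryable)  # [10]
```

In this model only version 10 remains readable: versions 6-9 are not expired, yet they have no checkpoint at or below them and their replay would need the deleted 0-5 commits.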

Basically, I only flush the buffer when I find a checkpoint. Some examples:

  • Expired: 0.json-5.json
  • Non-expired: 6.json-10.json + 10.checkpoint
  • Result: Don't delete anything

  • Expired: 0.json-15.json + 10.checkpoint
  • Non-expired: 16.json-20.json + 20.checkpoint
  • Result: Deletes 0.json to 9.json

  • Expired: 0.json-19.json + 10.checkpoint
  • Non-expired: 20.json + 20.checkpoint
  • Result: Deletes 0.json to 19.json + 10.checkpoint. (in my initial implementation I was keeping versions starting from 10, the latest expired checkpoint)
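The three examples above can be reproduced with a small sketch of the "flush only at a checkpoint" idea. This is an illustrative model, not the PR's BufferingLogDeletionIterator: the function name `plan_cleanup`, its parameters, and the `N.json` / `N.checkpoint` file names are assumptions. It also assumes commit versions are contiguous, and that a non-expired checkpoint only counts when it sits exactly at the first non-expired version.

```python
def plan_cleanup(expired_commits, expired_checkpoints, live_checkpoints):
    """Return the log files that are safe to delete (hypothetical model)."""
    if not expired_commits:
        return []
    first_live = max(expired_commits) + 1  # first non-expired version
    # A live checkpoint helps only if it covers every retained version,
    # i.e. it sits exactly at the first non-expired version.
    checkpoints = set(expired_checkpoints) | {
        c for c in live_checkpoints if c == first_live
    }
    buffered, deletable = [], []
    for version in range(min(expired_commits), first_live + 1):
        if version in checkpoints:
            # Everything buffered is older than this checkpoint, so no
            # retained version needs it for reconstruction any more.
            deletable.extend(buffered)
            buffered = []
        if version in expired_checkpoints:
            buffered.append(f"{version}.checkpoint")
        if version in expired_commits:
            buffered.append(f"{version}.json")
    # Files still in the buffer were never followed by a checkpoint: keep them.
    return deletable


print(plan_cleanup(range(0, 6), set(), {10}))        # []
print(len(plan_cleanup(range(0, 16), {10}, {20})))   # 10
print(len(plan_cleanup(range(0, 20), {10}, {20})))   # 21
```

The first call deletes nothing, the second deletes 0.json to 9.json, and the third deletes 0.json to 19.json plus 10.checkpoint, matching the three results listed above.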

felipepessoto avatar May 20 '25 17:05 felipepessoto

@prakharjain09 could you also please have a look?

andreaschat-db avatar May 21 '25 07:05 andreaschat-db

@prakharjain09 this is a relatively small change. It would be great to have it merged before the next cut, as it affects users' ability to time travel.

felipepessoto avatar May 24 '25 23:05 felipepessoto

+ @allisonport-db , could we merge it before 4.0 release?

felipepessoto avatar May 28 '25 18:05 felipepessoto

@dhruvarya-db do you have some cycles to review this PR please?

raveeram-db avatar May 29 '25 05:05 raveeram-db

Hi @dhruvarya-db, let me know if you have any questions about this change. Thanks

felipepessoto avatar Jun 02 '25 19:06 felipepessoto

Thanks @dhruvarya-db. I'm not sure what the next steps are to get it merged to master, or whether I should open a PR against the 4.0 branch.

felipepessoto avatar Jun 09 '25 18:06 felipepessoto

Looks like 4.0.0 has already been released. @allisonport-db does it make sense to backport this to the 4.0 branch? It might be useful in case there is another 4.0 maintenance release.

dhruvarya-db avatar Jun 09 '25 18:06 dhruvarya-db

Hi @allisonport-db, could you help with the merge, and should I create a PR to cherry pick it to 4.0?

Thanks

felipepessoto avatar Jun 11 '25 23:06 felipepessoto

Gentle ping @allisonport-db

felipepessoto avatar Jun 16 '25 07:06 felipepessoto

@scottsand-db could you help with the merge and help decide whether it makes sense to backport this to the 4.0 branch?

felipepessoto avatar Jun 17 '25 22:06 felipepessoto

@dhruvarya-db and @felipepessoto -- is this considered a bug or correctness fix? That's the only reason we would want to backport it to the 4.0 branch.

scottsand-db avatar Jun 17 '25 22:06 scottsand-db

Hard to judge, since the protocol is not strict about it and only provides a recommendation. But the consequences can be severe: effectively data loss, since the user can't reconstruct old versions.

felipepessoto avatar Jun 17 '25 22:06 felipepessoto

@vkorukanti, @scottsand-db tests are all green after the rebase

felipepessoto avatar Jun 18 '25 01:06 felipepessoto

Thanks @vkorukanti

@scottsand-db, since it does not follow the protocol recommendation and can cause data loss, I believe it fits the bug fix category. I created a PR for the 4.0 branch: https://github.com/delta-io/delta/pull/4784

felipepessoto avatar Jun 18 '25 05:06 felipepessoto