iceberg icon indicating copy to clipboard operation
iceberg copied to clipboard

Spark: Improve performance of expire snapshot by not double-scanning retained Snapshots

Open szehon-ho opened this issue 2 years ago • 12 comments

Expire snapshots can take a long time for large tables with millions of files and thousands of snapshots/manifests.

One cause is the calculation of files to be deleted. The current algorithm is:

  • Find the reachability graph of all snapshots before expiration
  • Find reachability graph of all snapshots after expiration
  • Subtract the second from the first, these are files to delete

But this explores every retained snapshot twice. Example: any periodic expire-snapshot job that expires 1 snapshot needs to explore all n-1 snapshots twice.

Proposal:

  • Find reachability graph of all snapshots after expiration
  • Find reachability graph of expired snapshots (if only a few expired, should be much smaller set)
  • Subtract the first from the second, these are files to delete

Implementation: For expired-snapshot scan, change the original spark query of metadata tables to custom spark jobs that only explore from expired snapshot(s).

Note: The new expired-snapshot scan duplicates manifestList scan logic to handle "write.manifest-lists.enabled"="false" flag, but unfortunately the functionality seems broken without this change and so not possible to test currently. Added a test for demonstration purpose.

szehon-ho avatar Nov 03 '21 01:11 szehon-ho

I'm going to try to do some simple benchmarks to validate it improves the perf, but putting the idea out here for any early feedback

szehon-ho avatar Nov 03 '21 02:11 szehon-ho

I think this is actually promising, the performance gain is in line with expectation.

Test: table with 1000 small snapshots, expire 1 at a time. The time and resources spent in expire-snapshot and input/shuffle sizes falls by little less than half, as most snapshots are not double-scanned.

Current: Screenshot 2021-11-09 at 20 38 28

Optimized: Screenshot 2021-11-09 at 20 38 22

So I think it's worth to continue. @rdblue thanks for the early review, I will look at the comments and make the changes

szehon-ho avatar Nov 10 '21 04:11 szehon-ho

Patch should be ready for more review Fyi @aokolnychyi this is the snapshot-based metadata scan I mentioned, not sure if it will be useful elsewhere.

szehon-ho avatar Nov 12 '21 04:11 szehon-ho

@szehon-ho sorry for not getting back to this sooner, let's finish this up when you are back online

RussellSpitzer avatar Mar 30 '22 23:03 RussellSpitzer

I'm not sure if people think these changes are too hacky.

Another option I've thought, is to implement IncrementalScan (https://github.com/apache/iceberg/pull/4580) for All_files table (to be added in https://github.com/apache/iceberg/pull/4694), which will allow snapshot filtering. Then rewrite ExpireSnapshotAction to use that table, with different filters to avoid double-scanning all_files. It would be much cleaner that way, but a bigger refactor (and would make #4674 much harder if we go that route)

I wonder was there was some reason initially to use all_manifests table and flatmap the ReadManifest task, rather than relying on all_files table? @rdblue @aokolnychyi for any thoughts/information.

szehon-ho avatar May 05 '22 01:05 szehon-ho

Actually I think I get why, all_files table does not parallelize the planning (reading each snapshot in spark task), so maybe better to keep this way (all_manifests table and ReadManifest task). Will take a look to see if this can be cleaned up in another way.

szehon-ho avatar May 05 '22 01:05 szehon-ho

@szehon-ho, the reason why we use all_manifests is to avoid the expensive driver-side manifest list file reads. all_files has to read through every manifest list in the table from the driver to find all the manifests, dedup, and then make tasks. The all_manifests table makes a task out of each manifest list to produce a collection of manifests from just the list of snapshots/manifest lists.

rdblue avatar May 10 '22 01:05 rdblue

@RussellSpitzer @aokolnychyi Rebased the pr , it's still using the manual way to filter out snapshots from all_manifest table, if you guys have time to take a look.

The idea I was mentioning in above few comments to use manifest table with snapshot filtering via time-travel (to make it cleaner), I tried to implement in : https://github.com/apache/iceberg/pull/4736 , I'd like to see your thoughts if that is a better approach. The problem there is I stopped when I realized manifest table do not support time-travel, and it didn't look trivial to implement.

Also FYI @ajantha-bhat as we were talking about this on https://github.com/apache/iceberg/pull/4674

szehon-ho avatar May 10 '22 02:05 szehon-ho

@rdblue yea thanks, I realized it after asking.

szehon-ho avatar May 10 '22 02:05 szehon-ho

Ref : discussion on https://github.com/apache/iceberg/pull/4736. This implements the original idea to reduce the 'deleteCandidate' scan to just the ones from deleted snapshots.

There is another idea to remove current manifests during the delete candidate scan as well, not done yet.

szehon-ho avatar Jul 01 '22 18:07 szehon-ho

Update: I implemented a draft for removing current manifests from delete candidate calcuation, but found it doesn't help much and actually makes the performance worse: see https://github.com/apache/iceberg/pull/4736#issuecomment-1176854697. , so I'll put a hold on that. That being said, it could be scenario dependent and I'm not seeing the scenario it helps with.

szehon-ho avatar Jul 06 '22 23:07 szehon-ho

Test got cancelled somehow, retriggering

szehon-ho avatar Jul 08 '22 23:07 szehon-ho

I'd love to take a quick look today too!

aokolnychyi avatar Oct 11 '22 22:10 aokolnychyi