Core, Spark: Remove dangling deletes as part of RewriteDataFilesAction
Goal: Attempt to clean up dangling deletes as part of the Spark RewriteDataFilesAction. It is controlled by the feature flag `remove-dangling-deletes`, which is turned on by default. Most of the code comes from #6581. The problem statement and design doc are here: https://docs.google.com/document/d/11d-cIUR_89kRsMmWnEoxXGZCvp7L4TUmPJqUC60zB5M/edit#
Changes
- `DeleteFiles` extended to remove a given delete file
- `RewriteDataFilesResult` now provides a count of the dangling delete files removed
- `withReusableDS()` function moved from `RewriteManifestsSparkAction` to the base class so it can be reused in `RewriteDataFilesAction`

TODO: figure out predicate pushdown for the entries metadata table (see the sketch after this list).
- `rewriteDataFiles` today supports a custom expression filter that targets the base table schema
- the entries metadata table used to identify dangling deletes has a different schema than the base table; the partition filter can be translated, but it is not clear how to handle filters applied to other non-partition columns
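To make the schema mismatch concrete, here is a hedged sketch with Spark column expressions (the actual action takes an Iceberg `Expression`; the column names `event_date` and `user_id` are hypothetical): an identity-partition predicate on the base table can be rewritten against the `data_file.partition` struct of the entries metadata table, while a predicate on a regular data column has no equivalent there.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.sql.Column;

class FilterTranslationSketch {
  // Filter a user would pass to rewriteDataFiles, expressed against the base table
  // schema (event_date is a hypothetical identity-partition column).
  static final Column BASE_TABLE_FILTER = col("event_date").equalTo(lit("2024-05-01"));

  // Possible translation for the entries metadata table, where partition values sit
  // in the data_file.partition struct.
  static final Column ENTRIES_TABLE_FILTER =
      col("data_file.partition.event_date").equalTo(lit("2024-05-01"));

  // A filter such as col("user_id").equalTo(lit(42)) has no counterpart in the
  // entries table, because per-row data columns are not present there.
}
```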
@szehon-ho can I ask for your eyes first?
I should have time to take a look this week.
Okay, I did one pass and here are my high-level notes:
We should use `RewriteFiles` instead of `DeleteFiles`; the changes in `DeleteFiles` should be reverted.
I don't see a need for the enum to control the cleanup mode.
I'd consider having a separate action but I can be convinced otherwise. Especially given that we may account for partition stats in the future.
I'd consider the following algorithm:
- Extend the `data_files` and `delete_files` metadata tables to include data sequence numbers, if needed. I don't remember if we already populate them. This should be trivial as each `DeleteFile` object already has this info.
- Query `data_files`, aggregate, and compute the min data sequence number per partition. Don't cache the computed result, just keep a reference to it.
- Query `delete_files`, potentially projecting only strictly required columns.
- Join the summary with `delete_files` on the spec ID and partition. Find delete files that can be discarded in one go by having a predicate that accounts for the delete type (position vs equality); see the sketch after this list.
- Collect the result to the driver and use `SparkDeleteFile` to wrap Spark rows as valid delete files. See the action for rewriting manifests for an example.
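To make the proposal concrete, here is a minimal sketch of the join in Spark SQL. It assumes a table named `db.tbl` and that the `data_files` / `delete_files` metadata tables expose a `data_sequence_number` column (the point of the first bullet); the class, method, and view names are mine, not the actual action code.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class DanglingDeleteSketch {
  // Returns the delete files that no remaining data file can still reference.
  static Dataset<Row> findDanglingDeletes(SparkSession spark) {
    // Min data sequence number of the live data files, per spec and partition.
    spark.sql(
            "SELECT spec_id, `partition`, MIN(data_sequence_number) AS min_data_seq "
                + "FROM db.tbl.data_files GROUP BY spec_id, `partition`")
        .createOrReplaceTempView("min_data_seq_per_partition");

    spark.sql(
            "SELECT content, file_path, spec_id, `partition`, data_sequence_number "
                + "FROM db.tbl.delete_files")
        .createOrReplaceTempView("all_delete_files");

    // Position deletes (content = 1) apply to data files with an equal or smaller data
    // sequence number; equality deletes (content = 2) apply only to strictly smaller ones.
    // The left join also catches partitions that no longer contain any data files.
    return spark.sql(
        "SELECT d.* FROM all_delete_files d "
            + "LEFT JOIN min_data_seq_per_partition m "
            + "  ON d.spec_id = m.spec_id AND d.`partition` = m.`partition` "
            + "WHERE m.min_data_seq IS NULL "
            + "   OR (d.content = 1 AND d.data_sequence_number < m.min_data_seq) "
            + "   OR (d.content = 2 AND d.data_sequence_number <= m.min_data_seq)");
  }
}
```

The collected rows would then be wrapped with `SparkDeleteFile` and removed through a `RewriteFiles` commit, mirroring the rewrite-manifests action.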
Based on Anton's feedback, I will try to divide the changes into 2 PRs, where the first PR (#9813) adds support for data sequence numbers in the data and delete files metadata tables. Once that is merged, I will update this PR to scan `data_files` first and aggregate the per-spec/partition min data sequence number, then compare against `delete_files`. With a left join, we can identify dangling deletes and remove them in one pass. `SparkDeleteFile` will be used to convert Spark rows into POJOs used for pruning, with partition evolution in mind. Finally, dangling deletes will be removed by reconstruction instead of by file path, to benefit from manifest pruning when the Iceberg table is scanned.
"Finally, dangling delete will be removed by reconstruction instead of by file path, to benefit manifest pruning when iceberg table was scanned."
I guess only `partitionData` and path are needed; the other fields are not used.
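As an illustration of the reconstruction, here is a hedged sketch using the public `FileMetadata` builder; the class and helper names are hypothetical and the Parquet format is assumed. Only the partition tuple and the path matter for matching and manifest pruning; size and record count are carried along only because the builder requires them.

```java
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

class DeleteFileReconstructionSketch {
  // Rebuilds a position delete file from the few fields kept after the metadata scan.
  static DeleteFile reconstructPositionDelete(
      PartitionSpec spec, String path, StructLike partition, long sizeInBytes, long recordCount) {
    return FileMetadata.deleteFileBuilder(spec)
        .ofPositionDeletes()
        .withPath(path)
        .withFormat(FileFormat.PARQUET) // assumed; real code would take the format from the row
        .withPartition(partition) // the partition tuple is what drives manifest pruning
        .withFileSizeInBytes(sizeInBytes) // required by the builder, not used for pruning
        .withRecordCount(recordCount) // required by the builder, not used for pruning
        .build();
  }
}
```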
With the merge of #10203, I refactored the algorithm a bit to scan the entries table for `minSequenceNumberPerPartitionAndSpec` and for the delete files' data sequence numbers, instead of relying on data sequence numbers as virtual columns. I also identified and fixed the problem in the partition evolution tests, so it is now all handled correctly. Would you like to take another look? @szehon-ho @aokolnychyi
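As a rough sketch of the refactored aggregation, assuming a table named `db.tbl`: only live entries (status 0 = EXISTING, 1 = ADDED) for data files (`data_file.content = 0`) contribute to the minimum.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class MinSequenceNumberSketch {
  // Min data sequence number of live data files, per partition spec and partition,
  // read from the entries metadata table instead of virtual sequence number columns.
  static Dataset<Row> minSequenceNumberPerPartitionAndSpec(SparkSession spark) {
    return spark.sql(
        "SELECT data_file.spec_id AS spec_id, data_file.`partition` AS `partition`, "
            + "MIN(sequence_number) AS min_data_sequence_number "
            + "FROM db.tbl.entries "
            + "WHERE status < 2 AND data_file.content = 0 "
            + "GROUP BY data_file.spec_id, data_file.`partition`");
  }
}
```

The join against the delete files and the position vs equality predicate remain as in the earlier sketch; only where the sequence numbers come from changes.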