Support Auto Compaction
Description
This PR adds support for Auto Compaction as described in: https://docs.databricks.com/delta/optimizations/auto-optimize.html#how-auto-compaction-works
We can support Auto Compaction via a new post-commit hook that runs OptimizeCommand with a smaller file size threshold.
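For illustration only, here is a minimal sketch of that shape; the trait, method, and helper names below are hypothetical, not the actual Delta internals:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical hook interface, for illustration; the real Delta
// post-commit hook API may differ.
trait PostCommitHook {
  def run(spark: SparkSession, committedVersion: Long): Unit
}

object AutoCompactHook extends PostCommitHook {
  override def run(spark: SparkSession, committedVersion: Long): Unit = {
    val enabled = spark.conf
      .getOption("spark.databricks.delta.autoCompact.enabled")
      .exists(_.toBoolean)
    if (enabled) {
      // Reuse the OPTIMIZE code path, but with the smaller
      // autoCompact.maxFileSize threshold, bounded by
      // autoCompact.maxCompactBytes.
      // runAutoCompact(spark, committedVersion) // hypothetical helper
    }
  }
}
```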
spark.databricks.delta.autoCompact.enabled
(default: false)
spark.databricks.delta.autoCompact.maxFileSize
(default: 128MB)
spark.databricks.delta.autoCompact.minNumFiles
(default: 50)
The configs above are the same as those of Databricks Auto Compaction.
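For example, enabling the feature for a session could look like this (a sketch; the config names are the ones listed above, and the values shown are just the defaults):

```scala
// Enable auto compaction with the defaults listed above.
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.maxFileSize", "134217728") // 128MB
spark.conf.set("spark.databricks.delta.autoCompact.minNumFiles", "50")
```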
New config 1 - autoCompact.maxCompactBytes
Since auto compaction is triggered after every table update, I introduced another config to control the total amount of data optimized in a single auto compaction operation:
spark.databricks.delta.autoCompact.maxCompactBytes
(default: 50G)
In Databricks, this value is adjusted based on available cluster resources; the config is a quick and easy stand-in for that behavior.
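For instance, a cluster with modest resources could lower the cap (a sketch; "10g" is an arbitrary example value, and I'm assuming the usual Spark byte-string format is accepted):

```scala
// Cap the data rewritten by one auto compaction pass at ~10 GB
// instead of the 50G default.
spark.conf.set("spark.databricks.delta.autoCompact.maxCompactBytes", "10g")
```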
New config 2 - autoCompact.target
The PR adds another new config, autoCompact.target, to change which files are targeted by auto compaction.
spark.databricks.delta.autoCompact.target
(default: "table")
- table: target all files in the table.
- commit: target only the files added/updated by the commit that triggers auto compaction.
- partition: target only the partitions containing any of the files added/updated by the commit that triggers auto compaction.
Users usually write/update data in only a few partitions and don't expect changes to other partitions.
If the table is not optimized, the default table behavior might unexpectedly cause conflicts with transactions on other partitions, and the files added/updated by the triggering commit might not be optimized at all if there are many small files in other partitions.
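For example, a workload that only appends to a few partitions could opt in to the narrower scope (a sketch using one of the values listed above):

```scala
// Compact only the partitions touched by the triggering commit,
// reducing the chance of conflicts with concurrent writers.
spark.conf.set("spark.databricks.delta.autoCompact.target", "partition")
```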
Fixes #815
How was this patch tested?
Unit tests
Does this PR introduce any user-facing changes?
Yes - it adds support for the Auto Compaction feature.
I didn't write a design doc or file an issue since the change is straightforward. Please let me know if design documentation is needed.
Hi @sezruby - thanks for this PR! It will take some time for us to review and verify it. We will get back to you.
Hi @sezruby - just updating you with the status on our end. We are very busy with planned features for the next release of Delta Lake, as well as with preparation for the upcoming Data and AI Summit in June.
So, it will take us some time to get back to you on this.
@vkorukanti Could you review the PR when you have the time? TIA!
@vkorukanti @scottsand-db A gentle reminder. This one is simpler than Optimize Write so I would like to merge this PR first.
Can you please fix the conflicts?
@scottsand-db @zsxwing Could you review the PR?
We are also having this issue, we can't define disjoint conditions from both merge and optimize if they are done concurrently.
@pedrosalgadowork which issue do you mean? Is it related to auto compaction?
@scottsand-db @zsxwing @tdas Could you review the PR?
@scottsand-db @zsxwing @tdas - can you help review this PR? Its been open for several months now with no updates/comments recently.
Would be great to have this in Delta 2.3. Is the plan to merge it soon?
Looks like there are some conflicts with the new DV (deletion vectors) stuff; I had to update a few things while rebasing onto the 2.3 release in my fork.
Would be great to get some more looks at this and get this merged in, this is a highly valuable and missing feature.
@dennyglee @scottsand-db @zsxwing @tdas Could you review the PR?
@dennyglee @scottsand-db @zsxwing @tdas Could you review the PR? I'll resolve the conflicts once you start actively reviewing.
@dennyglee @scottsand-db @zsxwing @tdas @allisonport-db Could you review the PR?
Is there any obstacle to the review of this PR?
@sezruby In spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala, method groupFilesIntoBins:

```scala
val filteredByBinSize = bins.filter { bin =>
  // bin size is equal to or greater than autoCompactMinNumFiles files
  bin.size >= autoCompactMinNumFiles ||
    // or bin size + number of deletion vectors >= autoCompactMinNumFiles files
    bin.count(_.deletionVector != null) + bin.size >= autoCompactMinNumFiles
}.map(b => (partition, b))
```
Why are we comparing the size of each individual bin (bin.size) to autoCompactMinNumFiles?
If the total file size is greater than autoCompact.maxFileSize and the total number of files is greater than minNumFiles, then after segregating the files into bins by size, each individual bin can still contain fewer files than minNumFiles, and hence the files will not be auto-compacted.
Is there a particular reason for doing that? I understand it might cause compaction of some small files, but isn't that better than no compaction at all?
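To make the concern concrete, here is a self-contained sketch with illustrative numbers; the greedy size-based binning below is only similar in spirit to groupFilesIntoBins, not the actual Delta code:

```scala
import scala.collection.mutable.ArrayBuffer

object BinFilterExample extends App {
  val maxFileSize = 128L * 1024 * 1024 // autoCompact.maxFileSize default
  val minNumFiles = 50                 // autoCompact.minNumFiles default

  // 100 files of 8 MB each: 800 MB total and 100 files, so the table
  // clearly has enough small files to be worth compacting.
  val fileSizes = Seq.fill(100)(8L * 1024 * 1024)

  // Greedy binning by cumulative size: close a bin once adding the
  // next file would exceed maxFileSize.
  val bins = ArrayBuffer(ArrayBuffer.empty[Long])
  for (size <- fileSizes) {
    if (bins.last.sum + size > maxFileSize) bins += ArrayBuffer.empty[Long]
    bins.last += size
  }

  // Each bin holds at most 16 files (16 * 8 MB = 128 MB), so every bin
  // fails the `bin.size >= minNumFiles` check and nothing is compacted.
  bins.zipWithIndex.foreach { case (b, i) => println(s"bin $i: ${b.size} files") }
  println(s"any bin passes the filter: ${bins.exists(_.size >= minNumFiles)}") // false
}
```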