qbeast-spark
Compaction
Description
Adds new feature #98. :raised_hands: Compaction of small files is coming!
Type of change
In this PR, we present a new feature: compaction of small files.
These changes include a new command to produce the compaction:
val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast_table")
qbeastTable.compact()
Under the hood, Qbeast will find the files larger than spark.qbeast.compact.minFileSize and smaller than spark.qbeast.compact.maxFileSize and compact them into single files.
You can change this configuration through the SparkConf:
--conf spark.qbeast.compact.minFileSize=1\
--conf spark.qbeast.compact.maxFileSize=10000
The default configuration is set to 1024 * 1024 * 1024 bytes (1 GB), as in Delta's DELTA_OPTIMIZE_MIN_FILE_SIZE.
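To make the selection criterion concrete, here is a minimal sketch of the size filter described above. The names (`FileInfo`, `candidates`) are illustrative, not the actual Qbeast internals; only the two thresholds come from the PR:

```scala
// Hypothetical sketch of compaction candidate selection (not the real Qbeast code).
// A file qualifies for compaction when its size falls between the two thresholds.
case class FileInfo(path: String, sizeInBytes: Long)

def candidates(files: Seq[FileInfo], minFileSize: Long, maxFileSize: Long): Seq[FileInfo] =
  files.filter(f => f.sizeInBytes >= minFileSize && f.sizeInBytes <= maxFileSize)

// With the example SparkConf values above (min = 1, max = 10000),
// a 500-byte file qualifies, while a 1 GB file does not.
val selected = candidates(
  Seq(FileInfo("small.parquet", 500L), FileInfo("big.parquet", 1024L * 1024L * 1024L)),
  minFileSize = 1L,
  maxFileSize = 10000L)
```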
Checklist:
Here is the list of things you should do before submitting this pull request:
- [x] WIP
- [x] New feature / bug fix has been committed following the Contribution guide.
- [x] Add comments to the code (make it easier for the community!).
- [x] Change the documentation.
- [x] Add tests.
- [x] Your branch is updated to the main branch (dependent changes have been merged).
How Has This Been Tested? (Optional)
Still needs to be tested! I will keep updates on this Pull Request.
Test Configuration:
- Spark Version:
- Hadoop Version:
- Cluster or local?
Codecov Report
Merging #110 (93af690) into main (8b96c68) will increase coverage by 0.99%. The diff coverage is 98.34%.
@@ Coverage Diff @@
## main #110 +/- ##
==========================================
+ Coverage 90.69% 91.68% +0.99%
==========================================
Files 59 62 +3
Lines 1344 1432 +88
Branches 99 105 +6
==========================================
+ Hits 1219 1313 +94
+ Misses 125 119 -6
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...ala/io/qbeast/spark/delta/IndexStatusBuilder.scala | 100.00% <ø> (ø) | |
| ...cala/io/qbeast/spark/delta/writer/BlockStats.scala | 75.00% <ø> (ø) | |
| ...ala/io/qbeast/spark/delta/writer/BlockWriter.scala | 97.82% <ø> (ø) | |
| src/main/scala/io/qbeast/spark/QbeastTable.scala | 95.06% <84.61%> (+7.06%) | :arrow_up: |
| ...c/main/scala/io/qbeast/context/QbeastContext.scala | 94.73% <100.00%> (ø) | |
| ...cala/io/qbeast/spark/delta/QbeastMetadataSQL.scala | 100.00% <100.00%> (ø) | |
| ...scala/io/qbeast/spark/delta/writer/Compactor.scala | 100.00% <100.00%> (ø) | |
| ...east/spark/delta/writer/SparkDeltaDataWriter.scala | 100.00% <100.00%> (ø) | |
| .../spark/internal/commands/CompactTableCommand.scala | 100.00% <100.00%> (ø) | |
| ...ain/scala/io/qbeast/spark/table/IndexedTable.scala | 91.35% <100.00%> (+0.69%) | :arrow_up: |
| ... and 3 more | | |
I have added you both, @Jiaweihu08 and @polsm91, as reviewers to give me feedback if you can :) If you need an explanation of anything, please let me know.
Changes explained:
- `IndexedTable` would have a `.compact()` method that follows the same pipeline as the write: compact data (group file info into "bins" for each cube), write data, and commit.
- `DataWriter` would also have a `.compact()` method that is in charge of writing the data into files and outputting the necessary file actions (add the compacted files and remove the small ones).
- A case class `Compactor` is created. Since the process is more parallelizable and simpler than the `BlockWriter`, I added a new process to be in charge of writing the parquet file and adding the proper tags.
- `MetadataManager` would commit the information to the log.
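The "bins for each cube" grouping step can be sketched as a simple greedy packing: within each cube, accumulate small files into a bin until it reaches a desired output size, then start a new bin, and each bin becomes one compacted file. This is an illustrative reconstruction (`SmallFile`, `binsPerCube`, and `desiredBinSize` are hypothetical names), not the actual `Compactor` logic:

```scala
// Illustrative sketch of grouping small files into "bins" per cube
// (hypothetical names; not the real Qbeast Compactor implementation).
case class SmallFile(cube: String, sizeInBytes: Long)

def binsPerCube(files: Seq[SmallFile], desiredBinSize: Long): Map[String, Seq[Seq[SmallFile]]] =
  files.groupBy(_.cube).map { case (cube, cubeFiles) =>
    // Greedily pack files into the current bin while it stays under the target size.
    val bins = cubeFiles.foldLeft(List.empty[(Long, List[SmallFile])]) {
      case ((acc, bin) :: rest, f) if acc + f.sizeInBytes <= desiredBinSize =>
        (acc + f.sizeInBytes, f :: bin) :: rest // file fits: add it to the open bin
      case (bins, f) =>
        (f.sizeInBytes, List(f)) :: bins // file does not fit: open a new bin
    }
    // Restore insertion order of bins and of files within each bin.
    cube -> bins.map(_._2.reverse).reverse
  }
```

Each resulting bin would then go through the usual write-and-commit path, producing one file per bin.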
Regarding the structure of the code:
`SparkDataWriter` is changed to `SparkDeltaDataWriter`, since each format has different ways of writing the metadata (a different `FileAction`).
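The format-specific split described above can be pictured as a writer parameterized by the format's action type. This is a simplified, hypothetical shape (the real interfaces differ); the `AddFile`/`RemoveFile` pair stands in for Delta's commit actions:

```scala
// Hypothetical sketch of a format-parameterized writer (not the real interfaces).
sealed trait DeltaFileAction
case class AddFile(path: String) extends DeltaFileAction
case class RemoveFile(path: String) extends DeltaFileAction

trait DataWriterSketch[Action] {
  // Compaction returns the actions to commit:
  // add the compacted file and remove the small files it replaces.
  def compact(smallFiles: Seq[String], compactedPath: String): Seq[Action]
}

object SparkDeltaDataWriterSketch extends DataWriterSketch[DeltaFileAction] {
  def compact(smallFiles: Seq[String], compactedPath: String): Seq[DeltaFileAction] =
    AddFile(compactedPath) +: smallFiles.map(RemoveFile(_))
}
```

A writer for another table format would implement the same trait with its own action type, which is the motivation given for the rename.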
The API for the user would be the same as for optimize() and analyze():
val qbeastTable = QbeastTable.forPath(spark, "path")
qbeastTable.compact()
This action would compact the files in the latest revision whose size lies within [MIN_SIZE_FILE_COMPACTION, MAX_SIZE_FILE_COMPACTION] bytes. If users want to change these values, they can do so through the SparkConf:
--conf spark.qbeast.compact.minFileSize=1\
--conf spark.qbeast.compact.maxFileSize=10000
The default configuration is set to 1024 * 1024 * 1024 bytes (1 GB), as in Delta's DELTA_OPTIMIZE_MIN_FILE_SIZE configuration.
@polsm91 asked me to review this PR, I will provide my remarks below.