
Compaction

Open osopardo1 opened this issue 3 years ago • 4 comments

Description

Implements the feature requested in #98. :raised_hands: Compaction of small files is coming!

Type of change

In this PR, we present a new feature: compaction of small files.

These changes introduce a new command to trigger compaction:

val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast_table")
qbeastTable.compact()

Under the hood, Qbeast finds the files larger than spark.qbeast.compact.minFileSize and smaller than spark.qbeast.compact.maxFileSize and compacts them into single files.
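As a rough illustration of the selection step described above, the predicate could look like the sketch below. Note that FileInfo and selectCompactable are hypothetical names for this example, not the actual Qbeast internals:

```scala
// Illustrative sketch only: pick the files whose size falls between
// the configured minimum and maximum, i.e. the compaction candidates.
final case class FileInfo(path: String, sizeInBytes: Long)

def selectCompactable(
    files: Seq[FileInfo],
    minFileSize: Long,
    maxFileSize: Long): Seq[FileInfo] =
  files.filter(f => f.sizeInBytes > minFileSize && f.sizeInBytes < maxFileSize)
```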

You can change this configuration through the SparkConf:

--conf spark.qbeast.compact.minFileSize=1 \
--conf spark.qbeast.compact.maxFileSize=10000

The default configuration is 1024 * 1024 * 1024 bytes (1 GiB), the same as Delta's DELTA_OPTIMIZE_MIN_FILE_SIZE.
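How these defaults resolve can be sketched as follows. This is only an illustration using a plain Map in place of the real SparkConf; compactionBounds and DefaultFileSize are invented names, and whether both keys share the same 1 GiB default is an assumption based on the text above:

```scala
// Hypothetical sketch: resolve the compaction size bounds from a
// key-value configuration, falling back to the 1 GiB default.
val DefaultFileSize: Long = 1024L * 1024L * 1024L // 1 GiB, as in Delta

def compactionBounds(conf: Map[String, String]): (Long, Long) = (
  conf.get("spark.qbeast.compact.minFileSize").map(_.toLong).getOrElse(DefaultFileSize),
  conf.get("spark.qbeast.compact.maxFileSize").map(_.toLong).getOrElse(DefaultFileSize))
```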

Checklist:

Here is the list of things you should do before submitting this pull request:

  • [x] WIP
  • [x] New feature / bug fix has been committed following the Contribution guide.
  • [x] Add comments to the code (make it easier for the community!).
  • [x] Change the documentation.
  • [x] Add tests.
  • [x] Your branch is updated to the main branch (dependent changes have been merged).

How Has This Been Tested? (Optional)

Still needs to be tested! I will post updates on this pull request.

Test Configuration:

  • Spark Version:
  • Hadoop Version:
  • Cluster or local?

osopardo1 avatar Jun 20 '22 12:06 osopardo1

Codecov Report

Merging #110 (93af690) into main (8b96c68) will increase coverage by 0.99%. The diff coverage is 98.34%.

@@            Coverage Diff             @@
##             main     #110      +/-   ##
==========================================
+ Coverage   90.69%   91.68%   +0.99%     
==========================================
  Files          59       62       +3     
  Lines        1344     1432      +88     
  Branches       99      105       +6     
==========================================
+ Hits         1219     1313      +94     
+ Misses        125      119       -6     
Impacted Files Coverage Δ
...ala/io/qbeast/spark/delta/IndexStatusBuilder.scala 100.00% <ø> (ø)
...cala/io/qbeast/spark/delta/writer/BlockStats.scala 75.00% <ø> (ø)
...ala/io/qbeast/spark/delta/writer/BlockWriter.scala 97.82% <ø> (ø)
src/main/scala/io/qbeast/spark/QbeastTable.scala 95.06% <84.61%> (+7.06%) :arrow_up:
...c/main/scala/io/qbeast/context/QbeastContext.scala 94.73% <100.00%> (ø)
...cala/io/qbeast/spark/delta/QbeastMetadataSQL.scala 100.00% <100.00%> (ø)
...scala/io/qbeast/spark/delta/writer/Compactor.scala 100.00% <100.00%> (ø)
...east/spark/delta/writer/SparkDeltaDataWriter.scala 100.00% <100.00%> (ø)
.../spark/internal/commands/CompactTableCommand.scala 100.00% <100.00%> (ø)
...ain/scala/io/qbeast/spark/table/IndexedTable.scala 91.35% <100.00%> (+0.69%) :arrow_up:
... and 3 more


codecov[bot] avatar Jun 22 '22 08:06 codecov[bot]

I added you both, @Jiaweihu08 and @polsm91, as reviewers; please give me feedback if you can :) If you need an explanation of anything, let me know.

osopardo1 avatar Jul 22 '22 07:07 osopardo1

Changes explained:

  • IndexedTable has a .compact() method that follows the same pipeline as a write: compact the data (group block info into "bins" for each cube), write the data, and commit.
  • DataWriter also has a .compact() method that is in charge of writing the data into files and outputting the necessary FileActions (add the compacted files and remove the small ones).
  • A new case class, Compactor, is created. Since the process is more parallelizable and simpler than BlockWriter, this new component is in charge of writing the Parquet file and adding the proper tags.
  • MetadataManager commits the information to the log.
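The pipeline in the bullets above can be sketched roughly as below. All signatures here are invented for illustration; the real IndexedTable, DataWriter, and MetadataManager in the PR have different shapes:

```scala
// Simplified, hypothetical sketch of the compaction pipeline:
// IndexedTable.compact() delegates writing to the DataWriter and
// commits the resulting FileActions through the MetadataManager.
sealed trait FileAction
final case class AddFile(path: String) extends FileAction
final case class RemoveFile(path: String) extends FileAction

trait DataWriter {
  // Writes the compacted files and returns the actions to commit:
  // AddFile for each new compacted file, RemoveFile for each small one.
  def compact(smallFiles: Seq[String]): Seq[FileAction]
}

trait MetadataManager {
  def commit(actions: Seq[FileAction]): Unit
}

class IndexedTable(writer: DataWriter, metadata: MetadataManager) {
  def compact(smallFiles: Seq[String]): Unit = {
    val actions = writer.compact(smallFiles) // group, write, collect actions
    metadata.commit(actions)                 // commit them to the log
  }
}
```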

Regarding the structure of the code:

  • SparkDataWriter is renamed to SparkDeltaDataWriter, since each format has its own way of writing the metadata (a different FileAction).

The user-facing API is the same as for optimize() and analyze():

val qbeastTable = QbeastTable.forPath(spark, "path")
qbeastTable.compact()

This action compacts the files in the latest revision whose size lies within [MIN_SIZE_FILE_COMPACTION, MAX_SIZE_FILE_COMPACTION] bytes. Users who want to change these values can do so through the SparkConf:

--conf spark.qbeast.compact.minFileSize=1 \
--conf spark.qbeast.compact.maxFileSize=10000

The default configuration is 1024 * 1024 * 1024 bytes (1 GiB), the same as Delta's DELTA_OPTIMIZE_MIN_FILE_SIZE.

osopardo1 avatar Jul 22 '22 07:07 osopardo1

@polsm91 asked me to review this PR; I will provide my remarks below.

alexeiakimov avatar Aug 05 '22 12:08 alexeiakimov