
Progressive append

Jiaweihu08 opened this pull request 3 years ago · 2 comments

Fixes #132.

Compress small trees when appending to an existing revision

Appending a small volume of data to an existing qbeast table is likely to create many small files, as shown in #132. To tackle the issue, we can recursively group sibling payloads and send them to their parent cube as long as the resulting size stays under a certain threshold, producing fewer, larger blocks to write.

The process is called COMPRESSION and runs after the cubeColumnName column is created and before the data is repartitioned by cube.

The process is only triggered when appending data to an existing revision, since we want the initial tree to keep its natural depth. In addition, the appended data has to be small (<= MAX_SIZE_FOR_APPEND_COMPRESSION); otherwise the blocks are already large, and compressing a large tree is expensive.

MAX_SIZE_FOR_APPEND_COMPRESSION is configurable through spark.qbeast.index.maxAppendCompressionSize; its default value is 0.
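
A minimal sketch of the roll-up idea, assuming a simplified cube model; `Cube`, `rollUp`, and the size bookkeeping below are illustrative stand-ins, not the actual qbeast classes. Starting from the deepest cubes, each payload is merged into its parent as long as the accumulated size stays under the threshold (played here by `maxRolledUpSize`, which would come from spark.qbeast.index.maxAppendCompressionSize):

```scala
// Hedged sketch, not the real qbeast implementation.
case class Cube(id: String, parent: Option[Cube]) {
  def depth: Int = parent.map(_.depth + 1).getOrElse(0)
}

/** Returns, for every cube, the cube that will finally hold its payload. */
def rollUp(cubeSizes: Map[Cube, Long], maxRolledUpSize: Long): Map[Cube, Cube] = {
  val accumulated = scala.collection.mutable.Map(cubeSizes.toSeq: _*)
  val sentTo = scala.collection.mutable.Map.empty[Cube, Cube]

  // Visit cubes bottom-up so children are rolled up before their parents.
  for (cube <- cubeSizes.keys.toSeq.sortBy(-_.depth); parent <- cube.parent) {
    val merged = accumulated.getOrElse(cube, 0L) + accumulated.getOrElse(parent, 0L)
    if (merged <= maxRolledUpSize) {
      sentTo(cube) = parent        // send this cube's payload up one level
      accumulated(parent) = merged // the parent may itself be rolled up later
      accumulated(cube) = 0L
    }
  }

  // Resolve chains: a cube sent to a parent that was itself rolled up
  // must map to the final destination cube.
  def resolve(c: Cube): Cube = sentTo.get(c).map(resolve).getOrElse(c)
  cubeSizes.keys.map(c => c -> resolve(c)).toMap
}
```

With the default value of 0, presumably no pair of cubes fits under the threshold, so no compression takes place until the user raises the limit.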

Influence on cube maxWeights

Cube maxWeights from the parallel implementation increase monotonically from top to bottom along any branch. We need to make sure the proposed compression doesn't alter this behavior. When moving cube elements to their ancestors, both parent and child maxWeights are left intact: the append doesn't affect the maxWeight of the child cube, since no elements are written to it, and the ancestor cube has its maxWeight reduced by the update.
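
A small worked example of the invariant, with made-up weights:

```scala
// Illustrative numbers only, not taken from a real index.
// Before the append: maxWeights along the branch root -> c1 -> c2.
val before = Map("root" -> 0.3, "root/c1" -> 0.7, "root/c1/c2" -> 1.0)

// After an append whose elements are rolled up from c1 into root: the child keeps its
// maxWeight (nothing new is written to it) and the ancestor's maxWeight is reduced.
val after = Map("root" -> 0.25, "root/c1" -> 0.7, "root/c1/c2" -> 1.0)

// Monotonic increase from top to bottom still holds on the branch.
assert(after("root") <= after("root/c1") && after("root/c1") <= after("root/c1/c2"))
```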

Influence on online sampling and querying

The compression doesn't modify the maxWeights assigned during estimation, and branch maxWeights remain monotonically increasing, so the correctness of our sampling is not affected. Efficiency, however, may drop: blocks now hold more elements than their maxWeight reflects, and the excess elements are filtered out later in memory.
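
A hedged sketch of why correctness is preserved, assuming the usual two-step weight-based sampling (block-level pruning on the stored weight metadata followed by a per-row weight filter); `Block`, `Row`, and the predicates are illustrative, not the real reader code:

```scala
// Illustrative model of weight-based sampling, not the real qbeast reader.
case class Row(weight: Double)
case class Block(minWeight: Double, maxWeight: Double, rows: Seq[Row])

def sample(blocks: Seq[Block], f: Double): Seq[Row] = {
  // Block-level pruning uses the stored weight metadata, which compression leaves untouched.
  val candidates = blocks.filter(_.minWeight <= f)
  // Row-level filter in memory: excess elements rolled up into a block carry weights
  // above f and are dropped here, so correctness holds; only extra I/O is paid.
  candidates.flatMap(_.rows).filter(_.weight <= f)
}
```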

  • [x] Add/update documentation
  • [x] New feature / bug fix has been committed following the Contribution guide.
  • [x] Add comments to the code (make it easier for the community!).
  • [x] Add tests.
  • [x] Your branch is updated to the main branch (dependent changes have been merged).

Tests:

  • Make sure the compressed tree has fewer cubes than the original
  • Make sure appending a compressed tree doesn't change the fact that maxWeights increase monotonically along any branch (see the sketch after this list)
  • Make sure that online sampling is not corrupted by appending a compressed tree
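
A sketch of the monotonicity check from the second item; how branches and their maxWeights are read back from the written index is left as a placeholder:

```scala
// Sketch only: a branch is modeled as the chain of (cube, maxWeight) pairs from root to leaf.
def assertMonotonicBranches(branches: Seq[Seq[(String, Double)]]): Unit =
  branches.foreach { branch =>
    branch.map(_._2).sliding(2).foreach {
      case Seq(parentWeight, childWeight) =>
        assert(parentWeight <= childWeight, s"maxWeights must not decrease along $branch")
      case _ => // a single-cube branch is trivially monotonic
    }
  }
```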

Test Configuration:

  • Spark Version: 3.2.2
  • Hadoop Version: 3.3.1
  • Cluster or local? Local

TODO:

  • [x] Compute cube sizes from delta and existing cube weights in order to avoid collect
  • [x] Perform compression through roll-up and use the resulting map during indexing
  • [x] Consider cube state during compression
  • [ ] Update tests

Jiaweihu08 · Sep 15 '22 15:09

Codecov Report

Merging #133 (84b235f) into main (880203a) will increase coverage by 0.19%. The diff coverage is 100.00%.

:exclamation: Current head 84b235f differs from pull request most recent head 0d3c8ef. Consider uploading reports for the commit 0d3c8ef to get more accurate results

@@            Coverage Diff             @@
##             main     #133      +/-   ##
==========================================
+ Coverage   91.96%   92.15%   +0.19%     
==========================================
  Files          62       62              
  Lines        1456     1505      +49     
  Branches      109      108       -1     
==========================================
+ Hits         1339     1387      +48     
- Misses        117      118       +1     
| Impacted Files | Coverage Δ |
|---|---|
| ...n/scala/io/qbeast/core/model/RevisionClasses.scala | 73.80% <ø> (ø) |
| ...io/qbeast/core/model/BroadcastedTableChanges.scala | 100.00% <ø> (ø) |
| ...cala/io/qbeast/spark/index/OTreeDataAnalyzer.scala | 98.38% <ø> (ø) |
| ...east/spark/delta/writer/SparkDeltaDataWriter.scala | 100.00% <100.00%> (ø) |
| ...rc/main/scala/org/apache/spark/qbeast/config.scala | 100.00% <100.00%> (ø) |
| ...ala/io/qbeast/spark/delta/writer/BlockWriter.scala | 95.65% <0.00%> (-2.18%) :arrow_down: |


codecov[bot] · Sep 16 '22 10:09

Feedback from @cugni:

  • The current implementation works based on the cube sizes; we perform roll-up to compress the tree height until cube sizes reach a threshold. The collect operation used to obtain the cube sizes should be replaced by an estimation derived from the existing CubeStatuses and the estimated CubeWeights (see the sketch after this list).
  • Tree compression should be done before indexing so that, later on, the PointWeightIndexer can use the compressed index directly.
  • The OTree compression should ALWAYS be performed(?), as long as cube sizes stay within a predetermined range; the operation would not be reserved for small appends but applied to all writes!
  • Tree compression should consider cube STATE
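
A minimal sketch of the first point, assuming cube sizes can be approximated from driver-side metadata alone; the NormalizedWeight convention and the helper names below are assumptions for illustration, not the real qbeast API:

```scala
// Hedged sketch, not the real qbeast code. It assumes a NormalizedWeight convention in
// which a value above 1 encodes desiredCubeSize / elementCount for a non-full cube.
def estimatedCubeSize(normalizedWeight: Double, desiredCubeSize: Long): Long =
  (desiredCubeSize / math.max(normalizedWeight, 1.0)).toLong

// Combine sizes already recorded in the index with the estimate for the incoming batch;
// everything is driver-side metadata, so no collect over the appended data is needed.
def combinedCubeSizes(
    existingSizes: Map[String, Long],            // derived from the existing CubeStatuses
    estimatedAppendWeights: Map[String, Double], // estimated CubeWeights of the append
    desiredCubeSize: Long): Map[String, Long] =
  (existingSizes.keySet ++ estimatedAppendWeights.keySet).map { cube =>
    val appended = estimatedAppendWeights
      .get(cube)
      .map(estimatedCubeSize(_, desiredCubeSize))
      .getOrElse(0L)
    cube -> (existingSizes.getOrElse(cube, 0L) + appended)
  }.toMap
```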

Jiaweihu08 · Sep 27 '22 13:09

Closing this PR for now. We can still use Compaction to solve the small files problem, or add a Roll-up command as stated in #147. 😃

osopardo1 · Jan 23 '23 11:01