qbeast-spark
Progressive append
Fixes #132.
Compress small trees when appending to an existing revision
Appending a small volume of data to an existing qbeast table is likely to create many small files, as shown in #132. To tackle the issue, we can recursively group and send sibling payloads to their parent cubes as long as the combined size stays under a certain threshold, producing fewer and larger blocks to write.
The process is called COMPRESSION and is performed after the cube column (cubeColumnName) is created and before the data is repartitioned by cube.
The process is triggered only when appending data to an existing revision, since we want the initial tree to keep its natural depth. In addition, the appended data has to be small (<= MAX_SIZE_FOR_APPEND_COMPRESSION); otherwise the resulting blocks are already large, and compressing a large tree is expensive.
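A minimal sketch of the roll-up idea follows. The string cube ids, the `parent` function, and the sizes map are hypothetical simplifications of the real qbeast CubeId model, and `maxRollUpSize` plays the role of MAX_SIZE_FOR_APPEND_COMPRESSION; this is an illustration of the technique, not the actual implementation.

```scala
// Hypothetical, simplified roll-up sketch: move sibling payloads into their
// parent while the accumulated size stays under the threshold.
object RollUpSketch {

  def compress(
      cubeSizes: Map[String, Long], // cube id -> element count of the appended data
      parent: String => Option[String], // returns None for the root cube
      maxRollUpSize: Long): Map[String, String] = {

    // Mutable accumulator of sizes as payloads are moved upwards.
    val sizes = scala.collection.mutable.Map(cubeSizes.toSeq: _*).withDefaultValue(0L)
    // cube id -> cube that receives its payload
    val target = scala.collection.mutable.Map.empty[String, String]

    // Visit cubes bottom-up (assumes deeper cubes have longer ids).
    val bottomUp = cubeSizes.keys.toSeq.sortBy(id => -id.length)
    for (cube <- bottomUp; p <- parent(cube)) {
      val merged = sizes(p) + sizes(cube)
      if (merged <= maxRollUpSize) {
        // Send this cube's payload (and anything already rolled into it) to the parent.
        sizes(p) = merged
        sizes(cube) = 0L
        target(cube) = p
      }
    }

    // Follow the chain of moves so every cube points at its final destination.
    def resolve(c: String): String = target.get(c).map(resolve).getOrElse(c)
    cubeSizes.keys.map(c => c -> resolve(c)).toMap
  }
}
```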
MAX_SIZE_FOR_APPEND_COMPRESSION is configurable through spark.qbeast.index.maxAppendCompressionSize; its default value is 0.
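For example, the threshold could be raised at session creation; the value below is purely illustrative, and with the default of 0 presumably no append qualifies for compression.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: the property name comes from this PR; the value 500000
// is an arbitrary example, not a recommendation.
val spark = SparkSession
  .builder()
  .appName("qbeast-append-compression-example")
  .config("spark.qbeast.index.maxAppendCompressionSize", "500000")
  .getOrCreate()
```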
Influence on cube maxWeights
Cube maxWeights produced by the parallel implementation increase monotonically from top to bottom along any branch. We need to make sure the proposed compression doesn't alter this behavior: when moving cube elements to their ancestors, the maxWeight of the child cube is left untouched, since no new elements are written to it, and the maxWeight of the ancestor cube can only be reduced by the update, so it stays below its descendants' and monotonicity is preserved.
Influence on online sampling and querying
The compression doesn't modify the maxWeights assigned during estimation. Branch maxWeights remain monotonically increasing, so the correctness of our sampling is not affected. Efficiency, however, may drop because blocks now contain more elements than their maxWeight reflects; the excess elements are later filtered out in memory.
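A hedged sketch of that trade-off, assuming weights are normalized to [0, 1] so a sample fraction maps directly to a weight cutoff; `Block` and `Element` are simplified stand-ins for the real qbeast types.

```scala
// Hypothetical sketch of weight-based sampling over compressed blocks.
final case class Element(weight: Double, data: String)
final case class Block(cube: String, minWeight: Double, maxWeight: Double, rows: Seq[Element])

def sample(blocks: Seq[Block], fraction: Double): Seq[Element] = {
  // Assumption: weights are normalized to [0, 1], so the cutoff weight
  // equals the requested sample fraction.
  val cutoff = fraction

  blocks
    // Block pruning: skip blocks whose smallest weight is already above the
    // cutoff. Compression only ever lowers an ancestor's maxWeight and keeps
    // the monotonic ordering, so this pruning stays correct.
    .filter(_.minWeight <= cutoff)
    // In-memory filtering: rolled-up elements with weights above the cutoff
    // are read but discarded here, which is where the efficiency cost shows up.
    .flatMap(_.rows.filter(_.weight <= cutoff))
}
```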
- [x] Add/update documentation
- [x] New feature / bug fix has been committed following the Contribution guide.
- [x] Add comments to the code (make it easier for the community!).
- [x] Add tests.
- [x] Your branch is updated to the main branch (dependent changes have been merged).
Tests:
- Make sure the compressed tree has fewer cubes than the original
- Make sure adding a compressed tree doesn't change the fact that maxWeights increase monotonically in any branch (see the sketch after this list)
- Make sure that online sampling is not corrupted by adding a compressed tree
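A hedged sketch of how the monotonicity check could look; the `compress` helper and the string-based cube ids are hypothetical stand-ins for the real qbeast types (CubeId, Weight, CubeStatus).

```scala
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class RollUpMonotonicitySpec extends AnyFlatSpec with Matchers {

  "Roll-up compression" should "keep maxWeights monotonically increasing per branch" in {
    val original: Map[String, Double] =
      Map("" -> 0.2, "A" -> 0.5, "AA" -> 0.9, "AB" -> 0.8)

    // Assumed helper under test: rolls small cubes into ancestors.
    val compressed: Map[String, Double] = compress(original, maxRollUpSize = 1000L)

    // Every cube must have a maxWeight >= its parent's maxWeight.
    for ((cube, weight) <- compressed if cube.nonEmpty) {
      val parent = cube.dropRight(1)
      compressed.get(parent).foreach(parentWeight => weight should be >= parentWeight)
    }
  }

  // Placeholder so the sketch is self-contained; the real implementation
  // lives in the indexing code.
  private def compress(weights: Map[String, Double], maxRollUpSize: Long): Map[String, Double] =
    weights
}
```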
Test Configuration:
- Spark Version: 3.2.2
- Hadoop Version: 3.3.1
- Cluster or local? Local
TODO:
- [x] Compute cube sizes from delta and existing cube weights in order to avoid collect
- [x] Perform compression through roll-up and use the resulting map during indexing
- [x] Consider cube state during compression
- [ ] Update tests
Codecov Report
Merging #133 (84b235f) into main (880203a) will increase coverage by 0.19%. The diff coverage is 100.00%.
:exclamation: Current head 84b235f differs from pull request most recent head 0d3c8ef. Consider uploading reports for the commit 0d3c8ef to get more accurate results
@@ Coverage Diff @@
## main #133 +/- ##
==========================================
+ Coverage 91.96% 92.15% +0.19%
==========================================
Files 62 62
Lines 1456 1505 +49
Branches 109 108 -1
==========================================
+ Hits 1339 1387 +48
- Misses 117 118 +1
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...n/scala/io/qbeast/core/model/RevisionClasses.scala | 73.80% <ø> (ø) | |
| ...io/qbeast/core/model/BroadcastedTableChanges.scala | 100.00% <ø> (ø) | |
| ...cala/io/qbeast/spark/index/OTreeDataAnalyzer.scala | 98.38% <ø> (ø) | |
| ...east/spark/delta/writer/SparkDeltaDataWriter.scala | 100.00% <100.00%> (ø) | |
| ...rc/main/scala/org/apache/spark/qbeast/config.scala | 100.00% <100.00%> (ø) | |
| ...ala/io/qbeast/spark/delta/writer/BlockWriter.scala | 95.65% <0.00%> (-2.18%) | :arrow_down: |
Feedback from @cugni:
- The current implementation works based on the cube sizes; we perform roll-up to compress the tree height until cube sizes increase to a threshold. The `collect` operation used to obtain the cube sizes should be replaced by an estimation deduced from the existing `CubeStatuses` and estimated `CubeWeights` (see the sketch after this list).
- Tree compression should be done before indexing, so later, the `PointWeightIndexer` can use the compressed index directly.
- The OTree compression should ALWAYS be performed(?), as long as the cube size is within a predetermined range; the operation should not be reserved for small appends, but applied to all writes!
- Tree compression should consider cube STATE.
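A hedged sketch of the estimation idea from the first point; the relation elementCount ≈ desiredCubeSize / normalizedMaxWeight is an assumption made for illustration, not the exact qbeast formula, and the string cube ids are hypothetical.

```scala
// Illustrative only: estimate cube element counts from existing cube statuses
// and estimated cube weights instead of collecting the appended data.
// elementCount ≈ desiredCubeSize / normalizedMaxWeight is an assumption of
// this sketch, not the exact qbeast model.
def estimateCubeSizes(
    normalizedMaxWeights: Map[String, Double], // cube id -> normalized maxWeight
    desiredCubeSize: Long): Map[String, Long] =
  normalizedMaxWeights.map { case (cube, normalizedWeight) =>
    cube -> (desiredCubeSize / normalizedWeight).toLong
  }
```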
Closing this PR for now. We can still use Compaction to solve the problem of Small Files or add a Roll-up command as stated in #147. 😃