feat(executor): concurrent fetch of `JoinStateEntry`s in streaming hash join
What's changed and what's your intention?
Fetch all `JoinStateEntry`s concurrently and asynchronously, grouping the updates in a stream chunk by join key.
TODO:
- [ ] Add more comments
- [ ] Add benches to show that this is in fact better performing
Closes: https://github.com/singularity-data/risingwave/issues/2428
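The core idea can be sketched roughly as follows. This is a hypothetical, simplified illustration (using threads in place of the executor's async futures, and toy stand-in types instead of the real `JoinStateEntry` and state store): group the chunk's updates by join key, then issue all state fetches at once instead of one key at a time.

```rust
use std::collections::HashMap;
use std::thread;
use std::time::Duration;

// Toy stand-ins: the real executor fetches `JoinStateEntry`s from the
// state store, not from this sleep-backed stub.
type JoinKey = u64;
type JoinStateEntry = Vec<String>;

// Simulated fetch of one key's join state (stands in for a storage read).
fn fetch_entry(key: JoinKey) -> JoinStateEntry {
    thread::sleep(Duration::from_millis(10)); // pretend I/O latency
    vec![format!("row-for-key-{key}")]
}

// Group the chunk's rows by join key, then fetch each distinct key's
// state concurrently instead of sequentially.
fn fetch_states_concurrently(rows: &[(JoinKey, String)]) -> HashMap<JoinKey, JoinStateEntry> {
    // 1. Collect the distinct join keys appearing in the chunk.
    let mut keys: Vec<JoinKey> = rows.iter().map(|(k, _)| *k).collect();
    keys.sort_unstable();
    keys.dedup();

    // 2. Issue all fetches concurrently (one thread per key here; the
    //    actual executor awaits async futures over the state store).
    let handles: Vec<_> = keys
        .into_iter()
        .map(|k| thread::spawn(move || (k, fetch_entry(k))))
        .collect();

    // 3. Collect the results into a key -> state map.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let chunk = vec![
        (1, "insert a".to_string()),
        (2, "insert b".to_string()),
        (1, "delete a".to_string()), // same key: fetched only once
    ];
    let states = fetch_states_concurrently(&chunk);
    assert_eq!(states.len(), 2);
}
```

Because updates are grouped by key first, a key that appears multiple times in the chunk is fetched only once, and the per-fetch latency overlaps across keys rather than adding up.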
Codecov Report
Merging #2772 (a7d556f) into main (a4d85ed) will increase coverage by 0.03%. The diff coverage is 96.64%.
@@ Coverage Diff @@
## main #2772 +/- ##
==========================================
+ Coverage 72.67% 72.71% +0.03%
==========================================
Files 702 702
Lines 92150 92334 +184
==========================================
+ Hits 66969 67137 +168
- Misses 25181 25197 +16
| Flag | Coverage Δ | |
|---|---|---|
| rust | 72.71% <96.64%> (+0.03%) | :arrow_up: |

Flags with carried forward coverage won't be shown.
| Impacted Files | Coverage Δ | |
|---|---|---|
| src/stream/src/executor/managed_state/join/mod.rs | 80.70% <84.55%> (+7.13%) | :arrow_up: |
| src/common/src/array/stream_chunk.rs | 76.40% <100.00%> (+1.50%) | :arrow_up: |
| src/common/src/collection/evictable.rs | 76.27% <100.00%> (+2.68%) | :arrow_up: |
| src/stream/src/executor/hash_join.rs | 97.26% <100.00%> (+0.19%) | :arrow_up: |
| ...rc/executor/managed_state/join/join_entry_state.rs | 80.45% <100.00%> (-1.82%) | :arrow_down: |
| src/common/src/util/value_encoding/mod.rs | 33.09% <0.00%> (-4.32%) | :arrow_down: |
| src/meta/src/model/barrier.rs | 78.57% <0.00%> (-3.58%) | :arrow_down: |
| src/common/src/array/data_chunk_iter.rs | 82.75% <0.00%> (-2.07%) | :arrow_down: |
| ... and 4 more | | |
I am queuing up several changes for the next PR:
- Accumulate small chunks into big chunks before join executor
- Fix the bug where the join row count is not updated (and flush as delete/update). CC: @yuhao-su I need this to make the next PR work with a refactor; I will just flush the entire matched-rows cache as long as one of the rows has its row count changed.
- Use `assert_equivalent_to` to show the left and right chunks in test failures, rather than a bare `assert!`.
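The first item above (accumulating small chunks into bigger chunks before the join executor) could look roughly like the sketch below. This is a hypothetical illustration, not the PR's implementation; `Chunk` is a toy stand-in for the real `StreamChunk`, and the row type is simplified to `i64`:

```rust
// Hypothetical sketch: coalesce small stream chunks into larger ones
// before they reach the join executor, so each join pass does fewer,
// bigger batches of state fetches.
struct Chunk {
    rows: Vec<i64>,
}

struct ChunkAccumulator {
    target_size: usize,
    buffer: Vec<i64>,
}

impl ChunkAccumulator {
    fn new(target_size: usize) -> Self {
        Self { target_size, buffer: Vec::new() }
    }

    // Append a small chunk; emit one merged chunk once the buffer
    // reaches the target size, otherwise keep buffering.
    fn push(&mut self, chunk: Chunk) -> Option<Chunk> {
        self.buffer.extend(chunk.rows);
        if self.buffer.len() >= self.target_size {
            Some(Chunk { rows: std::mem::take(&mut self.buffer) })
        } else {
            None
        }
    }

    // On a barrier, flush whatever is buffered so results stay timely.
    fn flush(&mut self) -> Option<Chunk> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(Chunk { rows: std::mem::take(&mut self.buffer) })
        }
    }
}
```

Flushing on barriers bounds how long a row can sit in the buffer, so larger batches do not delay checkpointed results.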
For now, I will fix some outstanding issues and show that the next PR is competitive with master.
Is it possible to get this PR merged first, so that we can see whether the simplest concurrent fetch alone solves the low-throughput issue?
Shall we close this?
How about re-implementing this after memory management is introduced? The idea still works, though. We may have an offline discussion with @jon-chuang first.
I have an idea for a better approach. We can test it alongside the storage layer to see if there is any regression.