reactor-core
Implementation of FluxLimitRate
This PR provides an implementation of Queue-free Rate Limiter (see #1841)
Codecov Report
Merging #1879 into master will decrease coverage by 0.13%. The diff coverage is 73.48%.
@@ Coverage Diff @@
## master #1879 +/- ##
============================================
- Coverage 81.59% 81.45% -0.14%
- Complexity 3896 3902 +6
============================================
Files 372 373 +1
Lines 30253 30543 +290
Branches 5660 5716 +56
============================================
+ Hits 24685 24880 +195
- Misses 4019 4086 +67
- Partials 1549 1577 +28
| Impacted Files | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| ...ore/src/main/java/reactor/core/publisher/Flux.java | 97.34% <100%> (-0.59%) | 523 <1> (+2) | |
| ...ain/java/reactor/core/publisher/FluxLimitRate.java | 73.33% <73.33%> (ø) | 3 <3> (?) | |
| ...rc/main/java/reactor/core/publisher/MonoUsing.java | 67.15% <0%> (-22.14%) | 3% <0%> (ø) | |
| ...or-core/src/main/java/reactor/core/Exceptions.java | 85.27% <0%> (-5.25%) | 49% <0%> (ø) | |
| ...ore/src/main/java/reactor/core/publisher/Mono.java | 90.88% <0%> (-2.4%) | 270% <0%> (ø) | |
| ...n/java/reactor/core/publisher/FluxMaterialize.java | 95.45% <0%> (-1.47%) | 2% <0%> (ø) | |
| ...c/main/java/reactor/core/publisher/FluxReplay.java | 84.25% <0%> (-0.16%) | 27% <0%> (ø) | |
| ...src/main/java/reactor/core/publisher/MonoPeek.java | 100% <0%> (ø) | 10% <0%> (ø) | :arrow_down: |
| ...ain/java/reactor/core/publisher/MonoUsingWhen.java | 93.1% <0%> (ø) | 7% <0%> (ø) | :arrow_down: |
| ... and 11 more | | | |
Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 2e7bac8...49d71cd.
Any planned progress on this one @OlegDokuka ?
@simonbasle I need to write up the docs. I have been waiting for your initial pre-review before finalizing them. If it looks good, I'll push the rest of the missing docs immediately! 🤓👨‍🔧
Alright, I should get back to this PR shortly
@OlegDokuka @simonbasle I'd like to propose an additional variation of limitRate based on Oleg's enhancement: the ability to pass a BiFunction that returns a Publisher and receives two values, long offset and long limit, which can be handed to whatever originates that Publisher. limitRate would then pass the data from the Publisher downstream in batches.
This would be beneficial because it makes it possible to track how many items have been produced so far, and to control how many more the stream returned by the BiFunction may produce.
It enables a range of use cases, the most common being streams backed by databases: because the limit and offset values are calculated from Flux signals, they can be used inside database queries to split data retrieval into rate-limited batches.
It could also be a first step for Spring Data toward a Pageable implementation that listens to native Flux signals. WebFlux mostly uses a Pageable from PageRequest, so there it may allow the logic to be switched to the native Flux request quantity. With RSocket and its persistent streams, this new Pageable, which could now be generated from limitRate, could be applied right before the streams that deal with databases. That would protect the server from breaking under an uncontrolled, high volume of requested items that could overload the database.
As I mentioned in other issues, I already have a custom operator like this built on limitRate, and it greatly simplifies injecting Pageable generation for database access. It would be nice to have it natively in Reactor.
Thank you.
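For readers following the thread, the offset/limit arithmetic described in the proposal above can be sketched in plain Java, without Reactor. This is only an illustration under stated assumptions: `OffsetLimitBatcher`, `fetch`, and `fakeQuery` are hypothetical names, not part of reactor-core or of the custom operator mentioned here, and the sketch is synchronous, so it does not model Publisher signals or real backpressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical helper for illustration only; not a reactor-core API. */
final class OffsetLimitBatcher {

    /**
     * Pulls up to {@code demand} items in batches of at most {@code batchSize}.
     * {@code pageSource} receives (offset, limit) and is expected to return
     * at most {@code limit} items starting at {@code offset}.
     */
    static <T> List<T> fetch(long demand, int batchSize,
                             BiFunction<Long, Long, List<T>> pageSource) {
        if (demand < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("demand must be >= 0 and batchSize > 0");
        }
        List<T> out = new ArrayList<>();
        long offset = 0;
        while (offset < demand) {
            // Never ask for more than the remaining demand.
            long limit = Math.min(batchSize, demand - offset);
            List<T> page = pageSource.apply(offset, limit);
            out.addAll(page);
            offset += page.size();
            if (page.size() < limit) {
                break; // a short page means the source is exhausted
            }
        }
        return out;
    }

    /** Stand-in for a "LIMIT ? OFFSET ?" query over rows 0..total-1 (assumption). */
    static List<Long> fakeQuery(long total, long offset, long limit) {
        List<Long> rows = new ArrayList<>();
        for (long i = offset; i < Math.min(total, offset + limit); i++) {
            rows.add(i);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        List<Long> rows = fetch(10, 4, (offset, limit) -> {
            calls.add(offset + "/" + limit); // record each (offset, limit) pair
            return fakeQuery(25, offset, limit);
        });
        System.out.println("queries issued (offset/limit): " + calls);
        System.out.println("rows received: " + rows);
    }
}
```

With a demand of 10 and a batch size of 4, the source is queried three times, and the last query asks only for the 2 items still outstanding. A real operator would derive the demand from downstream `request(n)` signals rather than from a fixed argument.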
@maxim-bandurko-lsvt can you open an issue (enhancement type) to discuss the design and implementation of such an operator / variant of limitRate? Pursuing a generic solution to the pagination problem sounds interesting. Please also ping @mp911de, as I'm sure he'll have some insight.
@simonbasle Sure. Will open it. Thanks!
The last interaction here was almost 4 years ago. Closing due to inactivity. Please create a new PR targeting the latest codebase in case you feel this is still applicable.