reactor-core
reactor-core copied to clipboard
ConditionalSubscriber support where it is missed
This PR provides ConditionalSubscriber fusion support in common operator:
- [x] FluxSubscribeOn
- [ ] FluxSkip
- [ ] FluxRepeat
- [ ] FluxRepeatPredicate
- [ ] FluxRetry
- [ ] FluxRetryPredicate
- [ ] FluxMetrics
Signed-off-by: Oleh Dokuka [email protected]
Would love to hear any suggestions/objections/comments before I push the rest of the operators (FYI, they are prepared without tests, found those missing gaps during verification of FluxLimitRate operators, so decided to implement them)
cc/ @simonbasle @bsideup
Codecov Report
Merging #1880 into master will decrease coverage by
0.03%. The diff coverage is66.23%.
@@ Coverage Diff @@
## master #1880 +/- ##
============================================
- Coverage 81.47% 81.43% -0.04%
- Complexity 3897 3899 +2
============================================
Files 372 372
Lines 30359 30430 +71
Branches 5684 5699 +15
============================================
+ Hits 24734 24780 +46
- Misses 4056 4075 +19
- Partials 1569 1575 +6
| Impacted Files | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| ...n/java/reactor/core/publisher/FluxSubscribeOn.java | 75.83% <66.23%> (-6.22%) |
3 <0> (+1) |
|
| .../java/reactor/core/publisher/BlockingIterable.java | 71.87% <0%> (-6.25%) |
7% <0%> (ø) |
|
| ...ain/java/reactor/core/publisher/FluxConcatMap.java | 89.72% <0%> (-0.28%) |
7% <0%> (ø) |
|
| ...c/main/java/reactor/core/publisher/FluxReplay.java | 84.25% <0%> (-0.16%) |
27% <0%> (ø) |
|
| ...in/java/reactor/core/publisher/FluxWindowWhen.java | 81.73% <0%> (+0.48%) |
2% <0%> (ø) |
:arrow_down: |
| ...eactor/core/publisher/ParallelMergeSequential.java | 80.82% <0%> (+0.51%) |
7% <0%> (ø) |
:arrow_down: |
| .../java/reactor/core/scheduler/ElasticScheduler.java | 84.95% <0%> (+0.88%) |
26% <0%> (ø) |
:arrow_down: |
| ...ain/java/reactor/core/scheduler/SchedulerTask.java | 76.78% <0%> (+3.57%) |
17% <0%> (+1%) |
:arrow_up: |
Continue to review full report at Codecov.
Legend - Click here to learn more
Δ = absolute <relative> (impact),ø = not affected,? = missing dataPowered by Codecov. Last update e21f75f...6fc529b. Read the comment docs.
I was wondering about how we duplicate code a lot between conditional variants and normal variants. The caveat of making the conditional one extend the normal one, on the other hand, is that it can deepen the stacktraces. A good middle ground would be to split the subscribers into an abstract base and 2 concrete final implementations, where each method that needs to be implemented in a particular way in either class would be made abstract (or omitted) in the base class. Does that sound about right @smaldini ?
Since it is needed to wrap arbitrary org.reactivestreams.Subscriber anyway, maybe the conditional could be the default internal API, so there is no need to typecheck every consumer.
I'm with you @simonbasle @akarnokd it is just my pain in finding such things during testing operators, so I'm more than happy to fix that, whilst I have some spare time to do that.
@akarnokd A drawback of using the conditional as a default co-implementation (I hope I understood you correctly) is a couple of if instructions in runtime (probably C2 could eliminate them), but I'm not sure how painful it is compared to type check during the assembly time.
I was thinking about something like a generic base class, e.g. -> https://gist.github.com/OlegDokuka/9516f3e13dd8ea13bbf8afee85afd6fd
static abstract class MapBaseSubscriber<T, R, S extends CoreSubscriber<? super R>>
implements InnerOperator<T, R> {
final S actual;
...
}
do you still feel the need to make progress on this one @OlegDokuka ?
@simonbasle, I'm open to finish that if we all think that is a needed improvement! As usual, you may count on my contribution 😁
The last interaction here was over 4 years ago. Closing due to inactivity. Please create a new PR targeting the latest codebase in case you feel this is still applicable.