fix #2839 Buffer when downstream not ready
When downstream requests have been exhausted but items have been buffered the current behaviour is to raise an error, this change instead will leave the buffer in place in that circumstance and flush it at the point of the next request. Complete signals that arrive while there is a buffer waiting are now delayed until it is flushed.
Also fixed is the potential for negative request amounts being sent upstream which is a trivial change to just not send those.
@peterspikings Please sign the Contributor License Agreement!
Click here to manually synchronize the status of this Pull Request.
See the FAQ for frequently asked questions.
@peterspikings Please sign the Contributor License Agreement!
Click here to manually synchronize the status of this Pull Request.
See the FAQ for frequently asked questions.
@peterspikings Thank you for your proposal! Can you please try the WindowTimeout version from #1099 which does a similar thing? If it is something you expect, then we will rework the whole set of buffer / window operators to make sure they are following that approach in an optimal way
No problem at all!
I tried the following unit test against the branch in PR #1099. It subscribes to a source which is slowly producing items. The test shows up two issues, if run as is then you get a mix of items going missing and being buffered it seems. With the source producing three items after 200ms, 400ms, 600ms, the windowTimeout with a duration of 100ms and demands of 1 at the start, 1 after 300ms and 1 after 1300ms I would expect [1] after 300ms, [2] after 500ms and [3] when putting the last demand in. Not sure why the empty list shows up after 100ms or where items 2 & 3 went!
Flux<List<Integer>> scenario_bufferWithTimeoutWhenDownstreamDemandIsLow() {
return Flux.range(1, 3)
.delayElements(Duration.ofMillis(200))
.log("Pre")
.windowTimeout(5, Duration.ofMillis(100))
.concatMap(Flux::collectList)
.log("Post");
}
@Test
public void bufferWithTimeoutWhenDownstreamDemandIsLow() {
StepVerifier.withVirtualTime(this::scenario_bufferWithTimeoutWhenDownstreamDemandIsLow,
1)
.expectSubscription()
.expectNoEvent(Duration.ofMillis(100))
.assertNext(s -> assertThat(s).isEmpty())
.expectNoEvent(Duration.ofMillis(200))
.thenRequest(1)
.assertNext(s -> assertThat(s).containsExactly(1))
.expectNoEvent(Duration.ofMillis(1000))
.thenRequest(1)
.assertNext(s -> assertThat(s).containsExactly(2, 3)) // Fails here as next is empty list
.verifyComplete();
}
The other issue is that if you subscribe with zero demand then you immediately get an overflow error which makes no sense to me even though no items have been emitted from the source yet.
@peterspikings just FYI. I updated windowTimout versions and added a test as you proposed. There is a couple of issues In your original test:
- concatMap(Flux::collectList) does 256 elements request upfront so the demand from step verifier will not be the one you get in windowTimeout. (that can be fixed with setting concatMap(fn, prefetch = 0))
- ConcatMap is unideal. So it will take the next element almost immediately, regardless of the fact you demanded something or not. That said - the expectation will be slightly different but close enough to what you presented.
Feel free to have a look at the same PR mentioned above
Closing this PR as it seems to have been superseded by the implementation in #3332. Please open a new issue in case the implementation doesn't meet the expectations behind this proposal.