BufferTimeout with fair backpressure rework
The fair backpressure variant of the bufferTimeout operator has been reworked to use a state machine with a minimal number of volatile variables, eliminating potential data races such as skipping a delivery when onNext and the timeout fire concurrently, or when cancellation happens while onNext is being delivered.
Resolves #3531
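Conceptually, the state handling boils down to a single volatile state word that is only mutated through CAS loops, so concurrent onNext, timeout, and cancellation signals always act on a consistent snapshot. The following is a minimal, illustrative sketch of that pattern only; the flag names and layout are hypothetical and not the actual FluxBufferTimeout fields:

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Illustrative sketch: one volatile long holds all flags, transitions go through CAS.
final class BufferStateSketch {

	// Hypothetical flag layout, not the operator's real one.
	static final long HAS_WORK_IN_PROGRESS = 1L << 62;
	static final long TERMINATED           = 1L << 61;
	static final long CANCELLED            = 1L << 60;
	static final long TIMEOUT_FIRED        = 1L << 59;

	volatile long state;
	static final AtomicLongFieldUpdater<BufferStateSketch> STATE =
			AtomicLongFieldUpdater.newUpdater(BufferStateSketch.class, "state");

	// Atomically record that the timeout fired and learn whether this caller
	// must perform the delivery itself (it won the work-in-progress flag).
	boolean markTimeout() {
		for (;;) {
			long current = state;
			if ((current & (TERMINATED | CANCELLED)) != 0) {
				return false; // nothing left to deliver
			}
			long next = current | TIMEOUT_FIRED | HAS_WORK_IN_PROGRESS;
			if (STATE.compareAndSet(this, current, next)) {
				// Drain only if no other signal was already doing the work.
				return (current & HAS_WORK_IN_PROGRESS) == 0;
			}
		}
	}
}
```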
Although the above could be a false negative, since the async task might not be awaited properly. Can you please double-check, @chemicL?
I just remembered that this was one of the reasons why all the discard tests live in the stress tests, so it could make sense to port some of them for bufferTimeout.
@OlegDokuka commit 0334959 adds significant improvements to the test suite that help catch the racy situations. In the next commit I will add temporary fixes for the identified issues, and afterwards I will follow up with a state machine implementation to eliminate the last one, where the timeout races with draining.
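For illustration, a probe along these lines can catch the cancellation-vs-onNext race together with the discard accounting mentioned above, while staying fully synchronous so the "unawaited async task" false negative does not apply. This is a hedged sketch only: the class name, outcome ids, and the use of Sinks and doOnDiscard are my assumptions, not the actual stress tests added in the commit.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.Expect;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.III_Result;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Sinks;

@JCStressTest
@Outcome(id = "1, 0, 0", expect = Expect.ACCEPTABLE, desc = "Value delivered downstream before cancellation.")
@Outcome(id = "0, 1, 0", expect = Expect.ACCEPTABLE, desc = "Value buffered, then discarded on cancellation.")
@Outcome(id = "0, 0, 1", expect = Expect.ACCEPTABLE, desc = "Emission rejected, cancellation already propagated.")
@Outcome(expect = Expect.FORBIDDEN, desc = "Accepted value was lost or observed twice.")
@State
public class BufferTimeoutCancelVsOnNextStressTest {

	final AtomicInteger delivered = new AtomicInteger();
	final AtomicInteger discarded = new AtomicInteger();
	final AtomicInteger rejected  = new AtomicInteger();

	final Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer();

	final BaseSubscriber<List<Integer>> subscriber = new BaseSubscriber<List<Integer>>() {
		@Override
		protected void hookOnNext(List<Integer> buffer) {
			delivered.addAndGet(buffer.size());
		}
	};

	{
		// Timeout large enough to never fire: only size-based flushes, cancellation, and discards matter.
		sink.asFlux()
		    .bufferTimeout(1, Duration.ofDays(1), true)
		    .doOnDiscard(Integer.class, value -> discarded.incrementAndGet())
		    .subscribe(subscriber);
	}

	@Actor
	public void producer() {
		if (sink.tryEmitNext(1) != Sinks.EmitResult.OK) {
			rejected.incrementAndGet();
		}
	}

	@Actor
	public void canceller() {
		subscriber.cancel();
	}

	@Arbiter
	public void arbiter(III_Result result) {
		// Invariant: every accepted value is either delivered or discarded, exactly once.
		result.r1 = delivered.get();
		result.r2 = discarded.get();
		result.r3 = rejected.get();
	}
}
```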
For the latest changes, I added a JMH benchmark. The idea is to test a non-contended, single-threaded case of pulling 100 items and packaging them into 1, 10, and 100-item buffers.
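For reference, such a benchmark could be structured roughly as follows. This is a sketch under the assumption that `oneByOne` requests buffers one at a time while `unlimited` uses an unbounded request; the actual FluxBufferTimeoutBenchmark in this PR may differ in naming and structure.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@Fork(1)
@State(Scope.Benchmark)
public class BufferTimeoutSketchBenchmark {

	@Param({"1", "10", "100"})
	int bufferSize;

	// Downstream requests buffers one at a time, exercising the backpressure path.
	@Benchmark
	public void oneByOne(Blackhole bh) throws InterruptedException {
		CountDownLatch latch = new CountDownLatch(1);
		Flux.range(0, 100)
		    // Timeout chosen large enough that only size-based flushes occur.
		    .bufferTimeout(bufferSize, Duration.ofMillis(100), true)
		    .subscribe(new BaseSubscriber<List<Integer>>() {
			    @Override
			    protected void hookOnSubscribe(Subscription subscription) {
				    request(1);
			    }

			    @Override
			    protected void hookOnNext(List<Integer> buffer) {
				    bh.consume(buffer);
				    request(1);
			    }

			    @Override
			    protected void hookFinally(SignalType type) {
				    latch.countDown();
			    }
		    });
		latch.await();
	}

	// Downstream requests an unbounded amount up front.
	@Benchmark
	public void unlimited(Blackhole bh) {
		bh.consume(Flux.range(0, 100)
		               .bufferTimeout(bufferSize, Duration.ofMillis(100), true)
		               .blockLast());
	}
}
```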
Devising a contended usage scenario with a concurrent, asymmetric JMH benchmark doesn't seem worthwhile, since the dominating factor should never be two actors competing, but rather one actor winning a potential race and performing the bulk of the work. We can't approach this performance evaluation the same way as a regular queue benchmark with a rendezvous scenario of handing items over between two threads. In the queue benchmark case, that handover is the essence of the work being done. With reactive-streams operators it is not: there is a notion of work stealing, where the actor that wins a race (which can contend in a lock-free manner for a short moment) performs the drain operation, and the queue is merely a means to an end. We are not evaluating the queue's behaviour under load, but the operator's algorithm itself.
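To make the work-stealing point concrete, here is a generic sketch of the drain pattern (not the operator's actual code): whichever thread transitions the work-in-progress counter from 0 to 1 performs the draining on behalf of every concurrent signal, so contention is limited to a brief counter increment rather than a sustained handover between threads.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Generic illustration of the work-stealing drain loop used by many operators.
final class DrainingBuffer<T> {

	final Queue<T> queue = new ConcurrentLinkedQueue<>();
	final AtomicInteger wip = new AtomicInteger();
	final Consumer<T> downstream;

	DrainingBuffer(Consumer<T> downstream) {
		this.downstream = downstream;
	}

	void offer(T value) {
		queue.offer(value);
		drain();
	}

	void drain() {
		// Only the caller that wins the 0 -> 1 transition drains; the losers
		// merely record that more work arrived and return immediately.
		if (wip.getAndIncrement() != 0) {
			return;
		}
		int missed = 1;
		for (;;) {
			T t;
			while ((t = queue.poll()) != null) {
				downstream.accept(t);
			}
			// If more work arrived while draining, loop again; otherwise exit.
			missed = wip.addAndGet(-missed);
			if (missed == 0) {
				return;
			}
		}
	}
}
```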
Having said the above, I compared the new backpressured variant with the current (broken) implementation and also with the simpler, non-backpressured variant. Here are my results on an M1 MacBook Pro (10-core, 32GB):
This PR, fairBackpressure = true
| Benchmark | (bufferSize) | Mode | Cnt | Score | Error | Units |
| --- | --- | --- | --- | --- | --- | --- |
| FluxBufferTimeoutBenchmark.oneByOne | 1 | avgt | 5 | 6259.514 | ± 214.272 | ns/op |
| FluxBufferTimeoutBenchmark.oneByOne | 10 | avgt | 5 | 4418.668 | ± 595.467 | ns/op |
| FluxBufferTimeoutBenchmark.oneByOne | 100 | avgt | 5 | 3938.607 | ± 518.551 | ns/op |
| FluxBufferTimeoutBenchmark.unlimited | 1 | avgt | 5 | 5764.457 | ± 399.358 | ns/op |
| FluxBufferTimeoutBenchmark.unlimited | 10 | avgt | 5 | 4072.673 | ± 169.380 | ns/op |
| FluxBufferTimeoutBenchmark.unlimited | 100 | avgt | 5 | 3797.930 | ± 94.629 | ns/op |
Current 3.5.x, fairBackpressure = true
| Benchmark | (bufferSize) | Mode | Cnt | Score | Error | Units |
| --- | --- | --- | --- | --- | --- | --- |
| FluxBufferTimeoutBenchmark.oneByOne | 1 | avgt | 5 | 23836.157 | ± 2138.635 | ns/op |
| FluxBufferTimeoutBenchmark.oneByOne | 10 | avgt | 5 | 8935.440 | ± 43.610 | ns/op |
| FluxBufferTimeoutBenchmark.oneByOne | 100 | avgt | 5 | 7182.099 | ± 129.899 | ns/op |
| FluxBufferTimeoutBenchmark.unlimited | 1 | avgt | 5 | 18836.570 | ± 1281.322 | ns/op |
| FluxBufferTimeoutBenchmark.unlimited | 10 | avgt | 5 | 8231.299 | ± 55.848 | ns/op |
| FluxBufferTimeoutBenchmark.unlimited | 100 | avgt | 5 | 7197.949 | ± 160.759 | ns/op |
Current 3.5.x, fairBackpressure = false
| Benchmark | (bufferSize) | Mode | Cnt | Score | Error | Units |
| --- | --- | --- | --- | --- | --- | --- |
| FluxBufferTimeoutBenchmark.oneByOne | 1 | avgt | 5 | 18376.986 | ± 795.211 | ns/op |
| FluxBufferTimeoutBenchmark.oneByOne | 10 | avgt | 5 | 3458.388 | ± 200.223 | ns/op |
| FluxBufferTimeoutBenchmark.oneByOne | 100 | avgt | 5 | 2533.851 | ± 440.186 | ns/op |
| FluxBufferTimeoutBenchmark.unlimited | 1 | avgt | 5 | 15716.505 | ± 1168.297 | ns/op |
| FluxBufferTimeoutBenchmark.unlimited | 10 | avgt | 5 | 3040.167 | ± 144.305 | ns/op |
| FluxBufferTimeoutBenchmark.unlimited | 100 | avgt | 5 | 2470.696 | ± 375.516 | ns/op |
To summarize the above: the new implementation passes the JCStress tests devised to catch the discovered issues while showing better performance characteristics than the current fair-backpressure variant. The non-prefetching variant, which is incapable of respecting backpressure, remains faster; that is expected, as it needs far less accounting and is a simpler implementation.
Thanks for the review @violetagg :)