
BufferTimeout with fair backpressure rework

chemicL opened this issue 2 years ago • 3 comments

The fair backpressure variant of the bufferTimeout operator has been reworked to use a state machine with a minimal number of volatile variables, eliminating potential data races such as skipping delivery when onNext and the timeout fire concurrently, or when cancellation happens while onNext is being delivered.

Resolves #3531
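
As an illustration of the single-state-word idea, here is a minimal sketch (not the actual reactor-core implementation; the class, field, and bit-layout names are hypothetical): coordination flags are packed into one volatile long updated via CAS, so racing onNext/timeout/cancel signals contend on a single word and exactly one of them wins the right to deliver the buffer.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

final class BufferTimeoutStateSketch {

    // hypothetical bit layout for the packed state word
    static final long TERMINATED = 1L << 63;
    static final long CANCELLED  = 1L << 62;
    static final long TIMEOUT    = 1L << 61;

    volatile long state;
    static final AtomicLongFieldUpdater<BufferTimeoutStateSketch> STATE =
            AtomicLongFieldUpdater.newUpdater(BufferTimeoutStateSketch.class, "state");

    /** Claim the timeout flush; returns true only for the single thread that wins. */
    boolean markTimeout() {
        for (;;) {
            long s = state;
            if ((s & (TERMINATED | CANCELLED | TIMEOUT)) != 0) {
                return false; // already terminated, cancelled, or claimed by a racer
            }
            if (STATE.compareAndSet(this, s, s | TIMEOUT)) {
                return true; // the winner performs the flush exactly once
            }
        }
    }
}
```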

chemicL · Nov 07 '23 12:11

Although the above could be a false negative, since the async task may not be awaited properly. Can you please double-check, @chemicL?

OlegDokuka · Nov 08 '23 15:11

I just remembered that this was one of the reasons why all the discard tests live in the stress tests, so it could make sense to port some of them for bufferTimeout.

OlegDokuka · Nov 08 '23 19:11

@OlegDokuka commit 0334959 adds significant improvements to the test suite that help catch the racy situations. In the next commit I will add temporary fixes for the identified issues, and afterwards I will follow up with a state machine implementation to eliminate the last one, where the timeout races with draining.
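
For illustration, here is a hedged sketch of the kind of JCStress probe such a suite relies on (this is not the code from commit 0334959, and the names are illustrative; it exercises a stand-alone flag race rather than the operator itself). It checks the "deliver exactly once" invariant when a timeout-triggered flush races with an onNext-triggered flush:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.Expect;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.I_Result;

@JCStressTest
@Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "exactly one actor flushed the buffer")
@Outcome(id = "0", expect = Expect.FORBIDDEN, desc = "the flush was skipped")
@Outcome(id = "2", expect = Expect.FORBIDDEN, desc = "the flush was performed twice")
@State
public class FlushOnceStressTest {

    static final long FLUSHING = 1L;

    final AtomicLong state = new AtomicLong();
    final AtomicInteger flushes = new AtomicInteger();

    void tryFlush() {
        long previous = state.getAndUpdate(s -> s | FLUSHING);
        if ((previous & FLUSHING) == 0) {
            flushes.incrementAndGet(); // only the thread that set the flag flushes
        }
    }

    @Actor
    public void timeoutSignal() {
        tryFlush();
    }

    @Actor
    public void onNextSignal() {
        tryFlush();
    }

    @Arbiter
    public void result(I_Result r) {
        r.r1 = flushes.get();
    }
}
```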

chemicL · Nov 22 '23 13:11

For the latest changes, I added a JMH benchmark. The idea is simply to test a non-contended, single-threaded case of pulling 100 items and packaging them into 1-, 10-, and 100-item buffers.
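
For reference, a rough approximation of that benchmark's shape (the actual FluxBufferTimeoutBenchmark lives in the repository's benchmarks module; the code below is a hedged reconstruction, not a copy, and the class name is hypothetical): unlimited subscribes with an unbounded request, while oneByOne pulls a single buffer at a time.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class BufferTimeoutBenchmarkSketch {

    @Param({"1", "10", "100"})
    int bufferSize;

    Flux<List<Integer>> source() {
        // 100 items grouped into bufferSize-sized buffers; the timeout is generous
        // so the size limit, not the timer, closes the buffers
        return Flux.range(0, 100)
                   .bufferTimeout(bufferSize, Duration.ofSeconds(1), true);
    }

    @Benchmark
    public void unlimited(Blackhole bh) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        source().subscribe(bh::consume, e -> done.countDown(), done::countDown);
        done.await();
    }

    @Benchmark
    public void oneByOne(Blackhole bh) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        source().subscribe(new BaseSubscriber<List<Integer>>() {
            @Override protected void hookOnSubscribe(Subscription s) {
                request(1); // pull a single buffer at a time
            }
            @Override protected void hookOnNext(List<Integer> buffer) {
                bh.consume(buffer);
                request(1);
            }
            @Override protected void hookOnComplete() { done.countDown(); }
            @Override protected void hookOnError(Throwable t) { done.countDown(); }
        });
        done.await();
    }
}
```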

Devising a contended usage scenario with a concurrent, asymmetric JMH benchmark doesn't seem worthwhile, since the dominating factor should never be two actors competing, but rather one actor winning a potential race and performing the bulk of the work. We can't approach this performance evaluation the same way as a regular queue benchmark with a rendezvous scenario of handing items over between two threads. In the queue benchmark case, that hand-over is the essence of the work being done. With reactive-streams operators, in contrast, there is the idea of work stealing: one actor wins a race (contending in a lock-free manner for a short moment) and performs the drain operation on behalf of the others. The queue is merely a means to an end, and we are not evaluating its behaviour under load but the operator's algorithm itself.
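
A minimal sketch of that work-stealing drain pattern, with illustrative names rather than the operator's actual fields: whichever signal increments the work-in-progress counter from 0 to 1 wins and drains the queue on behalf of everyone else; the losers simply record their work and return.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class DrainLoopSketch<T> {

    final Queue<T> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();

    void onSignal(T value) {
        queue.offer(value);
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // another thread is already draining; it will pick up our work
        }
        int missed = 1;
        for (;;) {
            T next;
            while ((next = queue.poll()) != null) {
                deliver(next);
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break; // no work was added while we were draining
            }
        }
    }

    void deliver(T value) {
        System.out.println("delivered " + value);
    }
}
```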

Having said the above, I compared the new backpressured variant with the current (broken) implementation and also with the simpler, non-backpressured variant. Here are my results on an M1 MacBook Pro (10-core, 32 GB):

This PR, fairBackpressure = true

Benchmark                             (bufferSize)  Mode  Cnt     Score     Error  Units
FluxBufferTimeoutBenchmark.oneByOne              1  avgt    5  6259.514 ± 214.272  ns/op
FluxBufferTimeoutBenchmark.oneByOne             10  avgt    5  4418.668 ± 595.467  ns/op
FluxBufferTimeoutBenchmark.oneByOne            100  avgt    5  3938.607 ± 518.551  ns/op
FluxBufferTimeoutBenchmark.unlimited             1  avgt    5  5764.457 ± 399.358  ns/op
FluxBufferTimeoutBenchmark.unlimited            10  avgt    5  4072.673 ± 169.380  ns/op
FluxBufferTimeoutBenchmark.unlimited           100  avgt    5  3797.930 ±  94.629  ns/op

Current 3.5.x, fairBackpressure = true

Benchmark                             (bufferSize)  Mode  Cnt      Score      Error  Units
FluxBufferTimeoutBenchmark.oneByOne              1  avgt    5  23836.157 ± 2138.635  ns/op
FluxBufferTimeoutBenchmark.oneByOne             10  avgt    5   8935.440 ±   43.610  ns/op
FluxBufferTimeoutBenchmark.oneByOne            100  avgt    5   7182.099 ±  129.899  ns/op
FluxBufferTimeoutBenchmark.unlimited             1  avgt    5  18836.570 ± 1281.322  ns/op
FluxBufferTimeoutBenchmark.unlimited            10  avgt    5   8231.299 ±   55.848  ns/op
FluxBufferTimeoutBenchmark.unlimited           100  avgt    5   7197.949 ±  160.759  ns/op

Current 3.5.x, fairBackpressure = false

Benchmark                             (bufferSize)  Mode  Cnt      Score      Error  Units
FluxBufferTimeoutBenchmark.oneByOne              1  avgt    5  18376.986 ±  795.211  ns/op
FluxBufferTimeoutBenchmark.oneByOne             10  avgt    5   3458.388 ±  200.223  ns/op
FluxBufferTimeoutBenchmark.oneByOne            100  avgt    5   2533.851 ±  440.186  ns/op
FluxBufferTimeoutBenchmark.unlimited             1  avgt    5  15716.505 ± 1168.297  ns/op
FluxBufferTimeoutBenchmark.unlimited            10  avgt    5   3040.167 ±  144.305  ns/op
FluxBufferTimeoutBenchmark.unlimited           100  avgt    5   2470.696 ±  375.516  ns/op

To summarize the above, the new implementation passes the JCStress tests devised to catch the discovered issues while maintaining better performance characteristics. The non-prefetching variant, which is incapable of respecting backpressure, remains faster, and that's understandable: it doesn't need to keep track of as much accounting state and is a simpler implementation.

chemicL · May 22 '24 15:05

Thanks for the review @violetagg :)

chemicL · Jun 05 '24 13:06