SequenceBarrier.waitFor sometimes doesn't respect BlockingWaitStrategy
Describe the bug
With many producers and a blocking wait strategy, SequenceBarrier.waitFor(nextSequence) can return a value less than its input. Specifically, it can return nextSequence - 1.
To Reproduce
Run thisFails(). It calls testWithNumProducers, which fails if waitFor(nextSequence) returns a value less than its input.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;

class WaitForReproTest {

    @Test
    void thisFails() throws InterruptedException {
        testWithNumProducers(128);
    }

    @Test
    void thisWorks() throws InterruptedException {
        testWithNumProducers(3);
    }

    void testWithNumProducers(int numProducers) throws InterruptedException {
        WaitForRepro w = new WaitForRepro();
        w.startConsumer();
        w.startProducers(numProducers);
        // Let producers and the consumer run for a while before checking the flag.
        Thread.sleep(4000);
        Assertions.assertFalse(w.failed());
    }

    static class WaitForRepro {
        private final RingBuffer<Integer> buffer;
        private final SequenceBarrier barrier;
        private volatile boolean failed;

        public WaitForRepro() {
            // Multi-producer ring buffer with 16 slots and a blocking wait strategy.
            buffer = RingBuffer.createMultiProducer(() -> 2, 16, new BlockingWaitStrategy());
            barrier = buffer.newBarrier();
        }

        public void startConsumer() {
            Executors.newSingleThreadExecutor().submit(() -> {
                long nextSequence = 0;
                long availableSequence;
                while (true) {
                    availableSequence = barrier.waitFor(nextSequence);
                    // The condition under test: waitFor returned less than it was asked for.
                    if (nextSequence > availableSequence) {
                        fail();
                    }
                    while (nextSequence <= availableSequence) {
                        buffer.get(nextSequence++);
                    }
                    Thread.sleep(5);
                }
            });
        }

        public void startProducers(int numProducers) {
            ExecutorService executor = Executors.newFixedThreadPool(numProducers);
            for (int i = 0; i < numProducers; ++i) {
                executor.submit(() -> {
                    while (true) {
                        buffer.publishEvent((event, seq) -> {
                            // NOP
                        });
                        Thread.sleep(3);
                    }
                });
            }
        }

        private void fail() {
            failed = true;
        }

        public boolean failed() {
            return failed;
        }
    }
}
Expected behavior
I expected that with a BlockingWaitStrategy, waitFor(nextSequence) should block until it can return nextSequence or a value greater than it.
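As a stopgap on the consumer side, the expectation above can be enforced by retrying whenever the returned value is too small. This is only a minimal sketch: `WaitForRetry`, `SequenceWait`, and `awaitAtLeast` are hypothetical names that are not part of the Disruptor API, and `SequenceWait` stands in for `SequenceBarrier.waitFor` so the example runs without the library.

```java
final class WaitForRetry {

    /** Hypothetical stand-in for SequenceBarrier.waitFor, so the sketch needs no library. */
    @FunctionalInterface
    interface SequenceWait {
        long waitFor(long sequence) throws Exception;
    }

    /** Calls waitFor repeatedly until the result is at least nextSequence. */
    static long awaitAtLeast(SequenceWait barrier, long nextSequence) throws Exception {
        long available = barrier.waitFor(nextSequence);
        while (available < nextSequence) {
            // The wait returned early; hint to the CPU that we are spinning, then retry.
            Thread.onSpinWait();
            available = barrier.waitFor(nextSequence);
        }
        return available;
    }

    public static void main(String[] args) throws Exception {
        // Scripted results: two short returns (nextSequence - 1), then a valid one.
        long[] scripted = {4, 4, 7};
        int[] calls = {0};
        long result = awaitAtLeast(seq -> scripted[calls[0]++], 5);
        System.out.println(result + " after " + calls[0] + " calls"); // 7 after 3 calls
    }
}
```

If the underlying wait returns immediately on every call, this loop busy-spins rather than blocks, so it trades the blocking behavior for correctness of the returned value.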
Desktop (please complete the following information):
- OS: macOS
- Version: 3.4.4 and 4.0.0
- JVM Version: OpenJDK Runtime Environment Temurin-17.0.8+7 (build 17.0.8+7)
Additional context
I had opened a thread on Google Groups, but thought that this problem might be more easily addressed as a GitHub issue.
In 2.10.4, I did not observe this behavior.
When debugging, I found that the blocking wait strategy returns a sequence greater than or equal to sequence on line 56 of waitFor, but on line 63, sequencer.getHighestPublishedSequence returns sequence - 1. https://github.com/LMAX-Exchange/disruptor/blob/3.4.4/src/main/java/com/lmax/disruptor/ProcessingSequenceBarrier.java#L51
In 2.10.4, waitFor simply defers to the wait strategy, so sequence - 1 can never be returned with a blocking wait strategy. https://github.com/LMAX-Exchange/disruptor/blob/2.10.4/code/src/main/com/lmax/disruptor/ProcessingSequenceBarrier.java#L40
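One way to picture how the two steps can disagree is the simplified model below. It is not the library's code: `PublishedScanModel` and its methods are hypothetical, and it assumes (as I understand the multi-producer sequencer) that the cursor advances when a slot is claimed while publication is recorded per slot, so the cursor can run ahead of a slot that has not been published yet. Ring wrap-around is deliberately left out.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

final class PublishedScanModel {
    private final AtomicLong cursor = new AtomicLong(-1);   // highest claimed sequence
    private final AtomicIntegerArray published;              // 1 = slot published

    PublishedScanModel(int capacity) {
        published = new AtomicIntegerArray(capacity);
    }

    /** Claims the next sequence; the cursor moves before anything is published. */
    long claim() {
        return cursor.incrementAndGet();
    }

    /** Marks a previously claimed sequence as published (no wrap-around in this model). */
    void publish(long sequence) {
        published.set((int) sequence, 1);
    }

    /** Scans from lowerBound and stops just before the first unpublished slot. */
    long highestPublished(long lowerBound, long available) {
        for (long s = lowerBound; s <= available; s++) {
            if (published.get((int) s) == 0) {
                return s - 1;
            }
        }
        return available;
    }

    /** Models the two steps: read the cursor, then narrow to the published range. */
    long waitForModel(long sequence) {
        long available = cursor.get();
        if (available < sequence) {
            return available;
        }
        return highestPublished(sequence, available);
    }

    public static void main(String[] args) {
        PublishedScanModel m = new PublishedScanModel(16);
        long first = m.claim();   // sequence 0
        long second = m.claim();  // sequence 1
        m.publish(second);        // the later claim publishes first
        System.out.println(m.waitForModel(0)); // -1, i.e. nextSequence - 1
        m.publish(first);
        System.out.println(m.waitForModel(0)); // 1
    }
}
```

In this model, more producers means more claims in flight at once, which would make a gap at nextSequence more likely and may explain why 128 producers fail where 3 do not.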