
SequenceBarrier.waitForSequence sometimes doesn't respect BlockingWaitStrategy

Open tibco-jufernan opened this issue 1 year ago • 0 comments

Describe the bug

With many producers and a blocking wait strategy, SequenceBarrier.waitFor(nextSequence) can return a value less than its input. Specifically, it can return nextSequence - 1.

To Reproduce

Run thisFails(). It calls testWithNumProducers(128), which fails if waitFor(nextSequence) returns a value less than its input.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;

class WaitForReproTest {

    @Test
    void thisFails() throws InterruptedException {
        testWithNumProducers(128);
    }

    @Test
    void thisWorks() throws InterruptedException {
        testWithNumProducers(3);
    }

    void testWithNumProducers(int numProducers) throws InterruptedException {
        WaitForRepro w = new WaitForRepro();

        w.startConsumer();
        w.startProducers(numProducers);

        Thread.sleep(4000);

        Assertions.assertFalse(w.failed());
    }

    static class WaitForRepro {

        private final RingBuffer<Integer> buffer;
        private final SequenceBarrier barrier;
        private volatile boolean failed;

        public WaitForRepro() {
            buffer = RingBuffer.createMultiProducer(() -> 2, 16, new BlockingWaitStrategy());
            barrier = buffer.newBarrier();
        }

        public void startConsumer() {
            Executors.newSingleThreadExecutor().submit(() -> {
                long nextSequence = 0;
                long availableSequence;
                while (true) {
                    availableSequence = barrier.waitFor(nextSequence);
                    // waitFor returned before nextSequence was available: record the failure.
                    if (nextSequence > availableSequence) {
                        fail();
                    }
                    while (nextSequence <= availableSequence) {
                        buffer.get(nextSequence++);
                    }
                    Thread.sleep(5);
                }
            });
        }

        public void startProducers(int numProducers) {
            ExecutorService executor = Executors.newFixedThreadPool(numProducers);
            for (int i = 0; i < numProducers; ++i) {
                executor.submit(() -> {
                    while (true) {
                        buffer.publishEvent((event, seq) -> {
                            // NOP
                        });
                        Thread.sleep(3);
                    }
                });
            }
        }

        private void fail() {
            failed = true;
        }

        public boolean failed() {
            return failed;
        }
    }
}

Expected behavior

I expected that with a BlockingWaitStrategy, waitFor(nextSequence) would block until it can return nextSequence or a greater value.
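Until the behavior is clarified, a consumer can guard against an early return by calling waitFor again whenever the result is below the requested sequence. The sketch below is only an illustration of that guard, not part of the Disruptor API: `waitUntilAvailable` is a hypothetical helper, and the `LongUnaryOperator` parameter is a stand-in for `barrier.waitFor(long)`, which in real code also throws checked exceptions that would need handling.

```java
import java.util.function.LongUnaryOperator;

public class WaitRetrySketch {

    /**
     * Hypothetical helper: keeps calling waitFor until the returned value
     * is at least the requested sequence, then returns that value.
     */
    public static long waitUntilAvailable(LongUnaryOperator waitFor, long sequence) {
        long available = waitFor.applyAsLong(sequence);
        while (available < sequence) {
            Thread.yield(); // give producers a chance to publish
            available = waitFor.applyAsLong(sequence);
        }
        return available;
    }

    public static void main(String[] args) {
        // Fake barrier: returns sequence - 1 on the first two calls, then sequence + 3.
        int[] calls = {0};
        LongUnaryOperator flaky = seq -> (calls[0]++ < 2) ? seq - 1 : seq + 3;

        System.out.println(waitUntilAvailable(flaky, 10)); // prints 13
        System.out.println(calls[0]);                      // prints 3
    }
}
```

In the repro above, the equivalent change would be to skip the processing loop and call waitFor again when availableSequence is less than nextSequence, instead of treating it as a failure.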

Desktop (please complete the following information):

  • OS: MacOS
  • Version: 3.4.4 and 4.0.0
  • JVM Version: OpenJDK Runtime Environment Temurin-17.0.8+7 (build 17.0.8+7)

Additional context

I had opened a thread on Google Groups, but thought that this problem might be more easily addressed as a GitHub issue.

In 2.10.4, I did not observe this behavior.

When debugging, I found that the blocking wait strategy returns a sequence greater than or equal to sequence on line 56 of waitFor, but on line 63, sequencer.getHighestPublishedSequence returns sequence - 1. https://github.com/LMAX-Exchange/disruptor/blob/3.4.4/src/main/java/com/lmax/disruptor/ProcessingSequenceBarrier.java#L51

In 2.10.4, waitFor simply defers to the wait strategy, so sequence - 1 can never be returned with a blocking wait strategy. https://github.com/LMAX-Exchange/disruptor/blob/2.10.4/code/src/main/com/lmax/disruptor/ProcessingSequenceBarrier.java#L40
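The two-step behavior described above can be modelled without the library. The sketch below is a simplified stand-in, not the Disruptor source: `published` is a hypothetical boolean array marking which claimed slots have been published, and `highestPublished` scans from the requested sequence up to the cursor value, stopping at the first unpublished slot. It assumes that with multiple producers the cursor can move past a slot that has been claimed but not yet published, which would be consistent with the sequence - 1 result seen in the debugger.

```java
public class PublishedScanSketch {

    /**
     * Hypothetical model of the second step: returns the highest sequence in
     * [lowerBound, cursor] such that every slot from lowerBound up to it is
     * published, or lowerBound - 1 if the slot at lowerBound is unpublished.
     */
    public static long highestPublished(long lowerBound, long cursor, boolean[] published) {
        for (long seq = lowerBound; seq <= cursor; seq++) {
            if (!published[(int) seq]) {
                return seq - 1;
            }
        }
        return cursor;
    }

    public static void main(String[] args) {
        // Slots 0..4 are claimed (cursor = 4); slot 2 is claimed but not yet published.
        boolean[] published = {true, true, false, true, true};
        long cursor = 4;

        // Waiting for 2: the cursor is already >= 2, but slot 2 is unpublished.
        System.out.println(highestPublished(2, cursor, published)); // prints 1
        // Waiting for 0: the contiguous published run ends at 1.
        System.out.println(highestPublished(0, cursor, published)); // prints 1
        // Waiting for 3: slots 3 and 4 are both published.
        System.out.println(highestPublished(3, cursor, published)); // prints 4
    }
}
```

Under this model, a wait strategy that only compares the requested sequence against the cursor would be satisfied in the first case, while the scan still reports sequence - 1. A barrier that returns the wait strategy's result directly, as described for 2.10.4, has no such second step.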

tibco-jufernan, Oct 05 '23 16:10