buffer-trigger icon indicating copy to clipboard operation
buffer-trigger copied to clipboard

SimpleBufferTrigger在自定义了拒绝方法后,高并发的情况下,会导致有些元素既没有加入到容器中,也没有被拒绝

Open lionheartdong opened this issue 2 years ago • 1 comments

可以稳定复现的场景是这样的:

public class TestTrigger {
    private AtomicLong enqueueCount = new AtomicLong();
    private AtomicLong consumeCount = new AtomicLong();
    private AtomicLong rejectCount = new AtomicLong();

    private BufferTrigger<String> buffer = BufferTrigger.<String, Queue<String>>simple()
            .name("test-trigger")
            .setContainer(ConcurrentLinkedQueue::new, Queue::add)
            .maxBufferCount(1000)
            .interval(1, TimeUnit.SECONDS)
            .consumer(this::doBatchReload)
            .rejectHandler(this::onTaskRejected)
            .build();


    private void doBatchReload(Iterable<String> values) {
        consumeCount.addAndGet(Iterables.size(values));
        Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1000));
    }

    private void onTaskRejected(String value) {
        rejectCount.addAndGet(1);
    }

    private void test() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(100);

        for (int i = 0; i < 1000000; i++) {
            executor.submit(() -> {
                enqueueCount.getAndAdd(1);
                buffer.enqueue("test");
            });
            if (i % 353 == 0) {
                Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(50));
            }
        }

        executor.shutdown();
        boolean finished = executor.awaitTermination(30, TimeUnit.SECONDS);
        System.out.println(finished);
        buffer.manuallyDoTrigger();
        System.out.printf("enqueued: %d\n", enqueueCount.get());
        System.out.printf("handled: %d + %d = %d\n", consumeCount.get(), rejectCount.get(), consumeCount.get() + rejectCount.get());
    }

    public static void main(String[] args) throws InterruptedException {
        TestTrigger test = new TestTrigger();
        test.test();
    }
}

结果是:

true
enqueued: 1000000
handled: 150023 + 849973 = 999996

lionheartdong avatar Jul 29 '22 07:07 lionheartdong

SimpleBufferTrigger#enqueue(138行)

    @Override
    public void enqueue(E element) {
        checkState(!shutdown, "buffer trigger was shutdown.");

        long currentCount = counter.get();
        long thisMaxBufferCount = maxBufferCount.getAsLong();
        if (thisMaxBufferCount > 0 && currentCount >= thisMaxBufferCount) {
           //原因是这里初始的值是false,在多个线程竞争时,有个线程获取锁后,并没有被执行到拒绝策略,但状态没有变化,仍被返回了
           //初始化为true时,不再有问题
            boolean pass = false;
            if (rejectHandler != null) {
                if (writeLock != null && writeCondition != null) {
                    writeLock.lock(); // 这里采用 DCL,是为了避免部分消费情况下没有 signalAll 唤醒,导致的卡死问题
                    // 判断堵塞的条件也的确应该在锁块内保护,之前的代码在临界区(counter)的保护上是有缺陷的
                }
                try {
                    currentCount = counter.get();
                    thisMaxBufferCount = maxBufferCount.getAsLong();
                    if (thisMaxBufferCount > 0 && currentCount >= thisMaxBufferCount) {
                        pass = fireRejectHandler(element);
                    }
                } finally {
                    if (writeLock != null && writeCondition != null) {
                        writeLock.unlock();
                    }
                }
            }
            if (!pass) {
                return;
            }
        }

lionheartdong avatar Jul 29 '22 07:07 lionheartdong