buffer-trigger
buffer-trigger copied to clipboard
SimpleBufferTrigger 在自定义了拒绝方法(rejectHandler)之后,在高并发的情况下,会出现部分元素既没有被加入到容器中、也没有触发拒绝回调的情况,即这些元素被静默丢失了。
可以稳定复现的场景是这样的:
/**
 * Reproduction harness for elements that are neither buffered nor rejected by a
 * {@code BufferTrigger} configured with a custom reject handler.
 *
 * <p>One million {@code enqueue} calls are issued from a 100-thread pool against a buffer
 * capped at 1000 elements. Every element is expected to end up either in the consumer
 * ({@link #doBatchReload}) or in the reject handler ({@link #onTaskRejected}), so after the
 * run {@code consumeCount + rejectCount} should equal {@code enqueueCount}.
 */
public class TestTrigger {

    /** Number of enqueue attempts started by producer tasks. */
    private final AtomicLong enqueueCount = new AtomicLong();
    /** Number of elements handed to the batch consumer. */
    private final AtomicLong consumeCount = new AtomicLong();
    /** Number of elements handed to the reject handler. */
    private final AtomicLong rejectCount = new AtomicLong();

    private final BufferTrigger<String> buffer = BufferTrigger.<String, Queue<String>>simple()
            .name("test-trigger")
            .setContainer(ConcurrentLinkedQueue::new, Queue::add)
            .maxBufferCount(1000)
            .interval(1, TimeUnit.SECONDS)
            .consumer(this::doBatchReload)
            .rejectHandler(this::onTaskRejected)
            .build();

    /**
     * Batch consumer: records how many elements were delivered, then blocks for one second.
     * NOTE(review): the sleep presumably keeps consumption slow so the buffer stays full and
     * the reject path is exercised - confirm against the original report.
     *
     * @param values the batch handed over by the buffer trigger
     */
    private void doBatchReload(Iterable<String> values) {
        consumeCount.addAndGet(Iterables.size(values));
        Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1000));
    }

    /**
     * Reject handler: counts one rejected element.
     *
     * @param value the rejected element (unused, only counted)
     */
    private void onTaskRejected(String value) {
        rejectCount.incrementAndGet();
    }

    /**
     * Runs the scenario and prints the accounting.
     *
     * @throws InterruptedException if interrupted while waiting for the producer pool to finish
     */
    private void test() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 1000000; i++) {
            // execute() instead of submit(): submit() captures any exception in a Future that
            // nobody inspects, so a failing enqueue would be indistinguishable from a lost
            // element. With execute() the failure reaches the uncaught-exception handler.
            executor.execute(() -> {
                enqueueCount.incrementAndGet();
                buffer.enqueue("test");
            });
            // Pause the producer loop for 50 ms on every 353rd iteration.
            if (i % 353 == 0) {
                Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(50));
            }
        }
        executor.shutdown();
        boolean finished = executor.awaitTermination(30, TimeUnit.SECONDS);
        System.out.println(finished);

        // NOTE(review): presumably flushes whatever is still buffered - confirm against the
        // BufferTrigger documentation.
        buffer.manuallyDoTrigger();

        // Read each counter once so the printed sum always matches the printed terms.
        long consumed = consumeCount.get();
        long rejected = rejectCount.get();
        System.out.printf("enqueued: %d\n", enqueueCount.get());
        System.out.printf("handled: %d + %d = %d\n", consumed, rejected, consumed + rejected);
    }

    public static void main(String[] args) throws InterruptedException {
        TestTrigger test = new TestTrigger();
        test.test();
    }
}
结果是(共入队 1000000 个元素,但被消费与被拒绝的元素之和只有 999996,有 4 个元素丢失):
true
enqueued: 1000000
handled: 150023 + 849973 = 999996
问题代码位于 SimpleBufferTrigger#enqueue(第 138 行):
@Override
public void enqueue(E element) {
checkState(!shutdown, "buffer trigger was shutdown.");
long currentCount = counter.get();
long thisMaxBufferCount = maxBufferCount.getAsLong();
if (thisMaxBufferCount > 0 && currentCount >= thisMaxBufferCount) {
//原因是这里初始的值是false,在多个线程竞争时,有个线程获取锁后,并没有被执行到拒绝策略,但状态没有变化,仍被返回了
//初始化为true时,不再有问题
boolean pass = false;
if (rejectHandler != null) {
if (writeLock != null && writeCondition != null) {
writeLock.lock(); // 这里采用 DCL,是为了避免部分消费情况下没有 signalAll 唤醒,导致的卡死问题
// 判断堵塞的条件也的确应该在锁块内保护,之前的代码在临界区(counter)的保护上是有缺陷的
}
try {
currentCount = counter.get();
thisMaxBufferCount = maxBufferCount.getAsLong();
if (thisMaxBufferCount > 0 && currentCount >= thisMaxBufferCount) {
pass = fireRejectHandler(element);
}
} finally {
if (writeLock != null && writeCondition != null) {
writeLock.unlock();
}
}
}
if (!pass) {
return;
}
}