Bulkhead(Executor) does not always release permits
I believe the root cause is how the FutureLinkedList is used in BulkheadImpl.
When acquiring a permit from BulkheadImpl with a maxWaitTime, a completed future is returned if a permit is immediately available. Otherwise, a new future is added to the FutureLinkedList, and that future is removed from the list only when it is completed.
The returned future is completed in BulkheadImpl#releasePermit, so unless BulkheadImpl#releasePermit is called, the future is never removed from the FutureLinkedList. This causes a memory leak (the node is never removed from the list), and over time new permits cannot be issued.
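To make the mechanism concrete, here is a minimal, self-contained sketch of the pattern described above. It is a simplified model and not Failsafe's actual BulkheadImpl: the class LeakyBulkheadSketch and its methods tryAcquire, release and queuedWaiters are hypothetical names, and a plain ArrayDeque stands in for FutureLinkedList. It assumes that a released permit is handed to the first queued future, whether or not anyone is still waiting on it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified model of a permit queue. NOT Failsafe's BulkheadImpl.
final class LeakyBulkheadSketch {
  private final int maxPermits;
  private int permits;
  // Stand-in for FutureLinkedList: futures of callers waiting for a permit.
  private final Deque<CompletableFuture<Void>> waiters = new ArrayDeque<>();

  LeakyBulkheadSketch(int maxPermits) {
    this.maxPermits = maxPermits;
    this.permits = maxPermits;
  }

  // Returns a completed future if a permit is free, otherwise queues a waiter.
  private synchronized CompletableFuture<Void> acquireAsync() {
    if (permits > 0) {
      permits--;
      return CompletableFuture.completedFuture(null);
    }
    CompletableFuture<Void> waiter = new CompletableFuture<>();
    waiters.addLast(waiter);
    return waiter;
  }

  // Waits up to maxWaitMillis for a permit; returns false on timeout.
  boolean tryAcquire(long maxWaitMillis) {
    CompletableFuture<Void> future = acquireAsync();
    try {
      future.get(maxWaitMillis, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException | ExecutionException e) {
      // The leak: the timed-out waiter is left in the queue.
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Hands the permit to the first queued waiter, even an abandoned one.
  synchronized void release() {
    CompletableFuture<Void> waiter = waiters.pollFirst();
    if (waiter != null) {
      waiter.complete(null); // nobody may be listening, so the permit is lost
    } else if (permits < maxPermits) {
      permits++;
    }
  }

  synchronized int queuedWaiters() {
    return waiters.size();
  }

  public static void main(String[] args) {
    LeakyBulkheadSketch bulkhead = new LeakyBulkheadSketch(1);
    boolean first = bulkhead.tryAcquire(0);  // takes the only permit
    boolean second = bulkhead.tryAcquire(0); // times out, waiter stays queued
    bulkhead.release();                      // permit goes to the abandoned waiter
    boolean third = bulkhead.tryAcquire(0);  // fails although nothing is running
    System.out.println(first + " " + second + " " + third
        + " queued=" + bulkhead.queuedWaiters());
    // prints "true false false queued=1"
  }
}
```

In this model, the caller that timed out never calls release, so the permit handed to its abandoned future is never returned, and every later failed attempt queues another future. A fix along these lines would have to remove (or cancel) the queued future when the wait times out.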
This is a big problem when using a bulkhead in a FailsafeExecutor. Permits are acquired with a maxWaitTime in BulkheadExecutor#preExecute. If you submit more work to the executor than there are permits, the permit requests rejected with BulkheadFullException are never released, because in this case neither onSuccess nor onFailure is called in BulkheadExecutor. When that happens, the bulkhead enters an unrecoverable, broken state.
I wrote a small example showcasing how this bug can be triggered.
import dev.failsafe.Bulkhead;
import dev.failsafe.BulkheadFullException;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeExecutor;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class BulkheadLeakExample {
  public static void main(String[] args) {
    Bulkhead<Object> bulkhead = Bulkhead.builder(1) // Max concurrency of 1.
        .withMaxWaitTime(Duration.ZERO)
        .build();
    FailsafeExecutor<Object> failsafe = Failsafe.with(bulkhead);
    AtomicInteger numBulkheadFullErrors = new AtomicInteger();
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < 3; i++) { // Create 3 threads submitting work to the bulkhead
      threads.add(new Thread(() -> {
        for (int j = 0; j < 10; j++) {
          try {
            failsafe.get(() -> null); // Submit work to the bulkhead
          } catch (BulkheadFullException e) {
            numBulkheadFullErrors.incrementAndGet();
          }
        }
      }));
    }
    // Start and join the threads
    threads.forEach(Thread::start);
    threads.forEach(t -> {
      try {
        t.join();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    });
    // Wait for the executor to finish all work. 250ms is plenty of time to finish the work submitted above.
    try {
      Thread.sleep(250);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    // Expected behaviour: Program outputs "Success" with a few errors.
    // Actual behaviour: Program outputs "Error: BulkheadFullException" with many errors (29 on my local machine).
    try {
      System.out.println("Success: " + failsafe.get(() -> true) + ", total during looping: " + numBulkheadFullErrors.get());
    } catch (BulkheadFullException e) {
      System.out.println("Error: BulkheadFullException, total during looping: " + numBulkheadFullErrors.get());
    }
  }
}
Note that this bug can also be reproduced using the code from the documentation if a maxWaitTime is passed to tryAcquirePermit. Internally, tryAcquirePermit may create a future, and when the wait times out that future is never removed: tryAcquirePermit returned false, so the caller never calls releasePermit.
if (bulkhead.tryAcquirePermit()) {
  try {
    doSomething();
  } finally {
    bulkhead.releasePermit();
  }
}