rabbitmq-java-client
Occasional leak of Thread/Channel instances and threads blocking indefinitely
Describe the bug
We noticed that occasionally we have threads stuck forever, with a stack that points to the RabbitMQ channel class. The stack is below. Generally, there are several notes about this:
- The RPC timeout is applied only to the IO part of the processing, not to the enqueueAsyncRpc part. This makes it possible for a thread to wait forever for _activeRpc to clear, which in this case never happens; i.e. the code below can loop forever, disregarding the RPC timeout (see the sketch after this list for what a bounded wait could look like):

      while (this._activeRpc != null) {
          try {
              this._channelLockCondition.await();
          } catch (InterruptedException var7) {
              var2 = true;
          }
      }

  I guess the same RPC timeout note applies to any this._channelLock.lock() calls: a thread can also get stuck there forever.
- The code was recently refactored to use locks instead of synchronized sections. It's not clear how that is supposed to work in a multi-threaded environment (Java memory model) with members like _activeRpc: they are not declared volatile/atomic, so how would a memory barrier be enforced for multi-threaded access to the member?
- Also, the semantics of ChannelN.asyncCompletableRpc are async, so it should never block, but in fact it can and does block.
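To make the first point concrete, here is a minimal sketch of what a bounded version of that wait loop could look like. The class, field, and parameter names (e.g. rpcTimeoutMillis) mirror the decompiled snippet but are assumptions for illustration, not the library's actual implementation:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: a bounded version of the "wait until the previous RPC
// completes" loop. Names mirror the decompiled snippet above but are assumptions.
final class BoundedRpcEnqueue {

    private final ReentrantLock channelLock = new ReentrantLock();
    private final Condition channelLockCondition = channelLock.newCondition();
    private Object activeRpc; // guarded by channelLock

    void enqueueRpc(Object rpc, long rpcTimeoutMillis) throws TimeoutException {
        channelLock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMillis);
            boolean interrupted = false;
            while (activeRpc != null) {
                if (remainingNanos <= 0) {
                    // Give up instead of parking forever when the previous RPC never clears.
                    throw new TimeoutException("Timed out waiting for previous RPC to complete");
                }
                try {
                    // awaitNanos returns the remaining wait time, so the overall wait is bounded.
                    remainingNanos = channelLockCondition.awaitNanos(remainingNanos);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            activeRpc = rpc;
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        } finally {
            channelLock.unlock();
        }
    }
}
```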
========================== Threads get stuck in this state indefinitely ================
java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
    - parking to wait for <0x00000007a5c699d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:341)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block([email protected]/AbstractQueuedSynchronizer.java:506)
    at java.util.concurrent.ForkJoinPool.unmanagedBlock([email protected]/ForkJoinPool.java:3465)
    at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3436)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await([email protected]/AbstractQueuedSynchronizer.java:1623)
    at com.rabbitmq.client.impl.AMQChannel.doEnqueueRpc
    at com.rabbitmq.client.impl.AMQChannel.enqueueAsyncRpc
    at com.rabbitmq.client.impl.AMQChannel.quiescingAsyncRpc
    at com.rabbitmq.client.impl.AMQChannel.asyncRpc
    at com.rabbitmq.client.impl.AMQChannel.privateAsyncRpc
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingAsyncRpc
    at com.rabbitmq.client.impl.ChannelN.asyncCompletableRpc
    at reactor.rabbitmq.Sender.lambda$declareExchange$16
    ...
Reproduction steps
It's not clear what is triggering the condition.
Expected behavior
Any invocation should respect the configured RPC timeout.
Additional context
The code uses version 5.20.0 of the Java client.
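Until the client enforces the timeout on the enqueue step, one possible application-level mitigation (sketched here as an assumption, not guidance from the maintainers) is to run the blocking declaration on a separate executor and bound the caller's wait. Note that the worker thread may still stay blocked if the channel is wedged; this only keeps the calling thread from waiting forever:

```java
import java.util.concurrent.*;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

// Possible caller-side mitigation: bound the wait for a topology operation.
final class BoundedDeclare {

    private final ExecutorService executor = Executors.newCachedThreadPool();

    AMQP.Exchange.DeclareOk declareExchangeWithTimeout(Channel channel, String exchange,
                                                       long timeoutMillis) throws Exception {
        Future<AMQP.Exchange.DeclareOk> future =
                executor.submit(() -> channel.exchangeDeclare(exchange, "topic", true));
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // best effort; the underlying wait may not be interruptible
            throw e;
        }
    }
}
```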
Actually, ReentrantLock provides memory barrier semantics, so point #2 above should be OK then.
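For reference, the Java memory model guarantees that unlocking a ReentrantLock happens-before a subsequent lock of the same lock, so fields written only while holding the lock do not need to be volatile. A standalone illustration (not library code):

```java
import java.util.concurrent.locks.ReentrantLock;

// The unlock in writer() happens-before the lock in reader(), so 'value' is
// visible without volatile as long as every access holds the same lock.
final class LockVisibility {
    private final ReentrantLock lock = new ReentrantLock();
    private int value; // guarded by lock, not volatile

    void writer(int v) {
        lock.lock();
        try {
            value = v;
        } finally {
            lock.unlock();
        }
    }

    int reader() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }
}
```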
@a701440 the answer is very straightforward: channels are not meant to be shared between threads. Never for publishing, virtually never for synchronous operations that modify the topology (declaring queues, for example). Having multiple consumers on a shared channel can work fine if your consumers coordinate.
Now, _activeRpc and friends are not used on the publishing path, so I wouldn't object to making access to them safer. But in general, it sounds like you may be doing something that is explicitly prohibited in RabbitMQ clients, by design.
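For context, a common way applications follow this rule is channel confinement: share only the Connection (which is thread safe) and give each thread its own Channel. A rough sketch under that assumption, not an official pattern from this issue:

```java
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// Channel confinement: the Connection is shared, each thread lazily opens
// and keeps its own Channel, so no Channel is used from two threads.
final class PerThreadChannels implements AutoCloseable {

    private final Connection connection;
    private final ThreadLocal<Channel> channels = new ThreadLocal<>();

    PerThreadChannels(Connection connection) {
        this.connection = connection;
    }

    Channel channel() throws IOException {
        Channel ch = channels.get();
        if (ch == null || !ch.isOpen()) {
            ch = connection.createChannel();
            channels.set(ch);
        }
        return ch;
    }

    @Override
    public void close() throws IOException {
        connection.close(); // closing the connection closes all of its channels
    }
}
```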
If Channel(s) in the Java client cannot be used concurrently, this should be stated much more clearly in the API documentation. In that case the issue moves to the Reactor RabbitMQ library and its Sender class. It has a resource-management channel Mono, but does not seem to close the resource-management channel in any way, so the implementation "implies" that the same channel should be reused for all resource-management operations. I assume that the Sender/Receiver classes there are actually thread safe and meant to be shared.
@a701440 where specifically do you suggest that we put an extra warning? On the Channel#basicPublish method? You know all too well that users usually do not read documentation before diving head-first into writing code.
@a701440 I am also not against making the (internal) RPC part safer, since it should not affect the hot code paths of publishing and consumption.
Would you like to contribute a version that is safer, e.g. using the 2nd option listed above, a ReentrantLock?
I'll have to find out what the company policy is regarding contributing code to open source projects; I think I need explicit permission. Generally, even if the Channel is not thread safe, it would be nice to prevent the indefinite thread lock-up with the stack above by using the provided RPC timeout value.