AsyncLock.WaitAsync gets into a deadlock
Describe the bug
When multiple threads are publishing and there are communication issues, the PublishAsync method locks up. Specifically, it hangs in AsyncLock.WaitAsync on the line containing task.ContinueWith((_, state) => (IDisposable)state.
Which component is your bug related to?
- Client
To Reproduce
Steps to reproduce the behavior:
- Using this version of MQTTnet '4.1.0.247'.
- Run the below example code (make sure you change the MQTT Broker Info):
- Using clumsy 0.3 (https://jagt.github.io/clumsy/download), apply the following settings and run for a few minutes until the issue occurs: set Filter to tcp and (tcp.DstPort == 1883 or tcp.SrcPort == 1883), check Lag and set it to 100 ms, and check Drop and set it to 20%.
- I usually see an exception like this first: MQTTnet.Exceptions.MqttCommunicationException: The TCP connection is closed (raised in MQTTnet.Implementations.MqttTcpChannel+<WriteAsync>d__21). After that, I monitor the threads to check that they are still running. When I notice that a thread has stopped, I use Parallel Stacks to see where it is stuck.
Expected behavior
PublishAsync should not block indefinitely.
Code example
//full program
using MQTTnet;
using MQTTnet.Client;

Console.WriteLine("MQTT TCP Port Closed Issue");
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
mqttClient.DisconnectedAsync += MqttClientDisconnectedAsync;

Task MqttClientDisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
    Console.WriteLine(arg.Exception);
    return Task.CompletedTask;
}

var options = new MqttClientOptionsBuilder()
    .WithClientId("MqttHoser")
    .WithTcpServer("192.168.68.100")
    .WithCleanSession()
    .Build();
var result = await mqttClient.ConnectAsync(options, CancellationToken.None);

_ = Task.Run(async () =>
{
    while (true)
    {
        try
        {
            if (!mqttClient.IsConnected)
            {
                Console.WriteLine("Reconnecting");
                await mqttClient.ConnectAsync(options, CancellationToken.None);
            }
        }
        catch (Exception)
        {
            //ignore
        }
        await Task.Delay(2000);
    }
});

TaskScheduler.UnobservedTaskException += (s, e) => { e.SetObserved(); };

for (var j = 0; j < 3; j++)
{
    var thisThread = j;
    _ = Task.Run(async () =>
    {
        var message = new MqttApplicationMessageBuilder()
            .WithTopic($"spBv1.0/cpo01tst/NCMD/{thisThread}")
            .WithPayload("TestMe")
            .Build();
        while (true)
        {
            for (var i = 0; i < 10; i++)
            {
                try
                {
                    await mqttClient.PublishAsync(message, CancellationToken.None);
                }
                catch (Exception e)
                {
                }
            }
            await Task.Delay(100);
            Console.WriteLine($"Thread still running {thisThread}");
        }
    });
}

Console.ReadLine();
Does using ConfigureAwait(false) after PublishAsync make any difference?
Here is a unit test that demonstrates the issue. What I think is happening is that when the connection drops, the AsyncLock is disposed while the publish threads are still using it. This causes the waiting task to become stuck:
[TestMethod]
public void Lock_10_Parallel_Tasks_With_Dispose_Doesnt_Lockup()
{
    const int ThreadsCount = 10;
    var threads = new Task[ThreadsCount];
    var @lock = new AsyncLock();
    var globalI = 0;
    for (var i = 0; i < ThreadsCount; i++)
    {
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
        threads[i] = Task.Run(
            async () =>
            {
                using (var releaser = await @lock.WaitAsync(CancellationToken.None))
                {
                    var localI = globalI;
                    await Task.Delay(10); // Increase the chance for wrong data.
                    localI++;
                    globalI = localI;
                }
            }).ContinueWith(
            x =>
            {
                if (globalI == 5)
                {
                    @lock.Dispose();
                    @lock = new AsyncLock();
                }
            });
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
    }
    Task.WaitAll(threads);
    Assert.AreEqual(ThreadsCount, globalI);
}
@logicaloud
"Does using ConfigureAwait(false) after PublishAsync make any difference?"
I tried this, but it made no difference.
So it looks like the problem is that the SemaphoreSlim is disposed while the task created in AsyncLock.WaitAsync is still using it. Once the semaphore is disposed, the task is not able to complete and the system hangs.
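To look at that failure mode in isolation, here is a minimal sketch that uses SemaphoreSlim directly rather than MQTTnet's AsyncLock. The scenario is an assumption about what happens on the disconnect path, not a confirmed trace: one caller holds the semaphore, a second caller is queued, and the semaphore is disposed before the holder releases it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var semaphore = new SemaphoreSlim(1, 1);

// The first caller takes the only slot.
await semaphore.WaitAsync();

// A second caller queues up behind it (stands in for a concurrent PublishAsync).
Task pendingWaiter = semaphore.WaitAsync();

// Dispose while the waiter is still queued (stands in for the disconnect path).
semaphore.Dispose();

// The holder now tries to release a semaphore that no longer exists.
var releaseThrew = false;
try
{
    semaphore.Release();
}
catch (ObjectDisposedException)
{
    releaseThrew = true;
}

// Give the queued waiter one second to complete, then report its state.
var first = await Task.WhenAny(pendingWaiter, Task.Delay(TimeSpan.FromSeconds(1)));
var waiterStillPending = first != pendingWaiter;

Console.WriteLine($"Release threw ObjectDisposedException: {releaseThrew}");
Console.WriteLine($"Waiter still pending after 1 s: {waiterStillPending}");
```

If both lines report True on your runtime, that would match the hang seen here: the release fails, and nothing is left that could ever complete the queued waiter.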
I have not been able to reproduce the problem without the clumsy tool, i.e. by introducing some artificial delays in the publishing functions. Dispose on the semaphore is not thread safe and should only be called once the semaphore is no longer used elsewhere, so that would point to the code that calls Dispose before all threads are done with it.
Would you be able to modify the Releaser in AsyncLock.cs slightly and run it with your setup, to see whether this InvalidOperationException is hit in Dispose and, if so, from where?
sealed class Releaser : IDisposable
{
    readonly AsyncLock _lock;

    internal Releaser(AsyncLock @lock)
    {
        _lock = @lock;
    }

    public void Dispose()
    {
        lock (_lock._syncRoot)
        {
            if (_lock._semaphore == null)
            {
                throw new InvalidOperationException();
            }
        }
        _lock.Release();
    }
}
@logicaloud
I am hitting the exception

I did attempt the change below, with the idea that we wait until everything is done with the semaphore before disposing it. It seems to work somewhat better, but in my testing yesterday I thought I was still having some issues. With this change, my unit test above does pass.
Were you able to run the unit test above and see that the threads don't finish, or did it pass for you?
public void Dispose()
{
    _semaphore?.Wait();
    lock (_syncRoot)
    {
        _semaphore?.Dispose();
        _semaphore = null;
    }
}
After my change, it sometimes hangs on _semaphore?.Wait(), and I am unsure why.

Modifying the unit test to dispose multiple times shows the issue of getting stuck on the wait in Dispose:
[TestMethod]
public void Lock_10_Parallel_Tasks_With_Dispose_Doesnt_Lockup()
{
    const int ThreadsCount = 10;
    var threads = new Task[ThreadsCount];
    var @lock = new AsyncLock();
    var globalI = 0;
    for (var i = 0; i < ThreadsCount; i++)
    {
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
        threads[i] = Task.Run(
            async () =>
            {
                using (var releaser = await @lock.WaitAsync(CancellationToken.None))
                {
                    var localI = globalI;
                    await Task.Delay(10); // Increase the chance for wrong data.
                    localI++;
                    globalI = localI;
                }
            }).ContinueWith(
            x =>
            {
                if (globalI % 2 == 0)
                {
                    @lock.Dispose();
                    @lock = new AsyncLock();
                }
            });
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
    }
    Task.WaitAll(threads);
    Assert.AreEqual(ThreadsCount, globalI);
}
Thank you for the info; I think that's enough to identify the problem.
I can reproduce the deadlock with the unit tests, but this just demonstrates that Dispose should not be called on a semaphore that other threads are waiting on.
Here is what I think is happening. The stack trace shows that MqttTcpChannel.WriteAsync runs into the problem, which means that by the time WriteAsync wants to release the semaphore, the semaphore has already been disposed. The MqttTcpChannel is owned by the MqttChannelAdapter, which in turn owns the AsyncLock. The channel adapter is owned by the MqttClient, where it is called _adapter. When the client disconnects, it waits for all tasks to terminate (receiverTask, publishPacketReceiverTask and keepAliveTask), but it does not wait for any in-progress WriteAsync operations to complete. The client may therefore be disposed, which disposes the _adapter, which in turn disposes the semaphore, all before WriteAsync attempts to release the semaphore. Does that make sense?
I haven't had time yet to confirm the above scenario. If that is indeed the problem, then a remedy would be to wait for any pending SendPacketAsync (and, by inference, WriteAsync) to complete in MqttClient.DisconnectCoreAsync, after Task.WhenAll.... In that case an additional cancellation token in the channel adapter would be helpful so that WriteAsync can terminate quickly when the client disconnects (i.e. via CreateLinkedTokenSource). I have not given it too much thought at this stage; the suspected problem scenario would need to be confirmed first. @chkr1011, any thoughts?
PS: The MqttClient also disconnects the adapter which then closes the underlying stream; I assume that means the WriteAsync should terminate quickly after this without the need for additional cancellation tokens or CreateLinkedTokenSource.
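To make the suggested remedy concrete, here is a rough sketch of the ordering: cancel, wait for in-flight sends, and only then dispose the semaphore. Everything in it is hypothetical (HypotheticalSender, SendAsync, DisconnectAsync and the simulated write are illustration names, not MQTTnet code), and it assumes that no new sends are started once the disconnect has begun.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var sender = new HypotheticalSender();
var sends = new Task[3];
for (var i = 0; i < sends.Length; i++)
{
    sends[i] = sender.SendAsync(CancellationToken.None);
}

await sender.DisconnectAsync();

// Every send has finished (completed or canceled) before the lock was disposed.
foreach (var send in sends)
{
    Console.WriteLine(send.Status);
}

// Hypothetical stand-in for the channel adapter; not the MQTTnet implementation.
sealed class HypotheticalSender
{
    readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
    readonly CancellationTokenSource _disconnectCts = new CancellationTokenSource();
    readonly ConcurrentDictionary<Task, byte> _inFlight = new ConcurrentDictionary<Task, byte>();

    public Task SendAsync(CancellationToken callerToken)
    {
        var task = SendCoreAsync(callerToken);
        _inFlight.TryAdd(task, 0);
        // Forget the task once it has finished, whatever the outcome.
        _ = task.ContinueWith(t => { _inFlight.TryRemove(t, out _); }, TaskScheduler.Default);
        return task;
    }

    async Task SendCoreAsync(CancellationToken callerToken)
    {
        // The linked token lets a disconnect abort the wait and the write quickly.
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(callerToken, _disconnectCts.Token);
        await _writeLock.WaitAsync(linked.Token);
        try
        {
            await Task.Delay(50, linked.Token); // simulated network write
        }
        finally
        {
            _writeLock.Release(); // still safe: the lock is not disposed yet
        }
    }

    public async Task DisconnectAsync()
    {
        _disconnectCts.Cancel();
        try
        {
            await Task.WhenAll(_inFlight.Keys); // wait for every pending send
        }
        catch (OperationCanceledException)
        {
            // Canceled sends are expected during a disconnect.
        }
        _writeLock.Dispose(); // nobody holds or waits on the lock any more
        _disconnectCts.Dispose();
    }
}
```

The point of the sketch is only the order of operations in DisconnectAsync. A real fix would also have to reject or serialize sends that race with the disconnect, which this version does not attempt.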
I played around with the unit test and had no luck fixing it, so I decided to write a new async lock from scratch that works similarly to the SemaphoreSlim internally. The new implementation works well, but I still have to investigate performance and memory consumption.
The branch is Improve-AsyncLock. It contains three implementations of the AsyncLock: the new one (AsyncLock) and the old ones (AsyncLockOld, AsyncLockOldFixed). I tried to fix the issue with an additional cancellation token, but it did not work.
I will go ahead with performance tests etc. Please share your thoughts about the implementation.
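For readers who want a feel for what a lock "written from scratch" can look like, here is a minimal sketch. It is not the code from the Improve-AsyncLock branch; SketchAsyncLock and EnterAsync are hypothetical names, cancellation is left out, and the design (a queue of TaskCompletionSource waiters that are failed on Dispose instead of being left pending) is just one plausible approach.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var asyncLock = new SketchAsyncLock();
var counter = 0;
var workers = new Task[10];
for (var i = 0; i < workers.Length; i++)
{
    workers[i] = Task.Run(async () =>
    {
        using (await asyncLock.EnterAsync())
        {
            var local = counter;
            await Task.Delay(10); // widen the race window
            counter = local + 1;
        }
    });
}
await Task.WhenAll(workers);
Console.WriteLine(counter);

// Hypothetical sketch; not the MQTTnet AsyncLock.
sealed class SketchAsyncLock : IDisposable
{
    readonly object _syncRoot = new object();
    readonly Queue<TaskCompletionSource<IDisposable>> _waiters = new Queue<TaskCompletionSource<IDisposable>>();
    readonly IDisposable _releaser;
    bool _isHeld;
    bool _isDisposed;

    public SketchAsyncLock() => _releaser = new Releaser(this);

    public Task<IDisposable> EnterAsync()
    {
        lock (_syncRoot)
        {
            if (_isDisposed) throw new ObjectDisposedException(nameof(SketchAsyncLock));
            if (!_isHeld)
            {
                _isHeld = true; // uncontended: hand out the releaser right away
                return Task.FromResult(_releaser);
            }
            var waiter = new TaskCompletionSource<IDisposable>(TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }

    void Release()
    {
        lock (_syncRoot)
        {
            if (_waiters.Count > 0)
                _waiters.Dequeue().SetResult(_releaser); // pass ownership to the next waiter
            else
                _isHeld = false;
        }
    }

    public void Dispose()
    {
        lock (_syncRoot)
        {
            _isDisposed = true;
            // Fail queued waiters instead of leaving them pending forever.
            while (_waiters.Count > 0)
                _waiters.Dequeue().TrySetException(new ObjectDisposedException(nameof(SketchAsyncLock)));
            _isHeld = false;
        }
    }

    sealed class Releaser : IDisposable
    {
        readonly SketchAsyncLock _owner;
        public Releaser(SketchAsyncLock owner) => _owner = owner;
        public void Dispose() => _owner.Release();
    }
}
```

If mutual exclusion holds, the counter printed at the end should equal the number of workers. Because Release in this sketch never throws after Dispose, a late releaser cannot hit the kind of failure discussed earlier in this thread.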
@chkr1011 thank you for looking at this. It has been bugging me for about a week now. We are seeing this issue in production. I checked out your branch and it is passing my real world torture test. The implementation is impressive; I don't have any feedback on it.
@marktoddgea I optimized the performance and memory usage a lot, so the new implementation is now a little better in memory and CPU usage than the original implementation (the one with the deadlock). See benchmark:

Please test this version again and let me know if you experience performance issues etc. If not, I will consider adding this fix to the next release.
For what it's worth, I have been unable to deterministically produce any scenario that would cause an issue with the old AsyncLock, i.e. a wait on a disposed semaphore, so I am still not sure what caused the original issue. Thanks for finding a solution, @chkr1011, good to have this fixed!
@chkr1011 I pulled the latest on the branch and everything seems to be working well. Thank you!
The changes are merged and will be part of the next release.