NullReferenceException in AcknowledgeReceivedPublishPacket
Describe the bug
Sometimes a NullReferenceException is thrown in the AcknowledgeReceivedPublishPacket method.
Which component is your bug related to?
- Client
To Reproduce
I'm not sure how to reproduce it and the issue is also kinda difficult to debug (it only ever happened on production...of course 🙄).
Basically, AcknowledgeReceivedPublishPacket sometimes (very rarely) throws a NullReferenceException.
Expected behavior
The AcknowledgeAsync method should never fail (except for a communication issue with the broker).
Additional context / logging
We are using EMQX as the broker with QoS=1, and we use shared subscriptions.
This is the stack trace:
System.NullReferenceException: Object reference not set to an instance of an object.
   at MQTTnet.Client.MqttClient.AcknowledgeReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs eventArgs, CancellationToken cancellationToken)
   at Silverback.Messaging.Broker.Mqtt.ConsumerChannelManager.HandleMessageAsync(ConsumedApplicationMessage consumedMessage)
   at Silverback.Messaging.Broker.Mqtt.ConsumerChannelManager.<>c__DisplayClass22_0.<<ReadChannelOnceAsync>b__0>d.MoveNext()
Code example
What I'm doing is pretty simple:
// In the message handler I simply wrap the event args and write them into a queue
public async Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
{
    var receivedMessage = new ConsumedApplicationMessage(eventArgs);

    eventArgs.AutoAcknowledge = false;
    await _channel.Writer.WriteAsync(receivedMessage).ConfigureAwait(false);
}

// After I process the message from the queue I call the `AcknowledgeAsync` method
private async Task HandleMessageAsync(ConsumedApplicationMessage consumedMessage)
{
    // Retry locally until successfully processed (or skipped)
    while (!_readCancellationTokenSource.Token.IsCancellationRequested)
    {
        await _mqttClientWrapper.HandleMessageAsync(consumedMessage).ConfigureAwait(false);

        if (await consumedMessage.TaskCompletionSource.Task.ConfigureAwait(false))
        {
            await consumedMessage.EventArgs.AcknowledgeAsync(_readCancellationTokenSource.Token).ConfigureAwait(false);
            break;
        }

        consumedMessage.TaskCompletionSource = new TaskCompletionSource<bool>();
    }
}
I figured out that this could happen because the client got disconnected right after the message was received and before it could be acknowledged. If that's the case, I would suggest throwing a dedicated, specific exception. That would be more elegant and would make it easier to understand what's going on.
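In the meantime, a caller-side workaround could look roughly like the sketch below. It assumes the NullReferenceException really does come from the disconnect race described above, which I have not confirmed. `AcknowledgeFailedException` and `AcknowledgeGuard` are hypothetical names used for illustration; they are not part of MQTTnet.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical exception type (not part of MQTTnet): signals that the
// acknowledgement could not be sent, most likely because the client
// was disconnected in the meantime.
public sealed class AcknowledgeFailedException : Exception
{
    public AcknowledgeFailedException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}

public static class AcknowledgeGuard
{
    // Invokes the given acknowledge delegate and translates a
    // NullReferenceException into the dedicated exception above.
    // All other exceptions propagate unchanged.
    public static async Task AcknowledgeOrThrowAsync(
        Func<CancellationToken, Task> acknowledge,
        CancellationToken cancellationToken)
    {
        try
        {
            await acknowledge(cancellationToken).ConfigureAwait(false);
        }
        catch (NullReferenceException ex)
        {
            throw new AcknowledgeFailedException(
                "The message could not be acknowledged (assumed cause: the client was disconnected).",
                ex);
        }
    }
}
```

In HandleMessageAsync the acknowledge call would then become `await AcknowledgeGuard.AcknowledgeOrThrowAsync(token => consumedMessage.EventArgs.AcknowledgeAsync(token), _readCancellationTokenSource.Token).ConfigureAwait(false);`. This only makes the failure easier to recognize in the logs; it does not fix the underlying issue in the client.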
I had a similar experience with the RpcClient when I tried to make an RPC call and forgot to call connect first.
Reproducer
[TestMethod]
public async Task Test_Rpc_Client_Not_Connected()
{
    var factory = new MqttFactory();
    using (var client = factory.CreateMqttClient())
    {
        var rpcClient = factory.CreateMqttRpcClient(client);
        await rpcClient.ExecuteAsync("aaa", new byte[] {1, 2, 3}, MqttQualityOfServiceLevel.AtLeastOnce);
    }
}