
NullReferenceException in AcknowledgeReceivedPublishPacket

Open BEagle1984 opened this issue 2 years ago • 2 comments

Describe the bug

Sometimes a NullReferenceException is thrown in the AcknowledgeReceivedPublishPacket method.

Which component is your bug related to?

  • Client

To Reproduce

I'm not sure how to reproduce it and the issue is also kinda difficult to debug (it only ever happened on production...of course 🙄).

Basically, AcknowledgeReceivedPublishPacket sometimes (very rarely) throws a NullReferenceException.

Expected behavior

The AcknowledgeAsync method should never fail (except for a communication issue with the broker).

Additional context / logging

We are using EMQX as the broker, with QoS 1 and shared subscriptions.

This is the stack trace:

System.NullReferenceException: Object reference not set to an instance of an object.
   at MQTTnet.Client.MqttClient.AcknowledgeReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs eventArgs, CancellationToken cancellationToken)
   at Silverback.Messaging.Broker.Mqtt.ConsumerChannelManager.HandleMessageAsync(ConsumedApplicationMessage consumedMessage)
   at Silverback.Messaging.Broker.Mqtt.ConsumerChannelManager.<>c__DisplayClass22_0.<<ReadChannelOnceAsync>b__0>d.MoveNext()

Code example

What I'm doing is pretty simple:

// In the message handler I simply wrap the event args and write them into a queue
public async Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
{
    var receivedMessage = new ConsumedApplicationMessage(eventArgs);

    eventArgs.AutoAcknowledge = false;
    await _channel.Writer.WriteAsync(receivedMessage).ConfigureAwait(false);
}

// After I process the message from the queue I call the `AcknowledgeAsync` method
private async Task HandleMessageAsync(ConsumedApplicationMessage consumedMessage)
{
    // Retry locally until successfully processed (or skipped)
    while (!_readCancellationTokenSource.Token.IsCancellationRequested)
    {
        await _mqttClientWrapper.HandleMessageAsync(consumedMessage).ConfigureAwait(false);

        if (await consumedMessage.TaskCompletionSource.Task.ConfigureAwait(false))
        {
            await consumedMessage.EventArgs.AcknowledgeAsync(_readCancellationTokenSource.Token).ConfigureAwait(false);
            break;
        }

        consumedMessage.TaskCompletionSource = new TaskCompletionSource<bool>();
    }
}

BEagle1984, Jul 17 '23 07:07

I figured out that this could happen because the client got disconnected right after the message was received and before it could be acknowledged. If that's the case, I would suggest throwing a dedicated, specific exception. That would be more elegant and would make it easier to understand what's going on.
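Until something like that exists in the library, the idea can be approximated on the caller's side. The following is only a minimal sketch under stated assumptions: `AcknowledgeFailedException` and `SafeAcknowledge` are hypothetical names that are not part of MQTTnet, and the two delegates stand in for the real calls (for example `eventArgs.AcknowledgeAsync` and a connection check on the client) so that the snippet compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical exception type for illustration; it is NOT part of MQTTnet.
public sealed class AcknowledgeFailedException : Exception
{
    public AcknowledgeFailedException(string message)
        : base(message)
    {
    }

    public AcknowledgeFailedException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}

// Hypothetical helper for illustration; it is NOT part of MQTTnet.
public static class SafeAcknowledge
{
    // acknowledge: stands in for the real acknowledge call.
    // isConnected: stands in for a check of the client's connection state.
    public static async Task AcknowledgeAsync(
        Func<CancellationToken, Task> acknowledge,
        Func<bool> isConnected,
        CancellationToken cancellationToken)
    {
        if (!isConnected())
        {
            throw new AcknowledgeFailedException(
                "The client is disconnected; the message cannot be acknowledged.");
        }

        try
        {
            await acknowledge(cancellationToken).ConfigureAwait(false);
        }
        catch (NullReferenceException ex)
        {
            // The connection may still drop between the check above and the call,
            // so the raw exception is translated into the specific one as well.
            throw new AcknowledgeFailedException(
                "Acknowledging failed, possibly because the client got disconnected.", ex);
        }
    }
}
```

In the consumer loop from the first post, the acknowledge call could then be routed through `SafeAcknowledge.AcknowledgeAsync`, passing `consumedMessage.EventArgs.AcknowledgeAsync` as the first delegate. The connection check is an assumption about what the surrounding code has access to. Catching NullReferenceException is a workaround for the current behavior, not a recommended long-term pattern.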

BEagle1984, Jul 17 '23 11:07

I have a similar experience with the RpcClient when I make an RPC call and forget to connect the client first.

Reproducer

[TestMethod]
public async Task Test_Rpc_Client_Not_Connected()
{
    var factory = new MqttFactory();
    using (var client = factory.CreateMqttClient())
    {
        // Note: the client is intentionally never connected before the RPC call.
        var rpcClient = factory.CreateMqttRpcClient(client);
        await rpcClient.ExecuteAsync("aaa", new byte[] {1, 2, 3}, MqttQualityOfServiceLevel.AtLeastOnce);
    }
}


Temppus, Jul 19 '23 11:07