hivemq-mqtt-client-dotnet

HiveMQTT subscriber dies, never recovers, after receiving large burst of messages

Open Aaronontheweb opened this issue 1 year ago • 5 comments

🐛 Bug Report

I'm working on an MQTT PoC that needs to process 30,000 packets per second per node, each 3-7 KB (large-scale network), with QoS=0. I tried doing this with MQTTnet and ran into issues with it (https://github.com/dotnet/MQTTnet/issues/1962), so I thought I would give HiveMQ a try.

Here is what I was able to produce with the same sample I used for MQTTnet:

[INFO][04/10/2024 22:04:14.925Z][Thread 0004][akka://MyActorSystem/user/supervisor] RabbitMQ supervisor started - MQTT reader and writer running
[INFO][04/10/2024 22:04:20.385Z][Thread 0029][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [0] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:04:20.401Z][Thread 0029][akka://MyActorSystem/user/supervisor/mqtt-reader] Subscription running
[INFO][04/10/2024 22:04:20.402Z][Thread 0029][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [0] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:04:24.950Z][Thread 0011][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [0] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:04:25.136Z][Thread 0019][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [0] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:04:29.949Z][Thread 0012][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [8164] messages from MQTT [1633.1718078937852 msg/s]
[INFO][04/10/2024 22:04:30.135Z][Thread 0022][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [9093] messages to Rabbit [1819.0071665641635 msg/s]
[INFO][04/10/2024 22:04:35.227Z][Thread 0077][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [4050.8370577319124 msg/s]
[INFO][04/10/2024 22:04:36.252Z][Thread 0012][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [25324] messages to Rabbit [2653.277448258188 msg/s]
[INFO][04/10/2024 22:04:39.946Z][Thread 0029][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:04:40.132Z][Thread 0044][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [1087.8009516041482 msg/s]
[INFO][04/10/2024 22:04:44.948Z][Thread 0078][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:04:45.135Z][Thread 0029][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:04:49.953Z][Thread 0015][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:04:50.138Z][Thread 0078][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:04:54.958Z][Thread 0015][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:04:55.127Z][Thread 0078][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:04:59.944Z][Thread 0078][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:05:00.130Z][Thread 0078][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:05:04.951Z][Thread 0015][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:05:05.137Z][Thread 0044][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:05:09.958Z][Thread 0044][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]
[INFO][04/10/2024 22:05:10.129Z][Thread 0044][akka://MyActorSystem/user/supervisor/rabbit-writer] Wrote [29544] messages to Rabbit [0 msg/s]
[INFO][04/10/2024 22:05:14.947Z][Thread 0078][akka://MyActorSystem/user/supervisor/mqtt-reader] Read [29544] messages from MQTT [0 msg/s]

Received ~30k messages (29,544 in the log above) and then the client dies: no disconnect message or anything else is received. The OnMessageReceived event stops firing, and according to EMQX the client is still alive but is no longer ACKing any of the published messages.
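Because the failure is silent (no DISCONNECT, no exception, and the broker still sees the session), it can help to detect it from the application side. A minimal sketch, assuming a hypothetical `StallWatchdog` class that is not part of HiveMQtt or Akka.NET: the message handler records the time of each message, and the periodic timer that prints the `Read [...]` lines asks whether the gap since the last message exceeds a threshold. Silence alone does not prove a stall (the publisher may simply be idle), so a real check would combine it with what the broker reports.

```csharp
using System;
using System.Threading;

// Hypothetical watchdog (not part of HiveMQtt or Akka.NET): remembers when the last
// message arrived so a supervisor can detect a receive path that has gone quiet.
public sealed class StallWatchdog
{
    private readonly TimeSpan _threshold;
    private long _lastReceivedTicks;

    public StallWatchdog(TimeSpan threshold, DateTime startUtc)
    {
        _threshold = threshold;
        _lastReceivedTicks = startUtc.Ticks;
    }

    // Call from the message handler for every received message.
    public void MarkReceived(DateTime nowUtc) =>
        Interlocked.Exchange(ref _lastReceivedTicks, nowUtc.Ticks);

    // Call periodically; true when no message has arrived for longer than the threshold.
    public bool IsStalled(DateTime nowUtc) =>
        nowUtc.Ticks - Interlocked.Read(ref _lastReceivedTicks) > _threshold.Ticks;
}
```

Timestamps are passed in rather than read inside the class so the logic can be tested deterministically; in production both call sites would pass `DateTime.UtcNow`, and a `true` result could trigger the same reconnect path as the disconnect handler.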

🔬 How To Reproduce

Steps to reproduce the behavior:

  1. ...

Code sample

Fairly simple client setup - we're just writing every message we receive to a ChannelWriter<MQTT5PublishMessage>. Those messages are picked up by Akka.NET's streaming library (Akka.Streams), which pipes them to RabbitMQ. Akka.Streams hasn't had any trouble keeping up.

private async Task ConfigureMqttClientAsync()
{
    var mqttClientOptions = new HiveMQClientOptionsBuilder()
        .WithClientId(_appSetting.MQTT_ClientId)
        .WithBroker(_appSetting.MQTT_Host)
        .WithPort(_appSetting.MQTT_Port)
        .WithUserName(_appSetting.MQTT_User)
        .WithPassword(_appSetting.MQTT_Password)
        .WithCleanStart(false)
        .WithMaximumPacketSize(7 * 1024)
        //.WithUseTls(true)
        .Build();

    _mqttClient = new HiveMQClient(mqttClientOptions);

    var qosSetting = _appSetting.MQTT_QoS switch
    {
        0 => QualityOfService.AtMostOnceDelivery,
        1 => QualityOfService.AtLeastOnceDelivery,
        2 => QualityOfService.ExactlyOnceDelivery,
        _ => QualityOfService.AtMostOnceDelivery // Handle invalid QoS setting (assuming default to QoS 0 in this example)
    };

    var topicFilter = new TopicFilter(_appSetting.MQTT_Topic, qosSetting);

    var mqttSubscribeOptions = new SubscribeOptionsBuilder()
        .WithSubscription(topicFilter)
        .Build();

    _mqttClient.OnMessageReceived += ManagedMqttClient_ApplicationMessageReceived2Async;
    _mqttClient.OnDisconnectReceived += ManagedMqttClient_DisconnectedAsync;
    _mqttClient.OnPublishReceived += (sender, args) =>
    {
        _self.Tell(ClientRecv.Instance);
    };

    var connectResult = await _mqttClient.ConnectAsync();
    await _mqttClient.SubscribeAsync(mqttSubscribeOptions);
    _log.Info("Subscription running");
}

private void ManagedMqttClient_DisconnectedAsync(object? sender, OnDisconnectReceivedEventArgs e)
{
    _log.Warning("MQTT client disconnected for reason: {0}", e.DisconnectPacket.DisconnectReasonCode);
    _mqttClient.Dispose(); // dispose the client
    _mqttClient.OnMessageReceived -= ManagedMqttClient_ApplicationMessageReceived2Async;
    _mqttClient.OnDisconnectReceived -= ManagedMqttClient_DisconnectedAsync;
    _self.Tell(ConfigureMqttClient.Instance);
}

private void ManagedMqttClient_ApplicationMessageReceived2Async(object? sender, OnMessageReceivedEventArgs e)
{
    try
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        // The ValueTask returned by WriteAsync is not awaited: `cts` is disposed when the
        // try block exits, and a faulted write never reaches the catch block below.
        _channelWriter.WriteAsync(e.PublishMessage, cts.Token);
        _self.Tell(Processed.Instance);
    }
    catch (Exception ex)
    {
        _log.Error(ex, "Error acknowledging message {0}", e.PublishMessage.CorrelationData);
    }
}
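The message handler above runs synchronously inside the client's event dispatch but calls `WriteAsync` without awaiting it. A minimal sketch of a synchronous alternative, assuming a hypothetical `ChannelForwarder<T>` helper (not part of HiveMQtt) built on `ChannelWriter<T>.TryWrite`, which completes immediately and reports success or failure as a `bool`. With `T` = `MQTT5PublishMessage`, the handler body would reduce to something like `_forwarder.Forward(e.PublishMessage)`.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;

// Hypothetical helper (not part of HiveMQtt): forwards messages from a synchronous
// event handler into a channel without leaving an un-awaited ValueTask behind.
public sealed class ChannelForwarder<T>
{
    private readonly ChannelWriter<T> _writer;
    private long _forwarded;
    private long _rejected;

    public ChannelForwarder(ChannelWriter<T> writer) =>
        _writer = writer ?? throw new ArgumentNullException(nameof(writer));

    public long Forwarded => Interlocked.Read(ref _forwarded);
    public long Rejected => Interlocked.Read(ref _rejected);

    // TryWrite completes synchronously: true if the item was enqueued, false if a bounded
    // channel in Wait mode is full or the channel has already been completed.
    public bool Forward(T message)
    {
        if (_writer.TryWrite(message))
        {
            Interlocked.Increment(ref _forwarded);
            return true;
        }

        // Count the rejection and let the caller decide (log, drop, alert).
        Interlocked.Increment(ref _rejected);
        return false;
    }
}
```

Rejecting instead of waiting fits QoS 0 (at-most-once) semantics, and the `Rejected` counter makes any loss visible in the periodic log. If messages must not be lost, awaiting `WriteAsync` on a bounded channel applies back-pressure instead, but from a `void` event handler that requires an `async void` method, whose exceptions the caller cannot catch.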

Environment

Windows, .NET 8, using an EMQX 5.5.1 broker running on Ubuntu WSL2

Screenshots

What I can see in the EMQX broker logs is that HiveMQTT fails to keep up not long after the load starts - notice the lack of PUBACK responses here:

2024-04-10T16:45:59.156931-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6712, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.156974-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6713, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.156993-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6714, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157038-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6715, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157064-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6716, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157083-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6717, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157101-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6718, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157120-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6719, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157136-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6720, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157156-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6721, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157176-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6722, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157195-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6723, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157212-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6724, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157253-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6725, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin
2024-04-10T16:45:59.157329-05:00 [MQTT] [email protected]:54600 msg: mqtt_packet_sent, packet: PUBLISH(Q1, R0, D0, Topic=dev/topic_1, PacketId=6726, Payload={"idRequest":"77444ea7-207f-4f4b-b2fe-4cff777e4efe","time":1660643404,"statusCode":"OK","errorMessag... The 1019 bytes of this log are truncated), username: admin

PUBACKs do eventually get sent back to the broker, but only after the message is no longer retained in the fixed-size buffer.

What I can't figure out is: why doesn't my OnMessageReceived handler get fired when this is happening?

📈 Expected behavior

I'd expect the MQTT client to keep up with the events being thrown at it - the project should really add some event processing benchmarks; that's a much more important measure than event publishing throughput IMHO.

Aaronontheweb, Apr 10 '24 22:04

Hello @Aaronontheweb, thanks for contributing to the HiveMQ community! We will respond as soon as possible.

github-actions[bot], Apr 10 '24 22:04

Hi @Aaronontheweb - I'm traveling today but will take a closer look at this first thing tomorrow. I agree on the benchmarks & event processing; they're planned for the next time I loop back to them.

pglombardo, Apr 11 '24 05:04

Hey @Aaronontheweb - The client should definitely not die - I'll aim to recreate this here locally and post back soon.

A couple of thoughts:

.WithMaximumPacketSize(7 * 1024)

This is unnecessary and probably only slows down/complicates the issue.
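One concrete way a `7 * 1024` limit can interfere: in MQTT 5 the Maximum Packet Size a client announces in CONNECT covers the whole packet, not just the payload, and a spec-compliant broker discards (rather than delivers) any packet that would exceed it. A sketch of the PUBLISH size arithmetic, assuming `WithMaximumPacketSize` maps to that CONNECT property, no PUBLISH properties, and the `dev/topic_1` topic (11 bytes) from the EMQX log; `Mqtt5PacketSize` is a hypothetical helper, not a HiveMQtt API.

```csharp
using System;

// Hypothetical helper: total size of an MQTT 5 PUBLISH packet
// (fixed header + variable header + payload).
public static class Mqtt5PacketSize
{
    // Bytes needed to encode a Variable Byte Integer (7 bits per byte, 1 to 4 bytes).
    public static int VarIntSize(int value)
    {
        if (value < 0 || value > 268_435_455) throw new ArgumentOutOfRangeException(nameof(value));
        if (value < 128) return 1;
        if (value < 16_384) return 2;
        if (value < 2_097_152) return 3;
        return 4;
    }

    public static int PublishPacketSize(int topicBytes, int payloadBytes, int qos, int propertiesLength = 0)
    {
        int variableHeader = 2 + topicBytes               // topic name: 2-byte length prefix + UTF-8 bytes
                           + (qos > 0 ? 2 : 0)            // packet identifier, only for QoS 1 and 2
                           + VarIntSize(propertiesLength) // property length
                           + propertiesLength;            // the properties themselves
        int remaining = variableHeader + payloadBytes;
        return 1 + VarIntSize(remaining) + remaining;     // first byte + remaining length + rest
    }
}
```

With QoS 1, a 7,000-byte payload gives a 7,019-byte packet, which fits under 7,168 bytes, while a 7 KiB (7,168-byte) payload gives 7,187 bytes, which does not. Messages at the top of a "3-7 KB" range can therefore cross the limit once headers are counted.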

Considering that you've tried both this library and MQTTnet, have you tried swapping brokers just to identify any difference in client behavior? That may provide some useful information.

If event processing seems like the issue, you could try merging the OnPublishReceived and OnMessageReceived handlers, since they are essentially the same event at different levels, but this would only be an efficiency change.

I'll post back here soon with updates.

pglombardo, Apr 12 '24 08:04

Hi @Aaronontheweb - have you made any progress on this issue?

I tested locally and haven't been able to reproduce this yet.

I wanted to get a baseline, so I used a tool to blast out QoS 1 messages endlessly.

With this client as a single subscriber, I processed roughly 10k msgs/sec, though with a HiveMQ v4 broker in the middle.

I recorded the session here:

https://asciinema.org/a/hGOgBiYMDWNgOu5uLse7M1wzf

With QoS 0 we peak at roughly 80k msgs/sec:

[Screenshot 2024-04-15 at 16:55:37]

In any case, I just wanted to update you. I have a couple more tests planned. Let me know if you've made any progress on your side.

pglombardo, Apr 15 '24 15:04

@pglombardo thanks! I'll retry this without the packet size setting

Aaronontheweb, Apr 15 '24 17:04

Hi @Aaronontheweb - have you had a chance to revisit this? With v0.18.1, we've added a bunch of health checks and back-pressure support that should resolve this issue. Let me know.
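Back-pressure here means a fast producer is slowed to the pace of a slower consumer instead of letting an unbounded backlog build up. A generic sketch with `System.Threading.Channels`, not HiveMQtt's implementation: a bounded channel in `BoundedChannelFullMode.Wait` mode makes `WriteAsync` wait while the channel is full, so the backlog never exceeds the capacity. `BackPressureDemo` is a hypothetical name used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo: a fast producer throttled by a bounded channel and a slow consumer.
public static class BackPressureDemo
{
    public static async Task<(List<int> Consumed, int MaxBacklog)> RunAsync(int items, int capacity)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // writers wait instead of dropping items
            SingleReader = true,
            SingleWriter = true
        });

        var consumed = new List<int>();
        int maxBacklog = 0;

        // Slow consumer: stands in for a downstream sink that lags the producer.
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                maxBacklog = Math.Max(maxBacklog, channel.Reader.Count); // items still queued
                consumed.Add(item);
                await Task.Delay(1);
            }
        });

        // Fast producer: each WriteAsync stays pending while the channel is full.
        for (int i = 0; i < items; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        channel.Writer.Complete();
        await consumer;
        return (consumed, maxBacklog);
    }
}
```

The trade-off is latency versus memory: the producer slows down, but the queue can never grow past `capacity` items.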

pglombardo, Jun 11 '24 10:06

Hi @pglombardo - I ended up writing my own MQTT library in C# https://github.com/petabridge/TurboMqtt

Aaronontheweb, Jun 12 '24 23:06

Excellent - that looks like a great client!

I'll close out this issue, but if you ever need anything else, don't hesitate - I'd be happy to help out.

pglombardo, Jun 13 '24 07:06

Thank you!

Aaronontheweb, Jun 14 '24 16:06