MQTTnet
Maximum msg/s throughput for a single subscriber can't stay above 10,000 msg/s
Verification
Using version 4.3.3.952 in Release mode on .NET 8 with logging disabled.
Describe the bug
I'm currently running a small PoC using Akka.NET and MQTTnet via an EMQX broker using the following settings:
_mqttClient = _mqttFactory.CreateMqttClient();
var mqttClientOptions = new MqttClientOptionsBuilder()
    .WithClientId(_appSetting.MQTT_ClientId)
    .WithTcpServer(_appSetting.MQTT_Host, _appSetting.MQTT_Port)
    .WithCredentials(_appSetting.MQTT_User, _appSetting.MQTT_Password)
    .WithRequestProblemInformation(true)
    .WithCleanSession()
    .WithoutPacketFragmentation()
    .Build();
var connectResult = await _mqttClient.ConnectAsync(mqttClientOptions);
if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
{
    // force a restart and retry
    _log.Error("Failed to connect to MQTT broker: {0}", connectResult.ResultCode);
    throw new InvalidOperationException("Failed to connect to MQTT broker");
}
var mqttSubscribeOptions = _mqttFactory.CreateSubscribeOptionsBuilder()
    .WithTopicFilter(
        f =>
        {
            f.WithTopic(_appSetting.MQTT_Topic);
            // Conditionally set QoS based on AppSetting.MQTT_QoS
            switch (_appSetting.MQTT_QoS)
            {
                case 0:
                    f.WithAtMostOnceQoS();
                    break;
                case 1:
                    f.WithAtLeastOnceQoS();
                    break;
                case 2:
                    f.WithExactlyOnceQoS();
                    break;
                default:
                    // Handle invalid QoS setting (assuming default to QoS 0 in this example)
                    f.WithAtMostOnceQoS();
                    break;
            }
        }).Build();
_mqttClient.ApplicationMessageReceivedAsync += ManagedMqttClient_ApplicationMessageReceivedAsync;
await _mqttClient.SubscribeAsync(mqttSubscribeOptions);
_mqttClient.DisconnectedAsync += ManagedMqttClient_DisconnectedAsync;
And the following event handler, which just writes to an unbounded ChannelWriter<T>:
private async Task ManagedMqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
    try
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await e.AcknowledgeAsync(cts.Token);
        await _channelWriter.WriteAsync(e.ApplicationMessage, cts.Token);
        _self.Tell(Processed.Instance);
    }
    catch (Exception ex)
    {
        _log.Error(ex, "Error acknowledging message {0}", e.PacketIdentifier);
    }
}
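For readers who want to reproduce the hand-off in isolation, here is a minimal sketch of an unbounded channel with a single background consumer, using only System.Threading.Channels. The `Message` record and the `ChannelHandOffSketch` class are hypothetical stand-ins (the issue's code passes MQTTnet's `e.ApplicationMessage` instead), and the consumer only counts messages, so treat this as an illustration of the pattern rather than the reporter's actual setup.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the MQTT message type so the sketch runs without MQTTnet.
public sealed record Message(string Topic, byte[] Payload);

public static class ChannelHandOffSketch
{
    // Writes messageCount messages into an unbounded channel and returns
    // how many the background consumer drained.
    public static async Task<int> RunAsync(int messageCount)
    {
        // SingleReader = true tells the channel there is only one consumer.
        var channel = Channel.CreateUnbounded<Message>(new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = false
        });

        // Consumer: drains the channel on its own task, off the receive path.
        var consumer = Task.Run(async () =>
        {
            var processed = 0;
            await foreach (var message in channel.Reader.ReadAllAsync())
            {
                processed++; // real processing would go here
            }
            return processed;
        });

        // Producer: what a receive handler would do for each incoming message.
        for (var i = 0; i < messageCount; i++)
        {
            // TryWrite succeeds on an unbounded channel unless it has been completed,
            // so the producer never has to await.
            if (!channel.Writer.TryWrite(new Message("demo/topic", Array.Empty<byte>())))
            {
                throw new InvalidOperationException("Channel was completed.");
            }
        }

        // Signal the consumer that no more messages are coming.
        channel.Writer.Complete();
        return await consumer;
    }
}
```

Calling `await ChannelHandOffSketch.RunAsync(100_000)` exercises the channel alone, which can help separate channel overhead from anything happening inside the MQTT client.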
I'm seeing MQTTnet fail to keep up with my workload starting around 10,000 msg/s - and gradually it will fail as the queue gets too large on EMQX (the broker will automatically disconnect the client.)
Even if I change my ManagedMqttClient_ApplicationMessageReceivedAsync to just return a Task.CompletedTask and not do any actual processing, I still see this issue: MQTTnet can't keep up with the rate of messages coming from the broker.
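One way to put a number on the no-op case described above is to count messages in the handler and sample the rate periodically. The `ThroughputCounter` class below is a hypothetical helper, not part of MQTTnet or of the reporter's code; it uses only `Interlocked` and `Stopwatch` from the base class library.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: counts received messages and reports a rate.
public sealed class ThroughputCounter
{
    private long _count;
    private long _lastCount;
    private long _lastTimestamp = Stopwatch.GetTimestamp();

    // Intended to be called once per received message; does no other work.
    public Task OnMessageAsync()
    {
        Interlocked.Increment(ref _count);
        return Task.CompletedTask;
    }

    // Total number of messages counted so far.
    public long Total => Interlocked.Read(ref _count);

    // Returns messages per second since the previous call to Sample.
    // Call this from a single thread (for example, a periodic timer).
    public double Sample()
    {
        var now = Stopwatch.GetTimestamp();
        var count = Interlocked.Read(ref _count);
        var elapsedSeconds = (now - _lastTimestamp) / (double)Stopwatch.Frequency;
        var rate = elapsedSeconds > 0 ? (count - _lastCount) / elapsedSeconds : 0;
        _lastCount = count;
        _lastTimestamp = now;
        return rate;
    }
}
```

Assuming the event shape shown earlier in this issue, the counter could be attached with something like `_mqttClient.ApplicationMessageReceivedAsync += _ => counter.OnMessageAsync();` and `counter.Sample()` logged once per second.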
For the project I'm working on each subscriber in an MQTT subscriber group will need to be able to process ~30,000 msg/s - is that possible with this library?
Which component is your bug related to?
- Client
- ManagedClient
Tried with both the Client and the ManagedClient but it didn't seem to make any difference.
Expected behavior
I'd expect MQTTnet to keep up beyond 10k msg/s.
Additional context / logging
These are some of the logs from EMQX - it just gives up on the client because the input queue gets too long:
2024-04-04T13:28:24.495093-05:00 [SOCKET] [email protected]:62157 msg: emqx_connection_terminated, reason: {shutdown,#{max => 1000,reason => message_queue_too_long,value => 13381}}, username: admin
This ends up creating a snowballing effect where the client is never really able to keep up.
I increased some of EMQX's broker settings to allow up to 10,000 inflight messages at any given time and I increased the queue size to 1 million messages per MQTT topic. Still seeing throughput top out around 10k msg/s even after adjusting that.
Can you profile your client side where the most time in the code is spent? How big are the messages that you are receiving from your EMQX instance? Is the EMQX broker on the same network as your system or in the web? Is any other MQTT client able to keep up with your demand on your system?
Hi @Aaronontheweb, we had the same issue with MQTTnet, but our payloads were much larger. At one point we gave up on the MQTTnet library altogether, but then we came across https://github.com/dotnet/MQTTnet/issues/1160. Your payload size is only 10KB, but from memory, when I looked into the MQTTnet code, the default buffer for the TCP server was much smaller. I would suggest increasing the buffer size as described in the linked issue to see if it helps.
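As a rough illustration of the buffer suggestion above, the following fragment shows one way the TCP buffer might be raised on the client options. It assumes MQTTnet 4.x, where the built options are understood to expose a `ChannelOptions` property that is a `MqttClientTcpOptions` with a settable `BufferSize` for TCP connections. That API shape is recalled from memory and should be checked against the installed version; the host, port, and 1 MB value are placeholders.

```csharp
using MQTTnet.Client;

// Assumption: MQTTnet 4.x API shape; verify against the version you have installed.
var options = new MqttClientOptionsBuilder()
    .WithTcpServer("localhost", 1883) // placeholder host and port
    .Build();

// For TCP connections the channel options are expected to be MqttClientTcpOptions.
if (options.ChannelOptions is MqttClientTcpOptions tcpOptions)
{
    // Placeholder value: 1 MB instead of the library default.
    tcpOptions.BufferSize = 1024 * 1024;
}
```

The resulting `options` object would then be passed to `ConnectAsync` in the same way as in the original report.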
Great advice - thank you
@bajinder this did not help me - I pumped the receive buffer up from the default of 8 KB to 1 MB, and throughput still petered out around the same point.
Tried doing some work using the LowLevelClient as well - the numbers look a tad better there but I haven't finished reasoning about all of the fault-tolerance that needs to be done there to keep the connection relatively stable.