MQTTnet 4.3.3.952 - ClientId and Payload mismatch under high load
Hi, I am using MQTTnet (v4.5.9.2) to create a server (broker). I have multiple clients connecting with different ClientIds (e.g. 0001, 0002, 0003, …). Each client sends a payload that contains its own ID.
Example:
ClientId = 0001 Topic = topic/msg payload = {0001, hello}
ClientId = 0002 Topic = topic/msg payload = {0002, hello}
Problem: When I increase the number of clients (e.g. 10 clients publishing continuously), I sometimes get a mismatch:
ClientId = 0002 Payload = {0003, hello}
It looks like an interrupt/overload issue in which the server receives the wrong combination of ClientId and payload.
Question: Why does this mismatch happen? How can I ensure that the payload always matches the ClientId?
Code:
public static async void ServerStart()
{
    var options = new MqttServerOptionsBuilder()
        .WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ipstring))
        .WithDefaultEndpoint()
        .WithDefaultEndpointPort(80)
        .WithConnectionBacklog(100);
    var server = new MqttFactory().CreateMqttServer(options.Build());
    server.ValidatingConnectionAsync += ValidatingConnection;
    server.ClientConnectedAsync += Server_ClientConnectedAsync;
    server.ClientDisconnectedAsync += Server_ClientDisconnectAsync;
    server.InterceptingPublishAsync += Server_InterceptingPublishAsync;
    await server.StartAsync(); // or server.StartAsync().GetAwaiter().GetResult();
}

static Task Server_ClientConnectedAsync(ClientConnectedEventArgs clientConnectedEventArgs)
{
    Console.WriteLine(clientConnectedEventArgs.ClientId + " Connected ");
    return Task.CompletedTask;
}

static Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
    Client_Message = "ClientId : " + arg.ClientId + " Topic : " + arg.ApplicationMessage?.Topic + " Payload : " + arg.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(arg.ApplicationMessage?.Payload) + time;
    Console.WriteLine(Client_Message + "\n");
}
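A side note on the handler as posted: in C#, + binds tighter than ==, and == binds tighter than ?:, so the Client_Message line is parsed as (("ClientId : " + ... + " Payload : " + payload) == null) ? null : Encoding.UTF8.GetString(payload) + time. The concatenated string is never null, so if the snippet ran exactly as posted, the printed line would contain only the decoded payload and time, never the ClientId. The sketch below reproduces this in plain C# without MQTTnet; PrecedenceDemo, FormatAsPosted, FormatFixed and the byte[] stand-in for the payload are hypothetical names used only for illustration.

```csharp
using System.Text;

public static class PrecedenceDemo
{
    // Same shape as the posted line: + binds tighter than ==, and == tighter than ?:,
    // so the whole concatenation is compared with null (never true) and only
    // the decoded payload plus the time suffix is returned.
    public static string? FormatAsPosted(string clientId, string topic, byte[] payload, string time)
    {
        return "ClientId : " + clientId + " Topic : " + topic + " Payload : " + payload == null
            ? null
            : Encoding.UTF8.GetString(payload) + time;
    }

    // Parenthesizing the conditional keeps the ClientId and topic in the line.
    public static string FormatFixed(string clientId, string topic, byte[]? payload, string time)
    {
        return "ClientId : " + clientId + " Topic : " + topic + " Payload : "
            + (payload == null ? "(null)" : Encoding.UTF8.GetString(payload)) + time;
    }
}
```

Parenthesizing the conditional (or building the line with string interpolation) keeps every part of the message in the output.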
Could you share the client-side code that produces the issue as a minimal example?
I had this once as well, but in my case I used parallel processing with the same (readonly) instance of the client factory...
I have tried the following test with the latest 4.x release and a .NET 8 console app calling InterceptTest.InterceptServerStart, but it never reported a mismatch. Any suggestions what else could be tried or how the test could be modified to produce the problem?
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Internal;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System.Net;
using System.Text;

namespace InterceptTest
{
    internal class InterceptTest
    {
        static int s_okCount = 0;
        static int s_NumClientsConnected = 0;

        public static void InterceptServerStart()
        {
            var options = new MqttServerOptionsBuilder()
                .WithDefaultEndpointBoundIPAddress(IPAddress.Parse("127.0.0.1"))
                .WithDefaultEndpoint()
                .WithDefaultEndpointPort(1883)
                .WithConnectionBacklog(100);
            var server = new MqttFactory().CreateMqttServer(options.Build());
            server.ValidatingConnectionAsync += Server_ValidatingConnectionAsync;
            server.ClientConnectedAsync += Server_ClientConnectedAsync;
            server.ClientDisconnectedAsync += Server_ClientDisconnectedAsync;
            server.InterceptingPublishAsync += Server_InterceptingPublishAsync;
            _ = Task.Run(async () =>
            {
                await server.StartAsync();
            });
            _ = RunClients();
            while (true)
            {
                Console.WriteLine();
                Console.WriteLine("Press enter to see counts");
                Console.ReadLine();
                Console.WriteLine("Clients connected: " + s_NumClientsConnected + ", OK count: " + s_okCount);
            }
        }

        private static Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
        {
            // Handlers can run concurrently for different clients, so update shared counters atomically.
            Interlocked.Decrement(ref s_NumClientsConnected);
            return CompletedTask.Instance;
        }

        private static Task Server_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
        {
            return CompletedTask.Instance;
        }

        static Task Server_ClientConnectedAsync(ClientConnectedEventArgs clientConnectedEventArgs)
        {
            Console.WriteLine(clientConnectedEventArgs.ClientId + " Connected ");
            Interlocked.Increment(ref s_NumClientsConnected);
            return CompletedTask.Instance;
        }

        static Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
        {
            var clientId = arg.ClientId;
            var topic = arg.ApplicationMessage?.Topic;
            var payload = arg.ApplicationMessage?.Payload == null ? "(null)" : Encoding.UTF8.GetString(arg.ApplicationMessage?.Payload);
            // Each client publishes its own ClientId as both topic and payload, so all three must match.
            if (clientId != topic || topic != payload)
            {
                var client_Message = "ClientId : " + arg.ClientId + " Topic : " + topic + " Payload : " + payload;
                Console.WriteLine(client_Message);
                // Stop
                Console.ReadLine();
            }
            else
            {
                // Atomic increment: this handler runs concurrently for the 10 client sessions.
                Interlocked.Increment(ref s_okCount);
            }
            return CompletedTask.Instance;
        }

        static async Task RunClients()
        {
            var tasks = new List<Task>();
            for (var i = 0; i < 10; i++)
            {
                tasks.Add(RunClient(i));
            }
            await Task.WhenAll(tasks.ToArray());
        }

        static async Task RunClient(int clientNo)
        {
            var logger = new MqttNetEventLogger();
            var factory = new MqttFactory(logger);
            var client = factory.CreateMqttClient();
            var clientOptions = new MqttClientOptions
            {
                ClientId = "client/" + clientNo,
                ChannelOptions = new MqttClientTcpOptions
                {
                    RemoteEndpoint = new DnsEndPoint("127.0.0.1", 1883)
                }
            };
            await client.ConnectAsync(clientOptions);
            while (true)
            {
                var topic = "client/" + clientNo;
                var applicationMessage = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(topic)
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
                    .Build();
                await client.PublishAsync(applicationMessage);
            }
        }
    }
}
The output was, for example:
Press enter to see counts
client/1 Connected
client/0 Connected
client/2 Connected
client/8 Connected
client/4 Connected
client/7 Connected
client/3 Connected
client/6 Connected
client/9 Connected
client/5 Connected
Clients connected: 10, OK count: 7057607
Press enter to see counts
Clients connected: 10, OK count: 50935762
Press enter to see counts
Clients connected: 10, OK count: 73605352
Press enter to see counts
To me it looks like an async/await usage issue.
The code posted above has neither an await nor a return Task.CompletedTask:
static Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
    Client_Message = "ClientId : " + arg.ClientId + " Topic : " + arg.ApplicationMessage?.Topic + " Payload : " + arg.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(arg.ApplicationMessage?.Payload) + time;
    Console.WriteLine(Client_Message + "\n");
}
In this case the code will run in parallel for multiple messages, and this may lead to accessing buffers that belong to other messages, etc.
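To make the buffer concern concrete, here is a generic sketch. It is not a description of how MQTTnet manages its buffers; it assumes a hypothetical receive path (BufferReuseDemo, Receive, s_buffer) that reuses one byte array for successive packets. Code that keeps only a view of that buffer (an ArraySegment<byte>) and decodes it later sees the next packet's bytes, while code that copies the bytes before deferring keeps its own payload. Both payloads have the same length here, so the stale view decodes to exactly the next packet's payload.

```csharp
using System;
using System.Text;

public static class BufferReuseDemo
{
    // Hypothetical receive path that reuses a single buffer for every packet.
    private static readonly byte[] s_buffer = new byte[64];

    // Writes the packet into the shared buffer and returns a view (not a copy) of it.
    private static ArraySegment<byte> Receive(string payload)
    {
        var count = Encoding.UTF8.GetBytes(payload, 0, payload.Length, s_buffer, 0);
        return new ArraySegment<byte>(s_buffer, 0, count);
    }

    // Keeps only the view and decodes it after the next packet has arrived.
    public static string DeferredWithoutCopy()
    {
        var first = Receive("{0002, hello}");
        Receive("{0003, hello}"); // overwrites the bytes that 'first' points at
        return Encoding.UTF8.GetString(first.Array!, first.Offset, first.Count);
    }

    // Copies the bytes before deferring, so later packets cannot change them.
    public static string DeferredWithCopy()
    {
        var first = Receive("{0002, hello}").ToArray();
        Receive("{0003, hello}");
        return Encoding.UTF8.GetString(first);
    }
}
```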
@logicaloud Please try your test again with the return CompletedTask.Instance; removed.
@chkr1011, the code does not compile without return CompletedTask.Instance;, so presumably it was a mistake in the code sample. The client-side construction of the messages would be the next thing to look at, but without the original client code and a working example that produces the issue, it remains a guessing game. I'd be happy to look at the client-side code if @Kirubhashini07 can provide it.
It seems like the concern is that the message payload on the server is expected to correspond to the string "ClientId = <clientId> Topic = topic/msg payload = {<clientId>, hello}".
However, the observed behavior differs from that expectation: is the problem that <clientId> is not consistent between ClientId = and payload =?
I agree that more information about the client would be helpful.
If there were a disconnect between the clientId and the payload on the server, it would be caused by asynchronous/divergent processing in the server code. I would normally expect a single packet to contain both the client id and the payload in this scenario and not involve any divergent behavior. Are there any places where a single packet or payload is being processed asynchronously?
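If a payload does need to be processed asynchronously, one way to keep the two values paired is to capture the ClientId, the topic and a private copy of the payload into a single immutable item before queuing it. The sketch below assumes plain .NET (System.Threading.Channels) rather than any MQTTnet API; PublishQueue and ReceivedMessage are hypothetical names, and Enqueue stands in for whatever an InterceptingPublishAsync handler would call with arg.ClientId, the topic and the payload bytes.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical work item: the ClientId, topic and a private copy of the payload travel together.
public sealed record ReceivedMessage(string ClientId, string Topic, byte[] Payload);

// Hypothetical hand-off queue between a fast publish handler and a slower background consumer.
public sealed class PublishQueue
{
    private readonly Channel<ReceivedMessage> _channel = Channel.CreateUnbounded<ReceivedMessage>();

    // Called from the handler: snapshot everything needed, copy the payload bytes, enqueue, return.
    public bool Enqueue(string clientId, string topic, ReadOnlySpan<byte> payload) =>
        _channel.Writer.TryWrite(new ReceivedMessage(clientId, topic, payload.ToArray()));

    // Signals that no more items will be written, so the consumer loop can finish.
    public void Complete() => _channel.Writer.Complete();

    // Background consumer: each log line is built from exactly one work item, in FIFO order.
    public async Task<List<string>> DrainAsync()
    {
        var lines = new List<string>();
        await foreach (var message in _channel.Reader.ReadAllAsync())
        {
            lines.Add("ClientId : " + message.ClientId + " Topic : " + message.Topic
                + " Payload : " + Encoding.UTF8.GetString(message.Payload));
        }
        return lines;
    }
}
```

Because each consumer line is built from one work item, a later message cannot change the ClientId or payload of an earlier one, however many clients publish concurrently.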