MQTTnet icon indicating copy to clipboard operation
MQTTnet copied to clipboard

There are delays in sending and receiving messages

Open bobbyliu58 opened this issue 3 years ago • 3 comments

There are delays in sending and receiving messages

At present, there are less than 4000 clients in the project. There will be a delay of about 20s when receiving 4000 / min and sending 2000 / min. I use mqtt.fx to do the subscription test to verify that mqttnet.server has a delay. Is it the delay caused by the sorting of queue processing messages that you mentioned earlier?

Is it because I upgraded the underlying layer from v3.0.14 to v3.0.15?

I'm still looking forward to your queue multithreading asynchronous processing, because we don't have to consider the sequence of messages

Do you have any good suggestions for the problems I have now.

The following are two times of sending and receiving. Normally, 1-2s is OK, but in the figure, it is about 20s

image

The following figure shows the overall delivery and delivery before and after delay

image

Here is my code for sending and receiving messages

`public override void SendMsg(string client, byte[] buffer, string topic = null, MqttQualityOfServiceLevel level = MqttQualityOfServiceLevel.AtMostOnce) { Task.Factory.StartNew(async () => { try { if (topic == null) { topic = $"{ _serviceParms.DefaultServiceTopic.TrimEnd('+').TrimEnd('/')}/{client}/"; } if (level == 0) { level = _serviceParms.ServiceLevel; } var message = new MqttApplicationMessageBuilder() .WithQualityOfServiceLevel(level) .WithTopic(topic) .WithPayload(buffer).Build(); await mqttServer.PublishAsync(message); Logger.Info("MqttServer", $"发送客户端:{client}", buffer.ToStringByHex()); //发送统计 var sendTotalKey = $"MqttServer_Send{DateTime.Now.ToString("yyyyMMddHHmm")}"; MsgTotal total = null;

                if (_cache.TryGetValue(sendTotalKey, out total))
                {
                    total.total++;
                    _cache.Set(sendTotalKey, total);
                }
                else
                {
                    total = new MsgTotal() { total = 1, time = DateTime.Now };
                    _cache.Set(sendTotalKey, total, new DateTimeOffset(DateTime.Now.AddMinutes(3)));
                    var oldTotal = new MsgTotal();
                    var oldSendTotalKey = $"MqttServer_Send_{DateTime.Now.AddMinutes(-1).ToString("yyyyMMddHHmm")}";

                    if (_cache.TryGetValue(oldSendTotalKey, out oldTotal))
                    {
                        Logger.Info("MqttServer", $"发送统计:{oldTotal.total}", oldTotal);
                        //Console.WriteLine($"发送统计:{oldTotal.total}-{oldTotal.time.ToString("yyyyMMddHHmm")}");
                    }
                }
            }
            catch (Exception ex)
            {
                Logger.Exception(ex, "MqttServer", $"发送");
            }

        });
    }

private void MqttServe_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) { try { if (string.IsNullOrEmpty(e.ClientId)) { return; } Logger.Info("MqttServer", $"接收客户端:{e.ClientId}", e.ApplicationMessage.Payload.ToStringByHex()); //缓存Client topic if (!string.IsNullOrEmpty(e.ClientId)) { if (!ClientList.ContainsKey(e.ClientId)) { ClientList.TryAdd(e.ClientId, new List { e.ApplicationMessage.Topic }); } else { var topicList = ClientList[e.ClientId] as List; if (topicList == null) { ClientList[e.ClientId] = new List { e.ApplicationMessage.Topic }; } else if (!topicList.Exists(i => i == e.ApplicationMessage.Topic)) { topicList.Add(e.ApplicationMessage.Topic); ClientList[e.ClientId] = topicList; } } }

            IoTService.ServiceInstance.ClientMsgQueue.Enqueue(new MsgObject { clientId = e.ClientId, msg = e.ApplicationMessage.Payload });

       
            Task.Factory.StartNew(() =>
            {
                //接收统计
                //接收统计
                var sendTotalKey = $"MqttServer_RecvMsg_{DateTime.Now.ToString("yyyyMMddHHmm")}";
                MsgTotal total = null;// new MsgTotal() { total = 1, time = DateTime.Now };

                if (_cache.TryGetValue(sendTotalKey, out total))
                {
                    total.total++;
                    _cache.Set(sendTotalKey, total);
                }
                else
                {
                    total = new MsgTotal() { total = 1, time = DateTime.Now };
                    _cache.Set(sendTotalKey, total, new DateTimeOffset(DateTime.Now.AddMinutes(3)));
                    var oldTotal = new MsgTotal();
                    var oldSendTotalKey = $"MqttServer_RecvMsg_{DateTime.Now.AddMinutes(-1).ToString("yyyyMMddHHmm")}";
                    if (_cache.TryGetValue(oldSendTotalKey, out oldTotal))
                    {
                        // Console.WriteLine($"接收统计:{oldTotal.total}-{oldTotal.time.ToString("yyyyMMddHHmm")}");
                        Logger.Info("MqttServer", $"接收统计:{oldTotal.total}", oldTotal);
                    }
                }
            });
        }
        catch (Exception ex)
        {
            Logger.Exception(ex, "MqttServer_ApplicationMessageReceived", $"客户端:{e.ClientId} 主题:{e.ApplicationMessage.Topic}");
        }
    }`

Previous feedback https://github.com/chkr1011/MQTTnet/issues/1046

bobbyliu58 avatar Apr 28 '21 11:04 bobbyliu58

Tonight again appeared this situation many times, suddenly occasionally slow, then may later soon, now feel as if I had not found.

bobbyliu58 avatar Apr 28 '21 14:04 bobbyliu58

All the server configuration upgrade, the current server performance is enough, but still there will be a delay, found that the CPU can regular fluctuations. The current server configuration: 8 u 16 g, 5 m bandwidth The image below for a day of CPU volatility,Basically every 15-20 minutes there will be a peak, then there will be a delay。 image

bobbyliu58 avatar May 03 '21 12:05 bobbyliu58

I have also encountered the same problem. Is there a better solution?

XuLinFei avatar Sep 24 '23 13:09 XuLinFei