MQTTnet icon indicating copy to clipboard operation
MQTTnet copied to clipboard

MQTTnet(2000 links) eating more and more memories until the system was crashed.

Open zmrbak opened this issue 4 months ago • 1 comments

using IotDevicesMock.Utils; using MQTTnet; using MQTTnet.Protocol; using System.Security.Authentication; using System.Text; using System.Text.Json; using System.Text.Json.Serialization;

namespace IotDevicesMock.Models { public class Device : IDisposable { [JsonIgnore] private static readonly ILogger logger = ServiceLocator.Provider.GetRequiredService<ILoggerFactory>().CreateLogger("Device"); [JsonIgnore] private static readonly IConfiguration configuration = ServiceLocator.Provider.GetRequiredService<IConfiguration>(); private static readonly Random _random = new Random(); private static readonly object _randomLock = new object();

    private bool _disposed = false;
    private Task _reportTask;
    private CancellationTokenSource _reportCts; // 独立控制报告任务的取消

    public string DeviceId { get; } = "T" + DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString().Substring(6) + Guid.NewGuid().ToString("N").Substring(0, 4).ToUpper();
    public DeviceStatus Status { get; set; }
    public SwitchStatus SwitchStatus { get; set; }
    public DateTime StartTime { get; set; }
    public long MessagesAcknowledged;
    public long MessagesSent;
    public int ConnectionErrors;
    [JsonIgnore]
    public CancellationToken cancellationToken { get; set; } = CancellationToken.None;
    [JsonIgnore]
    public MqttConfig MqttConfig { get; }
    [JsonIgnore]
    public IMqttClient MqttClient { get; private set; }
    public int RetryCount { get; set; } = -1;
    public int Random { get; } = GetRandomNumber(1000, 10000);

    public Device()
    {
        MqttConfig = configuration.GetSection("MqttConfig").Get<MqttConfig>() ?? throw new InvalidOperationException("MQTT configuration is missing");
    }

    public async Task StartMqttAsync(CancellationToken cancellationToken)
    {
        this.cancellationToken = cancellationToken;
        StartTime = DateTime.UtcNow;

        // 每次启动前先释放旧资源
        if (_reportCts != null)
        {
            _reportCts.Cancel();
            _reportCts.Dispose();
        }
        _reportCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        // 用循环处理重连,避免资源堆积
        while (!cancellationToken.IsCancellationRequested)
        {
            RetryCount++;
            try
            {
                // 释放旧客户端(关键优化)
                await DisposeMqttClient();

                var clientFactory = new MqttClientFactory();
                var client = clientFactory.CreateMqttClient();

                var options = new MqttClientOptionsBuilder()
                    .WithClientId(DeviceId)
                    .WithTcpServer(MqttConfig.BrokerAddress, MqttConfig.BrokerPort)
                    .WithCredentials(MqttConfig.UserName, MqttConfig.Password)
                    .WithWillTopic(MqttConfig.WillTopic.Replace("{id}", DeviceId).Replace("{num}", Random.ToString()))
                    .WithWillPayload(Encoding.UTF8.GetBytes("客户端离线"))
                    .WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                    .WithTimeout(TimeSpan.FromSeconds(50))
                    .WithCleanSession(MqttConfig.CleanSession);

                if (MqttConfig.UseTls)
                {
                    options.WithTlsOptions(tls =>
                    {
                        tls.UseTls(true);
                        tls.WithSslProtocols(SslProtocols.Tls12);
                        tls.WithAllowUntrustedCertificates();
                        tls.WithCertificateValidationHandler(_ => true);
                    });
                }

                // 注册事件(合并重复订阅,避免内存泄漏)
                client.ApplicationMessageReceivedAsync += OnApplicationMessageReceived;
                client.ConnectedAsync += OnConnectedAsync;
                client.DisconnectedAsync += OnDisconnectedAsync;
                MqttClient = client;

                // 随机延迟启动(使用静态Random实例)
                int delayMs = GetRandomNumber(1, (int)(1000 * MqttConfig.KeepAlivePeriodSeconds));
                await Task.Delay(delayMs, cancellationToken);

                var connectResult = await client.ConnectAsync(options.Build(), cancellationToken);
                if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
                {
                    await SubscribeTopicAsync();
                    // 启动状态报告任务(使用独立CancellationToken)
                    _reportTask = ReportStatusAsync(_reportCts.Token);
                    break; // 连接成功则退出循环
                }
                else
                {
                    logger.LogError("Client {DeviceId} failed to connect", DeviceId);
                    Interlocked.Increment(ref ConnectionErrors);
                }
            }
            catch (OperationCanceledException)
            {
                logger.LogInformation("MQTT 连接操作已取消");
                throw;
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "MQTT 连接失败(重试次数: {RetryCount})", RetryCount);
                await Task.Delay(5000, cancellationToken);
            }
        }
    }

    private async Task SubscribeTopicAsync()
    {
        if (!MqttClient.IsConnected)
        {
            logger.LogWarning($"订阅失败:MQTT客户端未连接");
            return;
        }

        string subTopic = MqttConfig.SubTopic.Replace("{num}", Random.ToString()).Replace("{id}", DeviceId);
        await MqttClient.SubscribeAsync(new MqttTopicFilterBuilder()
                    .WithTopic(subTopic)
                    .WithQualityOfServiceLevel(MqttConfig.SubTopicQualityOfService)
                    .Build());
        logger.LogInformation($"已订阅MQTT主题: {subTopic}(QoS: {MqttConfig.SubTopicQualityOfService})");
    }

    private async Task ReportStatusAsync(CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                var statusReport = new
                {
                    MessageId = "auto",
                    Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString(),
                    Status = SwitchStatus.ToString(),
                };

                // 减少日志序列化开销(仅在调试时输出详细日志)
                if (logger.IsEnabled(LogLevel.Information))
                {
                    logger.LogInformation("Device {DeviceId} status: {Status}", DeviceId, statusReport.Status);
                }

                // 仅在连接状态下执行发布
                if (MqttClient?.IsConnected == true)
                {
                    var payload = JsonSerializer.SerializeToUtf8Bytes(statusReport);
                    var message = new MqttApplicationMessageBuilder()
                        .WithTopic(MqttConfig.PubTopic.Replace("{id}", DeviceId).Replace("{num}", Random.ToString()))
                        .WithPayload(payload)
                        .WithQualityOfServiceLevel(MqttConfig.PubTopicQualityOfService)
                        .Build();

                    await MqttClient.PublishAsync(message, token);
                    Interlocked.Increment(ref MessagesSent);
                }
                else
                {
                    logger.LogWarning("Device {DeviceId} is not connected, cannot send status", DeviceId);
                    // 连接断开时触发重连(通过退出循环让外层逻辑处理)
                    break;
                }

                // 等待下一次报告(使用配置的周期)
                await Task.Delay(TimeSpan.FromSeconds(MqttConfig.KeepAlivePeriodSeconds), token);
            }

            // 若因连接问题退出,尝试重连
            if (!token.IsCancellationRequested)
            {
                await StartMqttAsync(token);
            }
        }
        catch (OperationCanceledException)
        {
            logger.LogInformation("Device {DeviceId} status report canceled", DeviceId);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Device {DeviceId} status report error", DeviceId);
            // 出错后延迟重连,避免频繁重试
            if (!token.IsCancellationRequested)
            {
                await Task.Delay(3000, token);
                await StartMqttAsync(token);
            }
        }
    }

    private Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs args)
    {
        logger.LogWarning("Device {DeviceId} disconnected: {Reason}", DeviceId, args.ReasonString);
        return Task.CompletedTask;
    }

    private Task OnConnectedAsync(MqttClientConnectedEventArgs args)
    {
        logger.LogInformation("Device {DeviceId} connected to MQTT server", DeviceId);
        return Task.CompletedTask;
    }

    private Task OnApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
    {
        Interlocked.Increment(ref MessagesAcknowledged);
        // 合并原有的两个消息处理逻辑,避免重复订阅
        string topic = e.ApplicationMessage.Topic;
        string payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
        logger.LogInformation("Device {DeviceId} received message: Topic={Topic}, Payload={Payload}", DeviceId, topic, payload);
        return Task.CompletedTask;
    }

    // 单独的MQTT客户端释放方法
    private async Task DisposeMqttClient()
    {
        if (MqttClient != null)
        {
            // 先取消事件订阅
            UnsubscribeEvents();

            // 断开连接(带超时保护)
            if (MqttClient.IsConnected)
            {
                try
                {
                    await MqttClient.DisconnectAsync(new MqttClientDisconnectOptions { ReasonString = "Normal disconnect" },
                        CancellationToken.None).WaitAsync(TimeSpan.FromSeconds(5));
                }
                catch (TimeoutException)
                {
                    logger.LogWarning("Device {DeviceId} disconnect timed out", DeviceId);
                }
            }

            // 释放客户端
            MqttClient.Dispose();
            MqttClient = null;
        }
    }

    private void UnsubscribeEvents()
    {
        if (MqttClient == null) return;

        // 移除所有事件订阅(必须与注册的方法一一对应)
        MqttClient.ApplicationMessageReceivedAsync -= OnApplicationMessageReceived;
        MqttClient.ConnectedAsync -= OnConnectedAsync;
        MqttClient.DisconnectedAsync -= OnDisconnectedAsync;
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (_disposed) return;

        if (disposing)
        {
            // 取消并释放报告任务
            if (_reportCts != null)
            {
                _reportCts.Cancel();
                _reportCts.Dispose();
                _reportCts = null;
            }

            // 等待报告任务结束(带超时)
            if (_reportTask != null && !_reportTask.IsCompleted)
            {
                try
                {
                    _reportTask.Wait(1000);
                }
                catch (AggregateException) { } // 忽略取消异常
                _reportTask = null;
            }

            // 释放MQTT客户端
            _ = DisposeMqttClient(); // 异步释放不阻塞Dispose
        }

        _disposed = true;
    }

    private static int GetRandomNumber(int min, int max)
    {
        lock (_randomLock)
        {
            return _random.Next(min, max);
        }
    }
}

}

zmrbak avatar Aug 10 '25 01:08 zmrbak

Is it possible that you track the memory consumption from dotmemory or Visual Studio? We need to identify which type of object is causing the memory leak.

In your looks I can see that you are allocating a lot of objects and data conversion. It may be the reason for the memory leak.

chkr1011 avatar Oct 20 '25 19:10 chkr1011