MQTTnet icon indicating copy to clipboard operation
MQTTnet copied to clipboard

Issue in MQTTnet version 4.3.6.1152

Open LeGi0N09 opened this issue 1 year ago • 1 comments

in my code working file by now when I use a local broker after changing to cloud base broker I can't save messages on the SQL Azure database to make some changes and I get an error any solution or recommend what database I can use for real-time data saving and broker recommended(cloud)

using Microsoft.Extensions.DependencyInjection;
using MQTTWebApp.Models;
using MQTTApp.Data;
using MQTTnet;
using MQTTnet.Client;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Timers;
using Microsoft.EntityFrameworkCore;

namespace MQTTWebApp.Services
{
    public class MqttClientService
    {
        private readonly IMqttClient _mqttClient;
        private readonly MqttClientOptions _mqttClientOptions;
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly System.Timers.Timer _reconnectTimer;

        public MqttClientService(IServiceScopeFactory scopeFactory)
        {
            _scopeFactory = scopeFactory;
            var factory = new MqttFactory();
            _mqttClient = factory.CreateMqttClient();

            _mqttClientOptions = new MqttClientOptionsBuilder()
                .WithTcpServer("broker.hivemq.com", 1883) // Replace with your MQTT broker details
                .WithClientId(Guid.NewGuid().ToString())
                .Build();

            _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e => OnConnected(e));
            _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e => OnDisconnected(e));
            _mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e => OnMessageReceived(e));

            _reconnectTimer = new System.Timers.Timer(5000);  // Try reconnecting every 5 seconds
            _reconnectTimer.Elapsed += async (sender, e) => await AttemptReconnectAsync();
        }

        public async Task ConnectAsync()
        {
            try
            {
                await _mqttClient.ConnectAsync(_mqttClientOptions);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error while connecting to MQTT broker: {ex.Message}");
                _reconnectTimer.Start();
            }
        }

        private async Task OnConnected(MqttClientConnectedEventArgs e)
        {
            Console.WriteLine("Connected successfully with MQTT Broker.");
            _reconnectTimer.Stop(); // Stop the reconnect timer when connected

            // Subscribe to all topics using the wildcard "#"
            await _mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build());
            Console.WriteLine("Subscribed to all topics.");
        }

        private Task OnDisconnected(MqttClientDisconnectedEventArgs e)
        {
            Console.WriteLine("Disconnected from MQTT Broker.");
            _reconnectTimer.Start(); // Start the reconnect timer when disconnected
            return Task.CompletedTask;
        }

        private async Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            try
            {
                Console.WriteLine("Message received");

                // Check the topic of the incoming message
                string topic = e.ApplicationMessage.Topic;
                string payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);

                Console.WriteLine($"Received Message: {payload} on Topic: {topic}");

                using (var scope = _scopeFactory.CreateScope())
                {
                    var context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
                    Console.WriteLine("Database context created");

                    // Save all messages to the database
                    var message = new MqttPublishViewModel
                    {
                        Topic = topic,
                        Message = payload,
                        PublishDateTime = DateTime.Now // Use DateTime.Now for local machine time
                    };

                    context.MqttPublish.Add(message);
                    Console.WriteLine("Message added to context");

                    // Implement retry policy for database operation
                    bool saveFailed;
                    int attempt = 0;
                    do
                    {
                        saveFailed = false;
                        try
                        {
                            await context.SaveChangesAsync();
                            Console.WriteLine("Message saved to database");
                        }
                        catch (DbUpdateConcurrencyException ex)
                        {
                            saveFailed = true;
                            attempt++;
                            Console.WriteLine($"Concurrency exception: {ex.Message}. Retry attempt {attempt}");
                            await Task.Delay(1000); // Delay before retrying
                        }
                        catch (DbUpdateException ex)
                        {
                            saveFailed = true;
                            attempt++;
                            Console.WriteLine($"DbUpdate exception: {ex.Message}. Retry attempt {attempt}");
                            await Task.Delay(1000); // Delay before retrying
                        }
                    } while (saveFailed && attempt < 3); // Retry a few times if failed

                    if (saveFailed)
                    {
                        Console.WriteLine("Failed to save message after retries. Logging error...");
                        // Log the error or handle the failure appropriately
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error while processing message: {ex.Message}");
                Console.WriteLine($"Stack Trace: {ex.StackTrace}");
            }
        }

        public async Task PublishAsync(string topic, string payload)
        {
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(payload)
                .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
                .WithRetainFlag()
                .Build();

            if (_mqttClient.IsConnected)
            {
                await _mqttClient.PublishAsync(message);
            }
            else
            {
                Console.WriteLine("MQTT Client is not connected. Message not published.");
            }
        }

        public async Task SubscribeAsync(string topic)
        {
            if (_mqttClient.IsConnected)
            {
                await _mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).Build());
            }
            else
            {
                Console.WriteLine("MQTT Client is not connected. Subscription failed.");
            }
        }

        public async Task UnsubscribeAsync(string topic)
        {
            if (_mqttClient.IsConnected)
            {
                await _mqttClient.UnsubscribeAsync(topic);
            }
            else
            {
                Console.WriteLine("MQTT Client is not connected. Unsubscription failed.");
            }
        }

        private async Task AttemptReconnectAsync()
        {
            if (!_mqttClient.IsConnected)
            {
                try
                {
                    await _mqttClient.ConnectAsync(_mqttClientOptions);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Reconnection attempt failed: {ex.Message}");
                }
            }
        }
    }
}

Error:

Severity Code Description Project File Line Suppression State Error (active) CS1061 'IMqttClient' does not contain a definition for 'ApplicationMessageReceivedHandler' and no accessible extension method 'ApplicationMessageReceivedHandler' accepting a first argument of type 'IMqttClient' could be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 34

Severity Code Description Project File Line Suppression State Error (active) CS1061 'IMqttClient' does not contain a definition for 'ConnectedHandler' and no accessible extension method 'ConnectedHandler' accepting a first argument of type 'IMqttClient' could be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 32 Severity Code Description Project File Line Suppression State Error (active) CS1061 'IMqttClient' does not contain a definition for 'DisconnectedHandler' and no accessible extension method 'DisconnectedHandler' accepting a first argument of type 'IMqttClient' could be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 33

Severity Code Description Project File Line Suppression State Error (active) CS0246 The type or namespace name 'MqttApplicationMessageReceivedHandlerDelegate' could not be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 34

Severity Code Description Project File Line Suppression State Error (active) CS0246 The type or namespace name 'MqttClientConnectedHandlerDelegate' could not be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 32

Severity Code Description Project File Line Suppression State Error (active) CS0246 The type or namespace name 'MqttClientDisconnectedHandlerDelegate' could not be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 33

LeGi0N09 avatar Jun 13 '24 06:06 LeGi0N09

From the events you use I can see that the version of the library is 3.x which is no longer supported. Please upgrade to the lastet version and check out the samples in the repo.

chkr1011 avatar Jun 15 '24 22:06 chkr1011