MQTTnet icon indicating copy to clipboard operation
MQTTnet copied to clipboard

Issue in MQTTnet version 4.3.6.1152

Open LeGi0N09 opened this issue 8 months ago • 1 comments

in my code working file by now when I use a local broker after changing to cloud base broker I can't save messages on the SQL Azure database to make some changes and I get an error any solution or recommend what database I can use for real-time data saving and broker recommended(cloud)

using Microsoft.Extensions.DependencyInjection;
using MQTTWebApp.Models;
using MQTTApp.Data;
using MQTTnet;
using MQTTnet.Client;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Timers;
using Microsoft.EntityFrameworkCore;

namespace MQTTWebApp.Services
{
    public class MqttClientService
    {
        private readonly IMqttClient _mqttClient;
        private readonly MqttClientOptions _mqttClientOptions;
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly System.Timers.Timer _reconnectTimer;

        public MqttClientService(IServiceScopeFactory scopeFactory)
        {
            _scopeFactory = scopeFactory;
            var factory = new MqttFactory();
            _mqttClient = factory.CreateMqttClient();

            _mqttClientOptions = new MqttClientOptionsBuilder()
                .WithTcpServer("broker.hivemq.com", 1883) // Replace with your MQTT broker details
                .WithClientId(Guid.NewGuid().ToString())
                .Build();

            _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e => OnConnected(e));
            _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e => OnDisconnected(e));
            _mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e => OnMessageReceived(e));

            _reconnectTimer = new System.Timers.Timer(5000);  // Try reconnecting every 5 seconds
            _reconnectTimer.Elapsed += async (sender, e) => await AttemptReconnectAsync();
        }

        public async Task ConnectAsync()
        {
            try
            {
                await _mqttClient.ConnectAsync(_mqttClientOptions);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error while connecting to MQTT broker: {ex.Message}");
                _reconnectTimer.Start();
            }
        }

        private async Task OnConnected(MqttClientConnectedEventArgs e)
        {
            Console.WriteLine("Connected successfully with MQTT Broker.");
            _reconnectTimer.Stop(); // Stop the reconnect timer when connected

            // Subscribe to all topics using the wildcard "#"
            await _mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build());
            Console.WriteLine("Subscribed to all topics.");
        }

        private Task OnDisconnected(MqttClientDisconnectedEventArgs e)
        {
            Console.WriteLine("Disconnected from MQTT Broker.");
            _reconnectTimer.Start(); // Start the reconnect timer when disconnected
            return Task.CompletedTask;
        }

        private async Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            try
            {
                Console.WriteLine("Message received");

                // Check the topic of the incoming message
                string topic = e.ApplicationMessage.Topic;
                string payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);

                Console.WriteLine($"Received Message: {payload} on Topic: {topic}");

                using (var scope = _scopeFactory.CreateScope())
                {
                    var context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
                    Console.WriteLine("Database context created");

                    // Save all messages to the database
                    var message = new MqttPublishViewModel
                    {
                        Topic = topic,
                        Message = payload,
                        PublishDateTime = DateTime.Now // Use DateTime.Now for local machine time
                    };

                    context.MqttPublish.Add(message);
                    Console.WriteLine("Message added to context");

                    // Implement retry policy for database operation
                    bool saveFailed;
                    int attempt = 0;
                    do
                    {
                        saveFailed = false;
                        try
                        {
                            await context.SaveChangesAsync();
                            Console.WriteLine("Message saved to database");
                        }
                        catch (DbUpdateConcurrencyException ex)
                        {
                            saveFailed = true;
                            attempt++;
                            Console.WriteLine($"Concurrency exception: {ex.Message}. Retry attempt {attempt}");
                            await Task.Delay(1000); // Delay before retrying
                        }
                        catch (DbUpdateException ex)
                        {
                            saveFailed = true;
                            attempt++;
                            Console.WriteLine($"DbUpdate exception: {ex.Message}. Retry attempt {attempt}");
                            await Task.Delay(1000); // Delay before retrying
                        }
                    } while (saveFailed && attempt < 3); // Retry a few times if failed

                    if (saveFailed)
                    {
                        Console.WriteLine("Failed to save message after retries. Logging error...");
                        // Log the error or handle the failure appropriately
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error while processing message: {ex.Message}");
                Console.WriteLine($"Stack Trace: {ex.StackTrace}");
            }
        }

        public async Task PublishAsync(string topic, string payload)
        {
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(payload)
                .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
                .WithRetainFlag()
                .Build();

            if (_mqttClient.IsConnected)
            {
                await _mqttClient.PublishAsync(message);
            }
            else
            {
                Console.WriteLine("MQTT Client is not connected. Message not published.");
            }
        }

        public async Task SubscribeAsync(string topic)
        {
            if (_mqttClient.IsConnected)
            {
                await _mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).Build());
            }
            else
            {
                Console.WriteLine("MQTT Client is not connected. Subscription failed.");
            }
        }

        public async Task UnsubscribeAsync(string topic)
        {
            if (_mqttClient.IsConnected)
            {
                await _mqttClient.UnsubscribeAsync(topic);
            }
            else
            {
                Console.WriteLine("MQTT Client is not connected. Unsubscription failed.");
            }
        }

        private async Task AttemptReconnectAsync()
        {
            if (!_mqttClient.IsConnected)
            {
                try
                {
                    await _mqttClient.ConnectAsync(_mqttClientOptions);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Reconnection attempt failed: {ex.Message}");
                }
            }
        }
    }
}

Error:

Severity Code Description Project File Line Suppression State Error (active) CS1061 'IMqttClient' does not contain a definition for 'ApplicationMessageReceivedHandler' and no accessible extension method 'ApplicationMessageReceivedHandler' accepting a first argument of type 'IMqttClient' could be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 34

Severity Code Description Project File Line Suppression State Error (active) CS1061 'IMqttClient' does not contain a definition for 'ConnectedHandler' and no accessible extension method 'ConnectedHandler' accepting a first argument of type 'IMqttClient' could be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 32 Severity Code Description Project File Line Suppression State Error (active) CS1061 'IMqttClient' does not contain a definition for 'DisconnectedHandler' and no accessible extension method 'DisconnectedHandler' accepting a first argument of type 'IMqttClient' could be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 33

Severity Code Description Project File Line Suppression State Error (active) CS0246 The type or namespace name 'MqttApplicationMessageReceivedHandlerDelegate' could not be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 34

Severity Code Description Project File Line Suppression State Error (active) CS0246 The type or namespace name 'MqttClientConnectedHandlerDelegate' could not be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 32

Severity Code Description Project File Line Suppression State Error (active) CS0246 The type or namespace name 'MqttClientDisconnectedHandlerDelegate' could not be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 33

LeGi0N09 avatar Jun 13 '24 06:06 LeGi0N09