MQTTnet
MQTTnet copied to clipboard
Issue in MQTTnet version 4.3.6.1152
in my code working file by now when I use a local broker after changing to cloud base broker I can't save messages on the SQL Azure database to make some changes and I get an error any solution or recommend what database I can use for real-time data saving and broker recommended(cloud)
using Microsoft.Extensions.DependencyInjection;
using MQTTWebApp.Models;
using MQTTApp.Data;
using MQTTnet;
using MQTTnet.Client;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Timers;
using Microsoft.EntityFrameworkCore;
namespace MQTTWebApp.Services
{
public class MqttClientService
{
private readonly IMqttClient _mqttClient;
private readonly MqttClientOptions _mqttClientOptions;
private readonly IServiceScopeFactory _scopeFactory;
private readonly System.Timers.Timer _reconnectTimer;
public MqttClientService(IServiceScopeFactory scopeFactory)
{
_scopeFactory = scopeFactory;
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
_mqttClientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("broker.hivemq.com", 1883) // Replace with your MQTT broker details
.WithClientId(Guid.NewGuid().ToString())
.Build();
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e => OnConnected(e));
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(e => OnDisconnected(e));
_mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e => OnMessageReceived(e));
_reconnectTimer = new System.Timers.Timer(5000); // Try reconnecting every 5 seconds
_reconnectTimer.Elapsed += async (sender, e) => await AttemptReconnectAsync();
}
public async Task ConnectAsync()
{
try
{
await _mqttClient.ConnectAsync(_mqttClientOptions);
}
catch (Exception ex)
{
Console.WriteLine($"Error while connecting to MQTT broker: {ex.Message}");
_reconnectTimer.Start();
}
}
private async Task OnConnected(MqttClientConnectedEventArgs e)
{
Console.WriteLine("Connected successfully with MQTT Broker.");
_reconnectTimer.Stop(); // Stop the reconnect timer when connected
// Subscribe to all topics using the wildcard "#"
await _mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build());
Console.WriteLine("Subscribed to all topics.");
}
private Task OnDisconnected(MqttClientDisconnectedEventArgs e)
{
Console.WriteLine("Disconnected from MQTT Broker.");
_reconnectTimer.Start(); // Start the reconnect timer when disconnected
return Task.CompletedTask;
}
private async Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
try
{
Console.WriteLine("Message received");
// Check the topic of the incoming message
string topic = e.ApplicationMessage.Topic;
string payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
Console.WriteLine($"Received Message: {payload} on Topic: {topic}");
using (var scope = _scopeFactory.CreateScope())
{
var context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
Console.WriteLine("Database context created");
// Save all messages to the database
var message = new MqttPublishViewModel
{
Topic = topic,
Message = payload,
PublishDateTime = DateTime.Now // Use DateTime.Now for local machine time
};
context.MqttPublish.Add(message);
Console.WriteLine("Message added to context");
// Implement retry policy for database operation
bool saveFailed;
int attempt = 0;
do
{
saveFailed = false;
try
{
await context.SaveChangesAsync();
Console.WriteLine("Message saved to database");
}
catch (DbUpdateConcurrencyException ex)
{
saveFailed = true;
attempt++;
Console.WriteLine($"Concurrency exception: {ex.Message}. Retry attempt {attempt}");
await Task.Delay(1000); // Delay before retrying
}
catch (DbUpdateException ex)
{
saveFailed = true;
attempt++;
Console.WriteLine($"DbUpdate exception: {ex.Message}. Retry attempt {attempt}");
await Task.Delay(1000); // Delay before retrying
}
} while (saveFailed && attempt < 3); // Retry a few times if failed
if (saveFailed)
{
Console.WriteLine("Failed to save message after retries. Logging error...");
// Log the error or handle the failure appropriately
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Error while processing message: {ex.Message}");
Console.WriteLine($"Stack Trace: {ex.StackTrace}");
}
}
public async Task PublishAsync(string topic, string payload)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
.WithRetainFlag()
.Build();
if (_mqttClient.IsConnected)
{
await _mqttClient.PublishAsync(message);
}
else
{
Console.WriteLine("MQTT Client is not connected. Message not published.");
}
}
public async Task SubscribeAsync(string topic)
{
if (_mqttClient.IsConnected)
{
await _mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).Build());
}
else
{
Console.WriteLine("MQTT Client is not connected. Subscription failed.");
}
}
public async Task UnsubscribeAsync(string topic)
{
if (_mqttClient.IsConnected)
{
await _mqttClient.UnsubscribeAsync(topic);
}
else
{
Console.WriteLine("MQTT Client is not connected. Unsubscription failed.");
}
}
private async Task AttemptReconnectAsync()
{
if (!_mqttClient.IsConnected)
{
try
{
await _mqttClient.ConnectAsync(_mqttClientOptions);
}
catch (Exception ex)
{
Console.WriteLine($"Reconnection attempt failed: {ex.Message}");
}
}
}
}
}
Error:
Severity Code Description Project File Line Suppression State Error (active) CS1061 'IMqttClient' does not contain a definition for 'ApplicationMessageReceivedHandler' and no accessible extension method 'ApplicationMessageReceivedHandler' accepting a first argument of type 'IMqttClient' could be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 34
Severity Code Description Project File Line Suppression State Error (active) CS1061 'IMqttClient' does not contain a definition for 'ConnectedHandler' and no accessible extension method 'ConnectedHandler' accepting a first argument of type 'IMqttClient' could be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 32 Severity Code Description Project File Line Suppression State Error (active) CS1061 'IMqttClient' does not contain a definition for 'DisconnectedHandler' and no accessible extension method 'DisconnectedHandler' accepting a first argument of type 'IMqttClient' could be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 33
Severity Code Description Project File Line Suppression State Error (active) CS0246 The type or namespace name 'MqttApplicationMessageReceivedHandlerDelegate' could not be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 34
Severity Code Description Project File Line Suppression State Error (active) CS0246 The type or namespace name 'MqttClientConnectedHandlerDelegate' could not be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 32
Severity Code Description Project File Line Suppression State Error (active) CS0246 The type or namespace name 'MqttClientDisconnectedHandlerDelegate' could not be found (are you missing a using directive or an assembly reference?) MQTTApp E:\R&D\MQTTWebApp\MQTTApp\Services\MqttClientService.cs 33