MQTTnet
MQTTnet copied to clipboard
UWP PublishAsync forever hangs
Describe the bug
When sending a message with a payload of 10mb or more on some devices the publishAsync never returns. The MQTT broke will display that the client is going in timeout after 1 minute.
Which component is your bug related to?
- Client
To Reproduce
Steps to reproduce the behavior:
- Using this version of MQTTnet 'v4.3.5.1141.'.
- Run code example below
- Install MQTT broker on a different computer start with following command and config: config.conf
listener 1883
allow_anonymous true
mosquitto -v -c config.conf
4. On most devices it works fine. On for example an panasonic toughbook it fails with files for sizes of 1MB. The publishAsyncMethod hangs forever. Even after the timeout of the MQTT broker.
Expected behavior
Expected behavior would be that the publishAsyncMethod returns OR throws an exception.
Code example
Started via Visual Studio a new UWP application with only this file.
using System;
using System.ComponentModel;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Windows.Storage;
using Windows.Storage.Pickers;
using Windows.UI.Core;
using Windows.UI.Xaml;
using Windows.UI.Xaml.Controls;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using Serilog;
using Serilog.Events;
// The Blank Page item template is documented at https://go.microsoft.com/fwlink/?LinkId=402352&clcid=0x409
namespace App2
{
class FilePicker
{
public static async Task<StorageFile> SelectFile()
{
var picker = new FileOpenPicker();
picker.ViewMode = PickerViewMode.Thumbnail;
picker.SuggestedStartLocation = PickerLocationId.Downloads;
picker.FileTypeFilter.Add(".tar");
var file = await picker.PickSingleFileAsync();
return file;
}
}
public class ObservableProperty<T> : INotifyPropertyChanged
{
public ObservableProperty(T value)
{
Value = value;
}
public T Value
{
get => _value;
set
{
_value = value;
PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Value)));
}
}
private T _value { get; set; }
public event PropertyChangedEventHandler PropertyChanged;
}
/// <summary>
/// An empty page that can be used on its own or navigated to within a Frame.
/// </summary>
public sealed partial class MainPage : Page
{
private string ipAddress = "192.168.1.222";
private readonly ObservableProperty<string> status = new ObservableProperty<string>("0");
public MainPage()
{
this.InitializeComponent();
Log.Logger = new LoggerConfiguration()
.WriteTo.Console()
.WriteTo.Debug()
.WriteTo.File(
Path.Combine(ApplicationData.Current.LocalFolder.Path, "log.txt"),
LogEventLevel.Information,
rollingInterval: RollingInterval.Day,
rollOnFileSizeLimit: true,
shared: true
)
.CreateLogger();
}
private void TextBox_OnTextChanged(object sender, TextChangedEventArgs e)
{
if (sender is TextBox textBox) ipAddress = textBox.Text;
}
private async void RunOnDispatcher(DispatchedHandler callback)
{
await Dispatcher.RunAsync(
CoreDispatcherPriority.Normal,
callback
);
}
private async void Select_File(object sender, RoutedEventArgs e)
{
var file = await FilePicker.SelectFile();
if (file == null || string.IsNullOrEmpty(ipAddress)) return;
RunOnDispatcher(
async () =>
{
status.Value = "Connecting";
var mqttEventLogger = new MqttNetEventLogger("MyCustomLogger");
mqttEventLogger.LogMessagePublished += (_, argss) =>
{
var output = new StringBuilder();
output.AppendLine(
$">> [{argss.LogMessage.Timestamp:O}] [{argss.LogMessage.ThreadId}] [{argss.LogMessage.Source}] [{argss.LogMessage.Level}]: {argss.LogMessage.Message}");
if (argss.LogMessage.Exception != null)
{
output.AppendLine(argss.LogMessage.Exception.ToString());
}
Serilog.Log.Information(output.ToString());
};
// Create a MQTT client factory
var factory = new MqttFactory(mqttEventLogger);
// Create a MQTT client instance
var mqttClient = factory.CreateMqttClient();
// Create MQTT client options
var options = new MqttClientOptionsBuilder().WithTcpServer(ipAddress)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(30)).WithCleanSession().Build();
// Connect to MQTT broker
var connectResult = await mqttClient.ConnectAsync(options);
if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
{
status.Value = "Connected";
Serilog.Log.Information("Connected to MQTT broker successfully.");
// Subscribe to a topic
//await mqttClient.SubscribeAsync(topic);
// Callback function when a message is received
mqttClient.ApplicationMessageReceivedAsync += result =>
{
//Console.WriteLine($"Received message: {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
Serilog.Log.Information($"Received message");
return Task.CompletedTask;
};
try
{
var stream = await file.OpenStreamForReadAsync();
var memoryStream = new MemoryStream();
await stream.CopyToAsync(memoryStream);
var bytes = memoryStream.ToArray();
Serilog.Log.Information($"Bytes length {bytes.Length}");
string handle = "1716301517";
// Publish upload
string uploadFileRequest = $"files/request/upload/{handle}";
var uploadMessage = new MqttApplicationMessageBuilder()
.WithTopic(uploadFileRequest)
.WithPayload(bytes)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag()
.Build();
var uploadResult = await mqttClient.PublishAsync(uploadMessage);
Serilog.Log.Information($"Upload result {uploadResult.IsSuccess}"); // THIS line is never reached
status.Value = "Done";
stream.Close();
memoryStream.Close();
}
catch (Exception exception)
{
Serilog.Log.Information(exception.ToString());
}
finally
{
// Unsubscribe and disconnect
//await mqttClient.UnsubscribeAsync(topic);
await mqttClient.DisconnectAsync();
Serilog.Log.Information("Done");
status.Value = "Closed";
}
}
else
{
Serilog.Log.Information($"Failed to connect to MQTT broker: {connectResult.ResultCode}");
status.Value = "Error connecting";
}
}
);
}
}
}