MQTTnet icon indicating copy to clipboard operation
MQTTnet copied to clipboard

UWP PublishAsync forever hangs

Open don-pironet-hatch opened this issue 9 months ago • 0 comments

Describe the bug

When sending a message with a payload of 10 MB or more, PublishAsync never returns on some devices. The MQTT broker reports that the client timed out after 1 minute.

Which component is your bug related to?

  • Client

To Reproduce

Steps to reproduce the behavior:

  1. Using this version of MQTTnet 'v4.3.5.1141.'.
  2. Run code example below
  3. Install MQTT broker on a different computer start with following command and config: config.conf
listener 1883
allow_anonymous true

mosquitto -v -c config.conf

  4. On most devices it works fine. On a Panasonic Toughbook, for example, it fails with files as small as 1 MB: the PublishAsync call hangs forever, even after the MQTT broker has timed out the client.

Expected behavior

Expected behavior would be that the publishAsyncMethod returns OR throws an exception.

Code example

Started via Visual Studio a new UWP application with only this file.

using System;
using System.ComponentModel;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Windows.Storage;
using Windows.Storage.Pickers;
using Windows.UI.Core;
using Windows.UI.Xaml;
using Windows.UI.Xaml.Controls;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using Serilog;
using Serilog.Events;

// The Blank Page item template is documented at https://go.microsoft.com/fwlink/?LinkId=402352&clcid=0x409

namespace App2
{
    /// <summary>
    /// Small helper around <see cref="FileOpenPicker"/> for choosing the file to upload.
    /// </summary>
    class FilePicker
    {
        /// <summary>
        /// Shows a file-open picker in thumbnail view, starting in the Downloads
        /// location and filtered to <c>.tar</c> files.
        /// </summary>
        /// <returns>
        /// The result of <see cref="FileOpenPicker.PickSingleFileAsync()"/>; the caller
        /// in this file treats <c>null</c> as "nothing selected".
        /// </returns>
        public static async Task<StorageFile> SelectFile()
        {
            var openPicker = new FileOpenPicker
            {
                ViewMode = PickerViewMode.Thumbnail,
                SuggestedStartLocation = PickerLocationId.Downloads
            };
            openPicker.FileTypeFilter.Add(".tar");

            return await openPicker.PickSingleFileAsync();
        }
    }

    /// <summary>
    /// Wraps a single value and raises <see cref="PropertyChanged"/> whenever the
    /// value is assigned, so it can be used as a data-binding source.
    /// </summary>
    /// <typeparam name="T">Type of the wrapped value.</typeparam>
    public class ObservableProperty<T> : INotifyPropertyChanged
    {
        // Plain backing field; the original used a private auto-property named like a field.
        private T _value;

        /// <summary>
        /// Creates the wrapper with an initial value.
        /// </summary>
        /// <param name="value">Initial value to store.</param>
        public ObservableProperty(T value)
        {
            // No handler can be attached before construction completes, so the
            // field is assigned directly instead of going through the setter.
            _value = value;
        }

        /// <summary>
        /// Gets or sets the wrapped value. Every assignment raises
        /// <see cref="PropertyChanged"/>, even when the new value equals the old one.
        /// </summary>
        public T Value
        {
            get => _value;
            set
            {
                _value = value;
                PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Value)));
            }
        }

        /// <summary>
        /// Raised after <see cref="Value"/> has been assigned.
        /// </summary>
        public event PropertyChangedEventHandler PropertyChanged;
    }

    /// <summary>
    /// Sample page: lets the user pick a file and publishes its content as a single
    /// MQTT message to the broker whose address is entered in the text box.
    /// </summary>
    public sealed partial class MainPage : Page
    {
        /// <summary>
        /// Upper bound for the whole connect-and-publish sequence. Without a bound a
        /// stalled transfer would keep the awaiting code suspended indefinitely.
        /// NOTE(review): two minutes is an arbitrary choice — tune for payload size and link speed.
        /// </summary>
        private static readonly TimeSpan UploadTimeout = TimeSpan.FromMinutes(2);

        private string ipAddress = "192.168.1.222";
        private readonly ObservableProperty<string> status = new ObservableProperty<string>("0");

        /// <summary>
        /// Initializes the page and configures the global Serilog logger to write to the
        /// console, the debug output and a daily rolling file in the app's local folder.
        /// </summary>
        public MainPage()
        {
            this.InitializeComponent();
            Log.Logger = new LoggerConfiguration()
                .WriteTo.Console()
                .WriteTo.Debug()
                .WriteTo.File(
                    Path.Combine(ApplicationData.Current.LocalFolder.Path, "log.txt"),
                    LogEventLevel.Information,
                    rollingInterval: RollingInterval.Day,
                    rollOnFileSizeLimit: true,
                    shared: true
                )
                .CreateLogger();
        }

        /// <summary>
        /// Keeps <see cref="ipAddress"/> in sync with the text box content.
        /// </summary>
        private void TextBox_OnTextChanged(object sender, TextChangedEventArgs e)
        {
            if (sender is TextBox textBox)
            {
                ipAddress = textBox.Text;
            }
        }

        /// <summary>
        /// Queues <paramref name="callback"/> on the page dispatcher with normal priority.
        /// </summary>
        /// <remarks>
        /// Do not pass an async lambda: <see cref="DispatchedHandler"/> returns void, so
        /// the lambda would become <c>async void</c> and its exceptions could not be observed.
        /// </remarks>
        private async void RunOnDispatcher(DispatchedHandler callback)
        {
            await Dispatcher.RunAsync(
                CoreDispatcherPriority.Normal,
                callback
            );
        }

        /// <summary>
        /// Click handler: asks the user for a file and uploads it to the broker.
        /// </summary>
        /// <remarks>
        /// This is a top-level event handler, so <c>async void</c> is unavoidable; every
        /// exception is therefore caught here instead of escaping unobserved.
        /// </remarks>
        private async void Select_File(object sender, RoutedEventArgs e)
        {
            try
            {
                var file = await FilePicker.SelectFile();
                if (file == null || string.IsNullOrEmpty(ipAddress))
                {
                    return;
                }

                // The awaits in this handler are not configured away from the captured
                // context, so no extra dispatcher hop is needed before touching `status`.
                await PublishFileAsync(file, ipAddress);
            }
            catch (Exception exception)
            {
                Serilog.Log.Error(exception, "Uploading the selected file failed");
                status.Value = "Error";
            }
        }

        /// <summary>
        /// Connects to the broker, publishes the file content and disconnects again.
        /// Connect and publish share one cancellation token that fires after
        /// <see cref="UploadTimeout"/>.
        /// </summary>
        /// <param name="file">File whose content becomes the message payload.</param>
        /// <param name="brokerAddress">Host name or IP address of the MQTT broker.</param>
        private async Task PublishFileAsync(StorageFile file, string brokerAddress)
        {
            status.Value = "Connecting";

            // Create a MQTT client factory
            var factory = new MqttFactory(CreateMqttLogger());

            // The client and the timeout source are disposed on every exit path.
            using (var mqttClient = factory.CreateMqttClient())
            using (var timeout = new CancellationTokenSource(UploadTimeout))
            {
                // Create MQTT client options
                var options = new MqttClientOptionsBuilder()
                    .WithTcpServer(brokerAddress)
                    .WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
                    .WithCleanSession()
                    .Build();

                // Connect to MQTT broker
                var connectResult = await mqttClient.ConnectAsync(options, timeout.Token);
                if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
                {
                    Serilog.Log.Information($"Failed to connect to MQTT broker: {connectResult.ResultCode}");
                    status.Value = "Error connecting";
                    return;
                }

                status.Value = "Connected";
                Serilog.Log.Information("Connected to MQTT broker successfully.");

                // Callback function when a message is received
                mqttClient.ApplicationMessageReceivedAsync += result =>
                {
                    Serilog.Log.Information($"Received message");
                    return Task.CompletedTask;
                };

                try
                {
                    var bytes = await ReadAllBytesAsync(file);
                    Serilog.Log.Information($"Bytes length {bytes.Length}");

                    string handle = "1716301517";

                    // Publish upload
                    string uploadFileRequest = $"files/request/upload/{handle}";
                    var uploadMessage = new MqttApplicationMessageBuilder()
                        .WithTopic(uploadFileRequest)
                        .WithPayload(bytes)
                        .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                        .WithRetainFlag()
                        .Build();

                    // Passing the token lets the timeout cancel a publish that never completes.
                    var uploadResult = await mqttClient.PublishAsync(uploadMessage, timeout.Token);
                    Serilog.Log.Information($"Upload result {uploadResult.IsSuccess}");

                    status.Value = "Done";
                }
                catch (Exception exception)
                {
                    // Includes the cancellation raised when the timeout elapses.
                    Serilog.Log.Information(exception.ToString());
                }
                finally
                {
                    await DisconnectQuietlyAsync(mqttClient);
                    Serilog.Log.Information("Done");
                    status.Value = "Closed";
                }
            }
        }

        /// <summary>
        /// Creates an MQTTnet event logger that forwards every log message, including
        /// any attached exception, to Serilog at information level.
        /// </summary>
        private static MqttNetEventLogger CreateMqttLogger()
        {
            var mqttEventLogger = new MqttNetEventLogger("MyCustomLogger");

            mqttEventLogger.LogMessagePublished += (_, argss) =>
            {
                var output = new StringBuilder();
                output.AppendLine(
                    $">> [{argss.LogMessage.Timestamp:O}] [{argss.LogMessage.ThreadId}] [{argss.LogMessage.Source}] [{argss.LogMessage.Level}]: {argss.LogMessage.Message}");
                if (argss.LogMessage.Exception != null)
                {
                    output.AppendLine(argss.LogMessage.Exception.ToString());
                }

                Serilog.Log.Information(output.ToString());
            };

            return mqttEventLogger;
        }

        /// <summary>
        /// Reads the complete content of <paramref name="file"/> into memory.
        /// Both streams are disposed even when copying fails.
        /// </summary>
        /// <param name="file">File to read.</param>
        /// <returns>The file content as a byte array.</returns>
        private static async Task<byte[]> ReadAllBytesAsync(StorageFile file)
        {
            using (var stream = await file.OpenStreamForReadAsync())
            using (var memoryStream = new MemoryStream())
            {
                await stream.CopyToAsync(memoryStream);
                return memoryStream.ToArray();
            }
        }

        /// <summary>
        /// Disconnects the client and logs, rather than propagates, any failure so that
        /// the cleanup following it in a <c>finally</c> block still runs.
        /// </summary>
        /// <param name="mqttClient">Client to disconnect.</param>
        private static async Task DisconnectQuietlyAsync(IMqttClient mqttClient)
        {
            try
            {
                await mqttClient.DisconnectAsync();
            }
            catch (Exception exception)
            {
                // The connection may already be gone, for example after a timed-out publish.
                Serilog.Log.Information(exception.ToString());
            }
        }
    }
}

don-pironet-hatch avatar May 22 '24 18:05 don-pironet-hatch