Externally triggered Request/Response seems to have a blocking issue

Open toineoverbeek opened this issue 4 years ago • 0 comments

I'm trying to use MQTTnet for machine control, and for that reason I use the MQTT 5 request/response functionality. In initial testing this worked great, but now I'm running into a wall: I want to trigger a request from a remote source using a publish. Put simply, I'm trying to do the following:

Manager => publish(trigger) => Device
Device => Request(manager, data) => Manager
    Device waits for response to complete (with timeout)
Manager => Response(ok) => Device

But in my case the response is never handled by the UseApplicationMessageReceivedHandler callback, even though an MQTT explorer shows the response being published. The device then throws its timeout exception.

I'm using the ManagedMqttClient, with a Mosquitto service running on localhost. I wrote the following simplified unit test to illustrate what I think goes wrong. When it runs, the Task.Delay in the final loop throws a TaskCanceledException once the 30-second timeout expires.

        [Test]
        public async Task CallbackTest()
        {
            bool requestHandled = false;
            const string triggerTopic = "trigger";
            const string requestTopic = "request";
            const string responseTopic = "response";

            var client = await CreateClientAsync(Guid.NewGuid().ToString());
            client.UseApplicationMessageReceivedHandler(async arg =>
            {
                switch (arg.ApplicationMessage.Topic)
                {                    
                    case triggerTopic:// the sequence is triggered
                        await client.PublishAsync(requestTopic);
                        break;                    
                    case requestTopic:// the request is being handled
                        await client.PublishAsync(responseTopic);
                        while (!requestHandled)
                            await Task.Delay(10);
                        break;                    
                    case responseTopic:// The response is received
                        await Task.Delay(100);
                        requestHandled = true;
                        break;
                    default:
                        throw new NotImplementedException();
                }
            });

            await client.SubscribeAsync(triggerTopic);
            await client.SubscribeAsync(requestTopic);
            await client.SubscribeAsync(responseTopic);

            // Start the sequence
            await client.PublishAsync(triggerTopic);

            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
            while (!requestHandled)
                await Task.Delay(10, cts.Token);
        }

The CreateClientAsync function is set up like so:

        private async Task<IManagedMqttClient> CreateClientAsync(string id)
        {
            const string address = "localhost";
            var factory = new MqttFactory();

            var client = factory.CreateManagedMqttClient();
            var options = new ManagedMqttClientOptionsBuilder()
                .WithClientOptions(new MqttClientOptionsBuilder()
                .WithClientId(id)   
                .WithTcpServer(address)
                .Build())
                .Build();

            await client.StartAsync(options);

            while (!client.IsConnected)
                await Task.Delay(1);
            return client;
        }

toineoverbeek, Jul 20 '21 13:07

A short delay after subscribing may help, because the managed client does not subscribe immediately. It only adds the subscription to an internal store and sends the actual subscribe later on.

I suspect the code hangs because of the while (!requestHandled) loop. That loop keeps the handler from returning, which blocks the worker, so there is no chance that the handler will be called again for the response. Please move this waiting to a dedicated task (Task.Run or similar).

chkr1011, Sep 14 '22 07:09
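
The suggestion above can be illustrated without a broker. The following is only a minimal sketch: it assumes, as the reply describes, that received messages are handed to the handler one at a time and that the next message is not dispatched until the handler returns. The class SequentialDispatchDemo, the method RunAsync, and the in-memory Channel standing in for the client are all hypothetical and are not part of MQTTnet. It requires .NET 6 or later for Task.WaitAsync.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class SequentialDispatchDemo
{
    // Hypothetical stand-in for the client: messages are dispatched strictly
    // one after another, and each handler call is awaited before the next.
    // Returns true if the "response" message was handled before the timeout.
    public static async Task<bool> RunAsync(bool offloadWait, TimeSpan timeout)
    {
        var inbox = Channel.CreateUnbounded<string>();
        var responseReceived = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        using var cts = new CancellationTokenSource(timeout);

        async Task HandleAsync(string topic)
        {
            switch (topic)
            {
                case "trigger":
                    await inbox.Writer.WriteAsync("request");
                    break;
                case "request":
                    await inbox.Writer.WriteAsync("response");
                    if (offloadWait)
                    {
                        // Wait on a separate task so this handler returns at once
                        // and the dispatcher can deliver the response.
                        _ = Task.Run(async () => await responseReceived.Task);
                    }
                    else
                    {
                        // Waiting here keeps the handler from returning, so the
                        // response behind it in the queue is never dispatched.
                        await responseReceived.Task.WaitAsync(cts.Token);
                    }
                    break;
                case "response":
                    responseReceived.TrySetResult(true);
                    break;
            }
        }

        // Sequential message pump: read one message, await its handler, repeat.
        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var topic in inbox.Reader.ReadAllAsync(cts.Token))
                    await HandleAsync(topic);
            }
            catch (OperationCanceledException)
            {
                // Timeout reached while the pump was waiting; nothing to clean up.
            }
        });

        await inbox.Writer.WriteAsync("trigger");
        try
        {
            return await responseReceived.Task.WaitAsync(cts.Token);
        }
        catch (OperationCanceledException)
        {
            return false; // The response was never handled within the timeout.
        }
        finally
        {
            inbox.Writer.TryComplete(); // Lets the pump finish its read loop.
        }
    }
}
```

Under that assumption, `await SequentialDispatchDemo.RunAsync(offloadWait: false, timeout: TimeSpan.FromSeconds(1))` should time out and return false, mirroring the unit test in the question, while the same call with `offloadWait: true` should return true. A TaskCompletionSource is used here instead of polling a bool flag; that is a design choice of this sketch, not something the thread prescribes.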