MQTTnet
Externally triggered Request/Response seems to have a blocking issue
I'm trying to use MQTTnet for machine control, and for this reason I make use of the MQTT 5 request/response functionality. After initial testing this worked great, but now I'm running into a wall, because I want to trigger a request from a remote source using a publish. Simplified, I'm trying to do the following:
Manager => publish(trigger) => Device
Device => Request(manager, data) => Manager
Device waits for response to complete (with timeout)
Manager => Response(ok) => Device
But in my case the response is never handled by the UseApplicationMessageReceivedHandler callback, even though an MQTT explorer shows the response being published. The device then throws its timeout exception.
I'm using the ManagedMqttClient with a Mosquitto service running on localhost. I wrote the following simplified unit test to illustrate what I think goes wrong. When it is run, the Task.Delay throws a TaskCanceledException at the end.
[Test]
public async Task CallbackTest()
{
    bool requestHandled = false;
    const string triggerTopic = "trigger";
    const string requestTopic = "request";
    const string responseTopic = "response";
    var client = await CreateClientAsync(Guid.NewGuid().ToString());
    client.UseApplicationMessageReceivedHandler(async arg =>
    {
        switch (arg.ApplicationMessage.Topic)
        {
            case triggerTopic: // the sequence is triggered
                await client.PublishAsync(requestTopic);
                break;
            case requestTopic: // the request is being handled
                await client.PublishAsync(responseTopic);
                while (!requestHandled)
                    await Task.Delay(10);
                break;
            case responseTopic: // the response is received
                await Task.Delay(100);
                requestHandled = true;
                break;
            default:
                throw new NotImplementedException();
        }
    });
    await client.SubscribeAsync(triggerTopic);
    await client.SubscribeAsync(requestTopic);
    await client.SubscribeAsync(responseTopic);
    // Start the sequence
    await client.PublishAsync(triggerTopic);
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
    while (!requestHandled)
        await Task.Delay(10, cts.Token);
}
The CreateClientAsync function is set up like so:
private async Task<IManagedMqttClient> CreateClientAsync(string id)
{
    const string address = "localhost";
    var factory = new MqttFactory();
    var client = factory.CreateManagedMqttClient();
    var options = new ManagedMqttClientOptionsBuilder()
        .WithClientOptions(new MqttClientOptionsBuilder()
            .WithClientId(id)
            .WithTcpServer(address)
            .Build())
        .Build();
    await client.StartAsync(options);
    while (!client.IsConnected)
        await Task.Delay(1);
    return client;
}
A small delay after subscribing may help, because the managed client does not subscribe immediately. It only adds the subscription to internal storage and subscribes later on.
I wonder whether the code hangs because of while (!requestHandled). This loop blocks the worker, so the handler has no chance of being called again for the response. Please move this waiting to a dedicated task (Task.Run) or similar.
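To illustrate the suspected hang without a broker, here is a minimal sketch that does not use MQTTnet at all. It assumes, as the reply above suggests, that received messages are dispatched one at a time and that the next message is only delivered after the previous handler call has completed. The class SequentialDispatchDemo, the in-memory queue, and the offloadWait switch are all hypothetical names made up for this illustration. A TaskCompletionSource stands in for the while (!requestHandled) polling loop.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class SequentialDispatchDemo
{
    // Runs trigger -> request -> response against a toy in-memory dispatcher.
    // Returns true if the response was handled before the timeout elapsed.
    public static async Task<bool> RunAsync(bool offloadWait, TimeSpan timeout)
    {
        var queue = Channel.CreateUnbounded<string>();
        var responseHandled = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Stands in for the "while (!requestHandled)" wait in the unit test.
        async Task WaitForResponseAsync() => await responseHandled.Task;

        async Task HandleAsync(string topic)
        {
            switch (topic)
            {
                case "trigger":
                    queue.Writer.TryWrite("request");
                    break;
                case "request":
                    queue.Writer.TryWrite("response");
                    if (offloadWait)
                        _ = Task.Run(() => WaitForResponseAsync()); // handler returns immediately
                    else
                        await WaitForResponseAsync(); // handler does not return until the response arrives
                    break;
                case "response":
                    responseHandled.TrySetResult(true);
                    break;
            }
        }

        // Toy receive loop (an assumption about the dispatch model): the next
        // message is dispatched only after the previous handler call completed.
        async Task PumpAsync()
        {
            while (await queue.Reader.WaitToReadAsync())
                while (queue.Reader.TryRead(out var topic))
                    await HandleAsync(topic);
        }

        _ = PumpAsync();
        queue.Writer.TryWrite("trigger"); // start the sequence

        var finished = await Task.WhenAny(responseHandled.Task, Task.Delay(timeout));
        queue.Writer.TryComplete(); // lets the pump end once it is idle
        return finished == responseHandled.Task;
    }

    public static async Task Main()
    {
        var timeout = TimeSpan.FromSeconds(1);
        Console.WriteLine($"wait inside handler: {await RunAsync(false, timeout)}");
        Console.WriteLine($"wait offloaded:      {await RunAsync(true, timeout)}");
    }
}
```

Under that assumption, waiting inside the handler times out because the "response" message sits behind the handler call that is waiting for it, while the offloaded variant completes. In the unit test, the equivalent change would be to move the wait in the requestTopic case into a separate task so that the handler itself returns right after publishing.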