MQTTnet icon indicating copy to clipboard operation
MQTTnet copied to clipboard

Abort Message with QOS=1 or QOS=2 that has not been delivered to server

Open hannasm opened this issue 2 years ago • 5 comments

Describe the feature request

Publication of a message from an mqtt client to a server may be delayed if either the server is offline, or the network is having issues , and the message type is considered QOS=1 (at least once) or QOS=2 (exactly once).

When a QOS=1 or QOS=2 message is published, it will sit in the client queue for an indefinite amount of time while waiting for delivery to the server. Under some circumstances an application would benefit from being able to determine that messages are in this state, and potentially remove them from the queue again and take alternative corrective actions. This would mostly be equivalent to what takes place when a message is removed due to overflow.

Some scenarios where a client-side abort might be used include:

  • interactive application where the user requests cancellation
  • timeouts
  • error recovery or other system events

Given these abort-able scenarios, there is a compelling place to use a client-side abort, and also several caveats to bare in mind.

Consider any long lived application, publishing many messages with a single mqtt client. Any published message during this long lived application may experience an abort-able scenario. This means the message is already in the mqtt client publish queue, but delivery to the server has not yet succeeded. If no abort happens, the application will still be online, and that message may remain in the mqtt client publish queue for a prolonged period of time, and then also eventually be delivered even though it was aborted by the application. Performing a client-side abort would enable these messages to be removed from the publish queue without risk of them being delivered to the server at some unpredictable later point in time. This would mostly be equivalent to what takes place when a message is removed due to overflow and may help in dealing with overflow in some ways by removing additional messages from the queue early.

Some caveats to bare in mind:

  • Once a message has been successfully delivered to the server, client side abort is never going to change what the server does with it
  • In the QOS=1 scenario it may be possible that the message has been delivered to the broker unbeknownst to the client, in this case abort would prevent further delivery attempts, but it would not change what the server does with the delivered message
  • When thinking in terms of micro seconds, there is a race condition between message abort and any ack from delivery at the broker. If a message abort is requested, there is no expectation that it immediately halt delivery of an existing handshake with the server. The expectation is always in terms of mitigating against prolonged delivery delays.
  • As a consequence of the overflow strategy, a message may have been aborted by the mqtt client before the application ever requests an abort. In such case the abort would just be a NOOP.

An ideal client-side abort would also be able to provide feedback about whether the message was definitely delivered, definitely not delivered, or that it may have been delivered but the client never received an ack.

In this case where delivery may have already occurred but we dont think so, it seems reasonable that the mqtt client internally only makes a best effort to ensure the message is not delivered after a prolonged delay. It doesn't need to necesarrily interrupt an ongoing handshake with the server or otherwise prioritize an abort when dealing with race conditions at the microsecond level.

Which project is your feature request related to?

  • Client
  • ManagedClient

Describe the solution you'd like

var managedClient = /* Managed MQTT Client */
var message = /* build message here */
var abortablePublication = await managedClient.AbortableEnquqeueAsync(message);
var abortResult = await abortablePublication.AbortAsync();

The api for managed client could either define a new method or possibly even modify the existing method to return something along the lines of a Task<ManagedMqttClientPublishResult> instead of Task The api for unmanaged client could potentially expose the AbortAsync() method through the existing MqttClientPublishResult class.

The abort method would have a signature along the lines of:

enum AbortStatus {
  Delivered,
  NotDelivered,
  WontDeliverAgain
}

public interface IAbortResult {
  AbortStatus Status { get; }
}

public Task<IAbortResult> AbortAsync(CancellationToken cancellationToken = default);

depending on implementation details it may be that the cancellation token argument is unnecesarry but it seems fairly standard to include it

Describe alternatives you've considered

It might be possible to implement logic on the receiving end of a subscription to detect abort-able scenarios as described above, but this can also become prohibitively difficult depending on the conditions that need to be met.

There may be ways to create more fine-grained mqtt clients or dispose of existing mqtt clients when abort is required, however these would always offer poor trade-offs for the library consumer.

hannasm avatar May 19 '23 04:05 hannasm

I'd also love to have such functionality.

Note that, for the managed client, messages with QoS 0 can also be kept for an indefinite amount of time, since the managed client only processes its message queue while being connected to the broker. Additionally, if a message with QoS 0 is enqueued after a message with QoS 1 or 2, the QoS 0 message is kept at least until the QoS 1/2 message has been processed.

I want to propose another solution. Before publishing the message, the managed client could invoke some InterceptingProcessMessageAsync event. The event args contain the message and a flag ShouldPublish. The application can now set the ShouldPublish to false if it wants to stop the message from being published.

(The same pattern is already used in the MqttServer class.)

var managedClient = /* Managed MQTT Client */
managedClient.InterceptingProcessMessageAsync += (args) =>
{
    if (/* some condition by the application, e.g., PublishHasBeenCanceled(args.Message) */)
    {
        args.ShouldPublish = false;
    }
}

var message = /* build message here */
await managedClient.EnquqeueAsync(message);

This solution enables some more use-cases. For example, applications may use the event to log whenever the managed client tries to send a message via the internal client.

Additional, the change is only minimally invasive, since once the event has been added to the interface, only the behavior of the TryPublishQueuedMessageAsync function in the managed client needs to be changed.

dagophil avatar Jul 25 '23 15:07 dagophil

I like this alternative design. It should be possible to cover every use case i am concerned with using InterceptingProcessMessageAsync. It probably only take a few lines of code to implement as well.

hannasm avatar Jul 25 '23 16:07 hannasm

I'd love to make a PR, but I think one of the maintainers should first check whether the approach is okay. Do you know how we can get their attention?

dagophil avatar Aug 28 '23 15:08 dagophil

Hello @chkr1011 - do you have any feedback on this discussion before getting a PR underway?

hannasm avatar Aug 28 '23 16:08 hannasm

I have submitted a pull request for this feature.

#1915

hannasm avatar Jan 25 '24 23:01 hannasm