hivemq-mqtt-client
hivemq-mqtt-client copied to clipboard
[MQTT5] Negative manual acknowlegement via Publish.acknowledge
Hi, is there any way to perform "negative acknowledgement" for incoming MQTT5 PUBLISH messages as a client subscriber? I want to send PUBACK with reason code 0x80, 0x83, 0x87, 0x90 or 0x97according to application-level decisions.
Hi @yufei-cai
Currently you can use the Mqtt5IncomingQos1Interceptor to set the reason code, reason string and user properties of the PUBACK message.
These interceptors can be registered via Mqtt5ClientBuilder.advancedConfig.
Quick example:
Mqtt5Client.builder()
...
.advancedConfig()
.interceptors()
.incomingQos1Interceptor((clientConfig, publish, pubAckBuilder) -> {
if (isRejected(publish)) {
pubAckBuilder.reasonCode(Mqtt5PubAckReasonCode.NOT_AUTHORIZED);
}
})
.applyInterceptors()
.applyAdvancedConfig()
.build();
Does this help you?
Yes, it helps, thanks. It would be more convenient to set the reason code with Mqtt5Publish#acknowledge() though.
It would be more convenient to set the reason code with
Mqtt5Publish#acknowledge()though.
@yufei-cai Yes, I agree. The problem here is that a user could have overlapping subscriptions and the publish messages will then be delivered to multiple callbacks. Which callback wins the decision for the reason code then? If you have an idea, please comment.
If the user have overlapping subscriptions with manual acknowledgement on then it is up to the user whether to acknowledge in one subscriber or not. I think it is acceptable for one acknowledge() to succeed and all others to fail with IllegalStateException.
It is actually a feature to have multiple callbacks/flows that can consume the same messages. The client accumulates the acknowledgements then and only after all have acknowledged the message the PUBACK/REC is sent to the broker.
But I think in the case of multiple callbacks the following would make sense:
- If at least one consumer did set a SUCCESS reason code (the default), then SUCCESS is used in the PUBACK/REC to the broker (This means that the callbacks can decide which of them consumes the message)
- If all consumers set the same error code, then this error code is used
- If all consumers set error codes but different error codes are used, then UNSPECIFIED_ERROR is used
- Reason strings are always concatenated (delimiter "; ")
- User properties are always concatenated
Interesting. I understood from the API doc of Mqtt5Publish#acknowledge() that this method would fail when called more than once on the same object. The reason code combination strategy @SgtSilvio proposed makes sense as a default, though my use case would prefer it if UNSPECIFIED_ERROR is used in case of any disagreement between consumers at all. The combination strategy looks like a good thing to configure in an interceptor.
There is another issue setting application-level reason code of PUBACK in Mqtt5IncomingQos1Interceptor and of PUBREC in Mqtt5IncomingQosInterceptor. The onPublish methods of the interceptors are called immediately on receiving a PUBLISH and are not supposed to block. However, my client application does not know the reason code at this point; It can only decide on the reason code (i. e., whether redelivery should be attempted) after some asynchronous processing of the PUBLISH message. It is not possible to wait on the asynchronous processing inside onPublish since it should not block. Mutating the Mqtt5PubAckBuilder in another thread seems a bad idea; the Mqtt5PubAckBuilder's implementation MqttPubAckBuilder does not come equipped with any kind of facility to handle concurrent modification.
The interceptors are not called immediately on receiving the publish message.
If you use manual acknowledgement, it is guaranteed that everything before the call to acknowledge happens before the call to the interceptor. So you can asynchronously process the message, determine and store the reason code, call acknowledge and then retrieve and set the reason code in the interceptor.
Still this is a workaround and setting the reason code directly via acknowledge is a good feature request.
Thanks for the info. This line in the Javadoc should probably be phrased a bit differently then. https://github.com/hivemq/hivemq-mqtt-client/blob/master/src/main/java/com/hivemq/client/mqtt/mqtt5/advanced/interceptor/qos1/Mqtt5IncomingQos1Interceptor.java#L33