hivemq-mqtt-client
incoming publishes are acknowledged when publish flow dies
It seems that when the publish flow dies (for whatever reason), the HiveMQ client sends PUBACKs for subsequent messages. That's wrong: the client should not send PUBACKs for PUBLISHes when the publish flow dies. The flow is configured for manual acknowledgment.
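For context, a minimal sketch of how such a manually acknowledged QoS 1 flow is typically set up with the reactive API (client identifier, host, and topic filter below are placeholders, not the actual application code):

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient;

public class ManualAckSketch {

    public static void main(final String[] args) {
        // Placeholder identifier and host.
        final Mqtt5RxClient client = Mqtt5Client.builder()
                .identifier("example-client")
                .serverHost("broker.example.com")
                .buildRx();

        client.connect().blockingGet();

        // QoS 1 subscription with manual acknowledgement: the PUBACK is only sent
        // after the application explicitly calls publish.acknowledge().
        client.subscribePublishesWith()
                .topicFilter("devices/+/event") // placeholder topic filter
                .qos(MqttQos.AT_LEAST_ONCE)
                .manualAcknowledgement(true)
                .applySubscribe()
                .subscribe(publish -> {
                    // application-specific processing happens here
                    publish.acknowledge(); // only now should a PUBACK go out
                }, throwable -> {
                    // if this flow errors ("dies"), no further publishes reach this handler
                });
    }
}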
The Error which is logged is the following:
No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=398738e9-d817-458e-b35d-024837e3f0ed/from-smart-acquisition-device/error-codes/event, payload=151byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=13454, dup=false, topicAlias=0, subscriptionIdentifiers=[]}.
In my opinion it's wrong to simply log a WARN message when there is no publish flow for a PUBLISH. It should throw an exception and NOT send a PUBACK:
https://github.com/hivemq/hivemq-mqtt-client/blob/18fc441855d9acb775f739498cc32045d74e5a69/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishService.java#L104
@CallByThread("Netty EventLoop")
private void onPublish(final @NotNull MqttStatefulPublishWithFlows publishWithFlows) {
    incomingPublishFlows.findMatching(publishWithFlows);
    if (publishWithFlows.isEmpty()) {
        LOGGER.warn("No publish flow registered for {}.", publishWithFlows.publish);
    }
    drain();
    for (Handle<MqttIncomingPublishFlow> h = publishWithFlows.getFirst(); h != null; h = h.getNext()) {
        if (h.getElement().reference() == 1) {
            referencedFlowCount++;
        }
    }
    emit(publishWithFlows);
}
Hi @smaugs, sorry for the delayed response.
when the publish flow dies (for whatever reason)
You are probably referring to the reactive API and to the flow being cancelled inside your application, aren't you? Can you give reasons why the flow should die other than a bug and, more importantly, how would you recover from it?
It should throw an exception
If the flow "died", then there is no callback where an exception can be delivered. You can also register a flow for all remaining messages that cannot be delivered to any other flow.
Hi @SgtSilvio
no need to apologize :)
when the publish flow dies (for whatever reason)
You are probably referring to the reactive API and to the flow being cancelled inside your application, aren't you?
Yes.
Can you give reasons why the flow should die other than a bug
There are multiple reasons why a flow can die (out of memory, a bug, ...), but that's not the point. We are subscribing with QoS 1 and explicitly requesting manual acknowledgement of every single message that arrives. But there is a default action to auto-acknowledge messages when no flow exists. In my opinion the default action for PUBLISHes with QoS > 0 should be to not send a PUBACK for them.
When I use MQTT I am interested in certain messages and I subscribe to them with a specific QoS. When I want to make sure I get all messages, I subscribe with QoS > 0. Losing any message for whatever reason is not an option.
and, more importantly, how would you recover from it?
I would not try to recover from it. This is an error that is not easily recoverable. It has to be detectable somehow, and maybe there should be some sort of action in case a flow dies (stop the service, do nothing, ...).
When this happens it is a problem - but the much bigger problem is to lose messages (QoS > 0).
It should throw an exception
If the flow "died", then there is no callback where an exception can be delivered. You can also register a flow for all remaining messages that cannot be delivered to any other flow.
That's true - but it does not matter. The exception does not have to be processed in the flow. The problem has to be detectable in some way - but it does not have to be repaired.
There are two types of security or reliability involved here: data security/reliability and operational security/reliability. Both are important. But if a user of an MQTT client library is explicitly requesting manual acknowledgement, the reliability of the data obviously is more important to that user.
Can we somehow agree on how it should behave, or do you have another opinion? The solution is a completely different topic.
Hi @smaugs
I may have a solution to your problem that is already possible: you can add a "fallback" flow that consumes all messages for which no other flow exists. If you enable manual acknowledgement for this fallback flow, you can prevent sending an acknowledgement. For example:
client.toRx().publishes(MqttGlobalPublishFilter.REMAINING, true).subscribe(publish -> {
    log.error("Received a message for which no flow exists");
    // do not call publish.acknowledge(); instead disconnect the client
    // (disconnect() returns a Completable, so it has to be subscribed to actually take effect)
    client.toRx().disconnect().subscribe();
}, throwable -> {
    log.error("Fallback flow errored");
});

client.toRx().subscribePublishesWith().topicFilter("test").manualAcknowledgement(true).applySubscribe()
        .subscribe(publish -> {
            // handle the normal publish; if this flow "dies", the remaining flow is called
        });
Please comment if this is a viable solution for you.
We should probably make it configurable whether the client should disconnect if it receives a message without a flow. I have added this issue to the 2.0 milestone in case we want to make this the default behavior.
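Just to illustrate the idea (the builder option below is hypothetical and does not exist in the current API), such a setting could look roughly like this:

// Hypothetical sketch only: the commented-out builder option is invented for illustration
// and is not part of the current API.
Mqtt5Client client = Mqtt5Client.builder()
        .identifier("example-client")       // placeholder
        .serverHost("broker.example.com")   // placeholder
        // .disconnectOnPublishWithoutFlow(true) // hypothetical option
        .build();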
Hi @SgtSilvio,
sorry for responding so late. Our team is testing the suggested workaround - thank you!
regarding the configurable behaviour: I like the idea very much. Do you have any idea when 2.0 will be released?
Hi @SgtSilvio,
we tried your suggested workaround and it seems to work - but there is another issue:
We use one client with multiple subscriptions (and therefore multiple flows). So we decided not to disconnect but to log an error in the fallback flow, so that the remaining flows can keep processing their messages.
We tested it in our application and, interestingly, if one flow dies (and the fallback flow is triggered), no more messages are processed at all. At first we thought it had something to do with the broker's "messages in flight" limit, but that limit is much higher. In fact, it happens at the first message of another flow after the fallback flow was triggered.
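Roughly, a sketch of the setup (topic filters and handler bodies below are placeholders, not our actual code); the observation is that after the fallback flow fires once, the other flows stop receiving messages:

// Fallback flow with manual acknowledgement: it only logs and intentionally never acknowledges.
client.toRx().publishes(MqttGlobalPublishFilter.REMAINING, true).subscribe(publish ->
        log.error("No flow for topic {}", publish.getTopic()));

// One of several independent flows on the same client, all with manual acknowledgement.
client.toRx().subscribePublishesWith()
        .topicFilter("devices/+/error-codes/event") // placeholder topic filter
        .qos(MqttQos.AT_LEAST_ONCE)
        .manualAcknowledgement(true)
        .applySubscribe()
        .subscribe(publish -> {
            // process the message, then acknowledge it
            publish.acknowledge();
        }, throwable -> log.error("Flow died", throwable));

// ... further subscriptions with other topic filters on the same client ...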
Since this issue hasn't had an update in a couple years, I'm going to close it out. @smaugs if that remaining issue you identified still exists, please feel free to open another issue. We'd be happy to help out!