MQTT streaming: PUBREL event doesn't get published to server event stream when publishing two QoS2 messages in a row

Open jcroig opened this issue 4 years ago • 27 comments

References #2188

Comments:

  1. Although this fix slightly changes the ordering guarantees of the client event stream (messages can now potentially arrive in a different order than the client issued them), for the most part only QoS 1 and QoS 2 acknowledgement messages should be delivered out of order. In a sense that is what we want, so that they can complete the PUBLISH message lifecycle without being blocked by other messages.

  2. The rest of the messages are all sent to the same actor (the server connector) to be processed and then delivered downstream. Because Akka guarantees ordered delivery of actor messages, they should be pushed downstream in order as well.

  3. On top of that, most of those messages have an intrinsic order given by the MQTT protocol, which prevents two messages that depend on each other from being processed at the same time in the same stage.

  4. Only two PUBLISH messages could be independent and potentially get reordered during this stage, but given comment 1 it doesn't look like that will happen.

jcroig avatar Mar 11 '20 13:03 jcroig

cc @huntc

ennru avatar Mar 16 '20 07:03 ennru

Thanks for the fix - can we please first have a test that fails for the existing code?

huntc avatar Mar 16 '20 09:03 huntc

I included a reference to a project I created to reproduce the issue:

https://github.com/jcroig/alpakka-mqtt-issue/tree/master

I tried to create a unit test but could not reproduce it using the Alpakka streaming client, since it will never send two QoS 2 PUBLISH messages in a row to the same topic before the first one completes, which is what we need to reproduce the issue.

jcroig avatar Mar 16 '20 11:03 jcroig

> I tried to create a unit test but could not reproduce it using the Alpakka streaming client, since it will never send two QoS 2 PUBLISH messages in a row to the same topic before the first one completes, which is what we need to reproduce the issue.

...and nor should it... we cannot proceed to process messages for a given topic without them being complete (this is as per the spec). I'm not convinced that we should be handling this problem then as it represents poor client behaviour... unless there's something I'm missing. Thoughts? What client is being used where you see the problem?

huntc avatar Mar 18 '20 23:03 huntc

Hi @huntc,

Thanks for your answer.

I've seen this problem with the mosquitto broker when using it as a bridge:

http://www.steves-internet-guide.com/mosquitto-bridge-configuration/

I can also reproduce it easily using Paho, which is what I used in the example project I linked from the issue:

https://github.com/jcroig/alpakka-mqtt-issue

As a clarification, the problem is not that messages don't get processed in parallel; I guess it is fine that they are completed in order. What happens is that when two QoS 2 messages are sent in a row, the server completes the first part of the acknowledgement process (PUBREC) but then always fails to complete the second part (PUBCOMP), so the client keeps sending PUBREL and that message is never processed.

I might be missing something, but I haven't seen anywhere in the MQTT spec that this behaviour is not legitimate, that is, sending two QoS 2 PUBLISH messages with different packet ids to the same topic without waiting for the first acknowledgement process to complete.

jcroig avatar Mar 19 '20 09:03 jcroig

> I might be missing something, but I haven't seen anywhere in the MQTT spec that this behaviour is not legitimate, that is, sending two QoS 2 PUBLISH messages with different packet ids to the same topic without waiting for the first acknowledgement process to complete.

Please see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/csprd02/mqtt-v3.1.1-csprd02.html#_Toc385349841 - "non-normative comment":

" If both Client and Server make sure that no more than one message is “in-flight” at any one time (by not sending a message until its predecessor has been acknowledged), then no QoS 1 message will be received after any later one - for example a subscriber might receive them in the order 1,2,3,3,4 but not 1,2,3,2,3,4. Setting an in-flight window of 1 also means that order will be preserved even if the publisher sends a sequence of messages with different QoS levels on the same topic."

That's what we're doing here... we are actually enforcing that no more than one message for a given topic can be in flight. A message isn't "done" until it is fully received, so to allow multiple in flight and then having, say, the first one be rejected at the PUBREL state while the second has received a PUBREC could be dangerous.
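As a rough sketch of what an in-flight window of 1 means on the sending side (illustrative Python, not Alpakka code; the `WindowedSender` class and its method names are made up for this example), a message only goes on the wire once its predecessor has been fully acknowledged:

```python
from collections import deque


class WindowedSender:
    """Hypothetical sender that keeps at most one message in flight."""

    def __init__(self):
        self.queue = deque()   # messages waiting for their turn
        self.in_flight = None  # packet id currently awaiting acknowledgement
        self.sent = []         # packet ids actually put on the wire, in order

    def publish(self, packet_id):
        self.queue.append(packet_id)
        self._pump()

    def acknowledged(self, packet_id):
        # For QoS 2 this would be called when PUBCOMP arrives.
        if packet_id == self.in_flight:
            self.in_flight = None
            self._pump()

    def _pump(self):
        # Send the next queued message only if nothing is in flight.
        if self.in_flight is None and self.queue:
            self.in_flight = self.queue.popleft()
            self.sent.append(self.in_flight)


sender = WindowedSender()
sender.publish(1)
sender.publish(2)       # queued, not sent yet
sender.acknowledged(1)  # now packet 2 goes out
```

A client that does not behave like this is the case under discussion here.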

huntc avatar Mar 20 '20 03:03 huntc

Thanks for your answer, @huntc.

Bearing in mind the mandatory rules described in section 4.6:

A Client MUST follow these rules when implementing the protocol flows defined elsewhere in this chapter:

When it re-sends any PUBLISH packets, it MUST re-send them in the order in which the original PUBLISH packets were sent (this applies to QoS 1 and QoS 2 messages) [MQTT-4.6.0-1]
It MUST send PUBACK packets in the order in which the corresponding PUBLISH packets were received (QoS 1 messages) [MQTT-4.6.0-2]
It MUST send PUBREC packets in the order in which the corresponding PUBLISH packets were received (QoS 2 messages) [MQTT-4.6.0-3]
It MUST send PUBREL packets in the order in which the corresponding PUBREC packets were received (QoS 2 messages) [MQTT-4.6.0-4]
 

A Server MUST by default treat each Topic as an "Ordered Topic". It MAY provide an administrative or other mechanism to allow one or more Topics to be treated as an "Unordered Topic" [MQTT-4.6.0-5].

 

When a Server processes a message that has been published to an Ordered Topic, it MUST follow the rules listed above when delivering messages to each of its subscribers. In addition it MUST send PUBLISH packets to consumers (for the same Topic and QoS) in the order that they were received from any given Client 

My thoughts:

  1. As far as I can see, the in-flight window is a "recommendation": if both client and server follow it, all the rules are satisfied for free, but it is not itself a rule that the client must fulfil. A client can fulfil the mandatory rules while sending multiple PUBLISH messages one after another. If I look at the mosquitto bridge broker logs, it is indeed re-sending PUBLISH and returning PUBREL in order, so strictly speaking I do not think we can call this a client bug. As I mentioned before, I can reproduce this with both mosquitto (used as a bridge) and Paho, which are well-known and widely used clients, so this issue will probably occur frequently when using this module.
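To make the point concrete, here is a small sketch (plain Python; the function names and the trace format are hypothetical, and it only checks [MQTT-4.6.0-4] against a hand-written packet trace) showing that a client can have two QoS 2 PUBLISH packets in flight and still send PUBREL in the order in which the PUBREC packets arrived:

```python
def first_occurrences(ids):
    """Drop repeats (re-sends) while keeping first-seen order."""
    seen = []
    for packet_id in ids:
        if packet_id not in seen:
            seen.append(packet_id)
    return seen


def pubrel_order_ok(trace):
    """Check [MQTT-4.6.0-4] on a client-side trace.

    trace is a list of (direction, packet_type, packet_id) tuples, where
    direction is "out" (sent by the client) or "in" (received by it).
    """
    pubrec_received = first_occurrences(
        pid for direction, ptype, pid in trace if (direction, ptype) == ("in", "PUBREC")
    )
    pubrel_sent = first_occurrences(
        pid for direction, ptype, pid in trace if (direction, ptype) == ("out", "PUBREL")
    )
    # PUBRELs must follow the order of the PUBRECs received so far.
    return pubrel_sent == pubrec_received[: len(pubrel_sent)]


# Two publishes in a row, then PUBREL for the first one re-sent repeatedly.
two_in_flight = [
    ("out", "PUBLISH", 1),
    ("out", "PUBLISH", 2),
    ("in", "PUBREC", 1),
    ("out", "PUBREL", 1),
    ("out", "PUBREL", 1),
]
print(pubrel_order_ok(two_in_flight))  # True
```

The trace mirrors what I see in the logs: the rule holds even though the client never waited for the first handshake to finish.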

> That's what we're doing here... we are actually enforcing that no more than one message for a given topic can be in flight. A message isn't "done" until it is fully received, so to allow multiple in flight and then having, say, the first one be rejected at the PUBREL state while the second has received a PUBREC could be dangerous.

The Alpakka server does currently serialize PUBLISH messages per topic, which I believe is very good because it simplifies things and enforces the mandatory rules. That means it will not start processing the second PUBLISH message to the topic (and hence will not send any PUBREC response) until the first PUBLISH message is done (has completed the whole acknowledgement process). The issue is that the first PUBLISH message will never be done, because the PUBREL coming from the client waits until the second PUBLISH message is completed instead of being processed to complete the first one.

In other words, I'm not saying it should process messages to the same topic in parallel, but if they arrive one after another (which, as I said, does not appear to be illegitimate in itself) we should be able to complete them in order. For that, we need to give the PUBREL for the first PUBLISH priority over the second PUBLISH; otherwise the first PUBLISH is broken at this stage and never completes, which is what happens with the current implementation.

The change I propose in the PR does not hold back the processing of PUBREL behind the second PUBLISH, so the first PUBLISH can be completed and the second PUBLISH is then processed sequentially afterwards. Note, though, that the change does not affect the per-topic sequencing of PUBLISH messages: the first one in is processed first and the second is queued, waiting until the first message completes, as it does now. I agree there might be other implications for the ordering of other message types, but hopefully they don't violate any spec constraint (see https://github.com/akka/alpakka/pull/2189#issue-386683712).

jcroig avatar Mar 20 '20 11:03 jcroig

The server design is such that only one publish for a topic can be processed at a time. Allowing more is going to break ordering as there will need to be multiple actors handling the publications to the same topic.

There could still be something I’m missing here. Also, while I appreciate the maturity of Paho et al, it doesn’t mean that they are bug free. ;-)

We need a spec though, so I can confirm my understanding of what the problem is. There should be some existing specs to illustrate how packets can be sent without having to use a full blown client. Let me know if you can’t find one though.

Thanks for the efforts and for being patient with me.

huntc avatar Mar 20 '20 11:03 huntc

> The server design is such that only one publish for a topic can be processed at a time. Allowing more is going to break ordering as there will need to be multiple actors handling the publications to the same topic.

That's correct, but this PR doesn't change that. It does not allow more than one message to the same topic to be processed at a time. The second message starts being processed only after the first is completed.

> There could still be something I’m missing here. Also, while I appreciate the maturity of Paho et al, it doesn’t mean that they are bug free. ;-)

I do agree, and I don't mean they're bug free, just that they are not violating any spec rule, hence I don't think we should consider their behaviour a bug.

> We need a spec though, so I can confirm my understanding of what the problem is. There should be some existing specs to illustrate how packets can be sent without having to use a full blown client. Let me know if you can’t find one though.

The only spec I found is the one you pointed me to and, as I said, it is not violated on the client side at all:

  1. When it re-sends any PUBLISH packets, it MUST re-send them in the order in which the original PUBLISH packets were sent (this applies to QoS 1 and QoS 2 messages) [MQTT-4.6.0-1]

  2. It MUST send PUBREL packets in the order in which the corresponding PUBREC packets were received (QoS 2 messages) [MQTT-4.6.0-4]

Unless you are taking the non-normative comment as a rule and therefore assuming that the client must always fulfil it. IMHO this is not a rule but a simplification that implies compliance with the mandatory rules; it is not the only way to comply. I think the server should not assume the client will always follow this simplification, since the client can fulfil the spec without it.

> Thanks for the efforts and for being patient with me.

My pleasure. This issue is a bit complex and not easy to discuss, so thanks for your patience with me as well.

jcroig avatar Mar 20 '20 12:03 jcroig

> The only spec I found is the one you pointed me to and, as I said, it is not violated on the client side at all

Sorry, by spec, I mean test specification. Here's an example of a client specification that is sending commands directly to the client without using an actual server - a similar approach could be used to reproduce the test case you have here. Note that server tests are further down the page. However, you can write the test we need here for the client or the server as either can receive PUBLISH with QoS 2.

https://github.com/akka/alpakka/blob/master/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala#L717

huntc avatar Mar 20 '20 13:03 huntc

I just added two tests to reproduce the issue on both the client and server sides.

Obviously they won't fail with the "fix" I provided in the same PR 😅. To see them fail, leave out the changes to the https://github.com/akka/alpakka/pull/2189/files#diff-761cc8b468c1e9e7e7eafc115a516716 file and run the tests.

Hopefully they will communicate the issue better than I do 😄. If I need to provide more details, please let me know.

jcroig avatar Mar 20 '20 16:03 jcroig

Thanks for the tests - we probably only need the one i.e. either the client or the server - perhaps just the client. Given that there's no difference in behaviour between the client handling a publish vs the server, then we should just have one in order to keep the surface area of code to maintain to a minimum.

The tests confirm my position that no more than one publish should be sent at a time. However, I think it makes sense to, and as the spec suggests, have a mode of operation expressed as a non-default setting where the consumer can handle multiple publications to the same topic. Thoughts?

huntc avatar Mar 21 '20 01:03 huntc

> Thanks for the tests - we probably only need the one i.e. either the client or the server - perhaps just the client. Given that there's no difference in behaviour between the client handling a publish vs the server, then we should just have one in order to keep the surface area of code to maintain to a minimum.

Cool, I'll remove the server-side test.

> The tests confirm my position that no more than one publish should be sent at a time. However, I think it makes sense to, and as the spec suggests, have a mode of operation expressed as a non-default setting where the consumer can handle multiple publications to the same topic. Thoughts?

I think the MQTT spec doesn't imply that sending two PUBLISH messages at once is wrong in itself (as long as the mandatory rules are fulfilled), so I'd say the default mode of operation should not fail in that case. Many other existing MQTT brokers (I tested mosquitto and HiveMQ) do not fail either, so it looks like they all interpret the MQTT spec the same way (although I agree that doesn't mean they are all bug free). On top of that, the outcome of the issue is the loss of a QoS 2 message: the broker responds with PUBREC, after which the client will never re-send the original PUBLISH and will keep sending PUBREL without getting any answer from the server.

jcroig avatar Mar 21 '20 15:03 jcroig

> I think the MQTT spec doesn't imply that sending two PUBLISH messages at once is wrong in itself (as long as the mandatory rules are fulfilled)

The potential is for there to be an out of order scenario. That’s intolerable from a spec perspective unless you specifically want it.

Do you know for sure that mosquitto can guarantee that your two publishes to the same topic with QoS will be processed in the same order?

huntc avatar Mar 21 '20 22:03 huntc

> Do you know for sure that mosquitto can guarantee that your two publishes to the same topic with QoS will be processed in the same order?

I cannot say for sure, but in all the tests I conducted (quite a lot of them) it does, yes.

jcroig avatar Mar 22 '20 11:03 jcroig

How many publish messages should we potentially allow to queue up for a given topic?

huntc avatar Mar 22 '20 20:03 huntc

settings.eventParallelism?

I think that more important than the maximum number of PUBLISH messages allowed to queue for a given topic is not ending up with a broken one. If a QoS 2 message is dropped by the broker without any PUBREC response, the client will re-send the PUBLISH at some point, as if the original had been lost (a scenario the spec covers), and that way the message delivery completes. What is annoying about the current situation is that the server does respond with PUBREC but then fails to process PUBREL, which leaves the message delivery broken forever.
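A minimal sketch of the sender side of the QoS 2 handshake as I describe it above (illustrative Python only; `Qos2Sender`, `on_receive` and `to_resend` are names I made up, and real clients have more states and timers) shows why a PUBREC followed by silence is worse than no response at all:

```python
from enum import Enum, auto


class Qos2Sender(Enum):
    AWAITING_PUBREC = auto()   # PUBLISH sent, nothing received yet
    AWAITING_PUBCOMP = auto()  # PUBREC received, PUBREL sent
    DONE = auto()              # PUBCOMP received, delivery complete


def on_receive(state, packet_type):
    """Advance the sender state when a packet arrives from the server."""
    if state is Qos2Sender.AWAITING_PUBREC and packet_type == "PUBREC":
        return Qos2Sender.AWAITING_PUBCOMP
    if state is Qos2Sender.AWAITING_PUBCOMP and packet_type == "PUBCOMP":
        return Qos2Sender.DONE
    return state  # anything else leaves the state unchanged


def to_resend(state):
    """Which packet the sender retries while it is stuck in a state."""
    return {
        Qos2Sender.AWAITING_PUBREC: "PUBLISH",
        Qos2Sender.AWAITING_PUBCOMP: "PUBREL",
    }.get(state)


state = Qos2Sender.AWAITING_PUBREC
print(to_resend(state))             # PUBLISH: a dropped message is retried
state = on_receive(state, "PUBREC")
print(to_resend(state))             # PUBREL: the PUBLISH is never sent again
```

Once the state has moved past PUBREC, only a PUBCOMP can finish the delivery, which is the step the server currently never reaches.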

jcroig avatar Mar 23 '20 09:03 jcroig

> I think that more important than the maximum number of PUBLISH messages allowed to queue for a given topic is not ending up with a broken one.

Sorry for the delay. It would be good to have the test designed specifically for this scenario then, right? I wonder if it is right now...

huntc avatar Mar 27 '20 23:03 huntc

Thanks for your answer.

In theory the test I already created tests exactly that: the first of two PUBLISH messages is completed properly, that is, the client's PUBREL is processed after the server's initial PUBREC. That is the step that currently fails and leaves the message broken.

jcroig avatar Mar 28 '20 16:03 jcroig

I think then that the problem is within the consumer actor. At the moment, we are just letting subsequent publish messages to an active topic pass through.

huntc avatar Mar 28 '20 21:03 huntc

I don't think this would be an issue as long as it is guaranteed that the first PUBLISH's PUBREL response is processed and pushed downstream in the command stream before the second PUBLISH gets processed.

For QoS 2 there are subsequent client messages that should be processed with higher priority than the second PUBLISH, namely the PUBREL of the first PUBLISH. Since the command processing stage is mapAsync, which preserves the original order, those subsequent messages are not pushed downstream before the second PUBLISH gets processed, and this is where the "deadlock" happens. The first PUBLISH needs the second PUBLISH to be processed before the PUBREL coming from the client can be processed and complete it, but the second PUBLISH needs the first PUBLISH to be completed before it can be processed.

For QoS 0 and QoS 1 this is not an issue, though.

I am sorry if I am repeating myself here, I just want to make sure we are on the same page.

The trivial fix was to use mapAsyncUnordered, since that way PUBREL is processed and pushed downstream as soon as it arrives, without waiting for the second PUBLISH to be processed. I understand that might have broader implications though, hence my first comment:

https://github.com/akka/alpakka/pull/2189#issue-386683712
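Here is a toy model of the stall and of the unordered variant. It is plain Python, not Alpakka or Akka Streams code, and `run_stage` is a made-up name. It assumes a single topic, that a PUBLISH can only be emitted while no earlier QoS 2 publish is pending, and that emitting the matching PUBREL frees the topic:

```python
from collections import deque


def run_stage(commands, ordered):
    """Return (emitted, stalled) for a toy single-topic command stage.

    ordered=True mimics a stage that emits strictly in input order;
    ordered=False lets any ready element overtake a blocked one.
    """
    pending = deque(commands)
    emitted = []
    in_flight = None  # packet id of the publish currently holding the topic

    def ready(cmd):
        kind, _packet_id = cmd
        # PUBREL is always ready; PUBLISH needs the topic to be free.
        return kind == "PUBREL" or in_flight is None

    while pending:
        candidates = [pending[0]] if ordered else list(pending)
        cmd = next((c for c in candidates if ready(c)), None)
        if cmd is None:
            return emitted, list(pending)  # nothing can make progress
        pending.remove(cmd)
        emitted.append(cmd)
        kind, packet_id = cmd
        if kind == "PUBLISH":
            in_flight = packet_id
        elif packet_id == in_flight:
            in_flight = None  # first handshake finished, topic is free
    return emitted, []


commands = [("PUBLISH", 1), ("PUBLISH", 2), ("PUBREL", 1)]
print(run_stage(commands, ordered=True))
# ([('PUBLISH', 1)], [('PUBLISH', 2), ('PUBREL', 1)])
print(run_stage(commands, ordered=False))
# ([('PUBLISH', 1), ('PUBREL', 1), ('PUBLISH', 2)], [])
```

In the unordered run the two PUBLISH messages still come out in their original order; only the PUBREL overtakes the blocked second PUBLISH.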

Another potential fix, as I understand you suggest, would be rejecting the second PUBLISH message to the same topic. Note that this could lead to many QoS 0 messages being lost, since all of them would be rejected (when they are sent to a topic that has a pending message to be completed). It is not an issue for QoS 1 or 2 because the client will re-send them if there is no server response.

What do you think?

jcroig avatar Mar 29 '20 11:03 jcroig

> Another potential fix, as I understand you suggest, would be rejecting the second PUBLISH message to the same topic. Note that this could lead to many QoS 0 messages being lost, since all of them would be rejected (when they are sent to a topic that has a pending message to be completed).

QoS 0 is handled differently to the other QoS message types. Shouldn’t be an issue.

I need to spend time on this issue and sketch out the flows to illustrate what I mean.

huntc avatar Mar 29 '20 20:03 huntc

You are right, QoS 0 is always pushed downstream right away. It is not queued in any consumer topic buffer, so it doesn't get blocked by any previous message.

What then? Shall we just reject a QoS 1 or QoS 2 PUBLISH message if there's any other message being processed for this topic?

What do you think about the fix in this PR, along with what I commented at https://github.com/akka/alpakka/pull/2189#issue-386683712?

jcroig avatar Mar 30 '20 07:03 jcroig

Hi @huntc,

Could you check my last comment? Any update on this?

Thanks

jcroig avatar Apr 16 '20 09:04 jcroig

> Could you check my last comment? Any update on this?

Really sorry for the delay. I'm not a fan of the fix proposed by the PR. Unfortunately, I'm a bit short on time at the moment, but I do intend to get to this.

huntc avatar Apr 16 '20 10:04 huntc

Sure, no worries, let me know if I can help.

Thank you very much!

jcroig avatar Apr 17 '20 08:04 jcroig

This has been stale for a very long time now. I'm inclined to close it.

ennru avatar Sep 01 '22 19:09 ennru

Closing for inactivity.

ennru avatar Dec 09 '22 10:12 ennru