Fix Subscribe QoS encoding and UnsubAck decoding.
- Fix Subscribe QoS encoding: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349311
- Fix UnsubAck decoding: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718022
This is a general-purpose repo. Few if any of the Pekko team are MQTT experts. It would really help if
- the PR was written in plain English and not a set of links to barely decipherable specs
- the PR included test coverage
The PR code does not even compile.
[error] /home/runner/work/pekko-connectors/pekko-connectors/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:458:74: value ReservedUnsubAck is not a member of object org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags
[error] extends ControlPacket(ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedUnsubAck)
[error] ^
[error] /home/runner/work/pekko-connectors/pekko-connectors/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:714:64: type mismatch;
[error] found : Int
[error] required: Byte
[error] packetBsb.putByte(topicFilterFlags.underlying.toByte >> 1)
@pjfanning
Compilation should be fixed now.
For more context see: https://github.com/akka/alpakka/issues/2731 https://github.com/akka/alpakka/issues/2963
I already reported this years ago, but it never made it into Pekko.
For licensing reasons, we cannot accept code based on Alpakka commits. Code changes will only be accepted if they were written by the person submitting the PR. If you wrote the Alpakka PRs yourself, with no assistance from the Akka team, we may be able to accept them, but we cannot accept anything that is otherwise based on code submitted to Alpakka.
@pjfanning Link: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718022 shows the reserved bits for UnsubAck and PingReq:
Control Packet | Fixed header flags | Bit 3 | Bit 2 | Bit 1 | Bit 0
UNSUBACK | Reserved | 0 | 0 | 0 | 0
PINGREQ | Reserved | 0 | 0 | 0 | 0
Both have the same value. However, the code as it stands uses UnsubAckReserved for UnsubAck and ReservedGeneral for PingReq, which doesn't make sense. When I created issue https://github.com/akka/alpakka/issues/2963, I pointed out exactly how and where in the code to fix this. So if there is any copyright involved, it would be mine, and I am sharing it now with Pekko.
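To spell out what the UnsubAck fix amounts to without the spec links: the UNSUBACK fixed header carries packet type 11 in its upper nibble and the reserved flags 0000 in its lower nibble, so the decoder has to accept flags == 0. A minimal, self-contained sketch (illustrative names only, not the actual decoder in model.scala):

object UnsubAckDecodeSketch {
  final case class PacketId(underlying: Int)

  sealed trait DecodeResult
  final case class UnsubAck(packetId: PacketId) extends DecodeResult
  final case class UnknownPacketType(packetType: Int, flags: Int) extends DecodeResult

  // Decode the first fixed-header byte plus the two packet-id bytes of an UNSUBACK.
  def decode(fixedHeaderByte1: Int, packetIdMsb: Int, packetIdLsb: Int): DecodeResult = {
    val packetType = (fixedHeaderByte1 & 0xf0) >> 4
    val flags = fixedHeaderByte1 & 0x0f
    // Per MQTT 3.1.1 the reserved flags of UNSUBACK are 0000; matching the flags
    // against a non-zero "reserved" constant is what makes a decoder fall through
    // to UnknownPacketType instead.
    if (packetType == 11 && flags == 0)
      UnsubAck(PacketId((packetIdMsb << 8) | packetIdLsb))
    else
      UnknownPacketType(packetType, flags)
  }

  def main(args: Array[String]): Unit =
    println(decode(0xb0, 0x00, 0x01)) // prints UnsubAck(PacketId(1))
}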
Link: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349311 shows that the lower two bits of the Subscribe message are to be used for the requested QoS: 0 0 0 0 0 0 q q. The code, however, assumes the ControlPacketFlags for QoS are laid out as 0 0 0 0 0 q q 0, which is correct for the Publish message but not for the Subscribe message.
For this one there is issue https://github.com/akka/alpakka/issues/2731, in which I explained how the bit shift is required for the Subscribe message. The Alpakka developers decided to fix this differently, so I am quite sure this fix is OK.
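To illustrate the encoding side in code rather than spec references, here is a sketch with stand-in types (not the real encoder): the session-level flags use the PUBLISH-style QoS constants 0, 2 and 4, while the SUBSCRIBE payload wants the plain QoS values 0, 1 and 2 in its lowest two bits, hence the right shift by one. Note that the shift has to happen on the Int before the .toByte conversion, which is what the compile error reported earlier in the thread was about.

object SubscribeQosEncodeSketch {
  final case class ControlPacketFlags(underlying: Int)

  val QoSAtMostOnceDelivery = ControlPacketFlags(0)
  val QoSAtLeastOnceDelivery = ControlPacketFlags(1 << 1)
  val QoSExactlyOnceDelivery = ControlPacketFlags(2 << 1)

  // Requested-QoS byte as it should appear on the wire for one SUBSCRIBE topic filter.
  def requestedQosByte(topicFilterFlags: ControlPacketFlags): Byte =
    (topicFilterFlags.underlying >> 1).toByte // shift the Int first, then narrow to Byte

  def main(args: Array[String]): Unit = {
    println(requestedQosByte(QoSAtMostOnceDelivery))  // 0
    println(requestedQosByte(QoSAtLeastOnceDelivery)) // 1
    println(requestedQosByte(QoSExactlyOnceDelivery)) // 2
  }
}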
@pjfanning I see it failed on binary compatibility checks this time... Will fix this as well...
@pjfanning
You were right about the tests. They are failing. I know how to fix them, but it is not going to be as clean as I would like it to be. I am going to take some time to think about a proper solution. I suppose we can leave this PR open for now?
@pjfanning
The PR should be good now. Instead of introducing new QoS flags for the Subscribe message, I thought it was best to make it work with the existing QoS flags (see below). That way, existing code shouldn't need any modifications. I suppose someone will want to have a critical look at this PR before it is merged; I totally understand if that is going to take a while.
val QoSAtMostOnceDelivery = ControlPacketFlags(0)
val QoSAtLeastOnceDelivery = ControlPacketFlags(1 << 1)
val QoSExactlyOnceDelivery = ControlPacketFlags(2 << 1)
val QoSReserved = ControlPacketFlags(3 << 1)
@sbmpost would it be feasible to add test coverage?
Also, as a matter of interest, have you tried the MQTT v5 support that is in pekko-connectors 1.2.0? The release should be out this week, but we have snapshot versions, and the 1.2.0 candidate is in a staging area for testing.
@pjfanning
Test coverage would indeed be nice. For it to make sense, we would need to test against an MQTT broker that is known to be strict about following the specification. In fact, I found out about these bugs because I was connecting to AWS IoT Core, which didn't work as expected. To summarize: yes, test coverage would be a good idea, but it requires an (independent) broker to test against. I am not sure whether this is feasible, but I am open to suggestions, of course.
@pjfanning
As for MQTT 5: for my work we rely on MQTT 3. Part of my job is to make sure everything runs stably, and if there is a problem, I do a deep dive to find the root cause. Here is another example of this:
https://github.com/akka/alpakka/issues/2905
In conclusion: I learned quite a bit about MQTT 3 out of necessity, but I am not so much involved with MQTT 5.
@He-Pin @mdedetrich @raboof what do you think? Should we merge this as is or do we need to spend time beefing up the mqtt testing? I suspect that there really isn't going to be someone who has the expertise and bandwidth to improve the test situation and that we are likely just going to have to accept our contributors' assertions.
@pjfanning
I think I would like to test with AWS IoT just once more, to be absolutely sure. I will post the results when I am done.
@pjfanning
I have tested with AWS IoT. The results are below.
Subscribing with QoSAtLeastOnceDelivery without the encoding fix gives "Upstream finished":
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-commandFlow] Element: Right(Subscribe(PacketId(1),Vector((22574/lwt,ControlPacketFlags(2)), (13182/lwt,ControlPacketFlags(2)), (67782/lwt,ControlPacketFlags(2)), (13185/lwt,ControlPacketFlags(2)), (114855/lwt,ControlPacketFlags(2)), (13187/lwt,ControlPacketFlags(2)))))"}
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-events] Upstream finished."}
With the QoS encoding fix we get a "SubAck":
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-commandFlow] Element: Right(Subscribe(PacketId(1),Vector((22574/lwt,ControlPacketFlags(2)), (13182/lwt,ControlPacketFlags(2)), (67782/lwt,ControlPacketFlags(2)), (13185/lwt,ControlPacketFlags(2)), (114855/lwt,ControlPacketFlags(2)), (13187/lwt,ControlPacketFlags(2)))))"}
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-events] Element: Right(SubAck(PacketId(1),Vector(ControlPacketFlags(1), ControlPacketFlags(1), ControlPacketFlags(1), ControlPacketFlags(1), ControlPacketFlags(1), ControlPacketFlags(1))))"}
And for an Unsubscribe without the decode fix we get "UnknownPacketType":
[info] {"level":"DEBUG","origin":"MqttSource","sc":{},"title":"MQTT: sending unsubscribe command: 22574/lwt"}
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-commandFlow] Element: Right(Unsubscribe(PacketId(1),Vector(22574/lwt)))"}
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-events] Element: Left(UnknownPacketType(ControlPacketType(11),ControlPacketFlags(0)))"}
However, after fixing the decode we see a proper "UnsubAck":
[info] {"level":"DEBUG","origin":"MqttSource","sc":{},"title":"MQTT: sending unsubscribe command: 22574/lwt"}
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-commandFlow] Element: Right(Unsubscribe(PacketId(1),Vector(22574/lwt)))"}
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-events] Element: Right(UnsubAck(PacketId(1)))"}
Ah, and about the AI review... that review is painfully incorrect:
The QoS encoding appears to be shifting in the wrong direction. According to MQTT v3.1.1 specification section 3.8.3.1, QoS values (0, 1, 2) should be encoded in bits 1-0 of the byte. Right-shifting by 1 would place QoS values in bits 2-1 instead
The AI doesn't take into account that the Pekko QoS constants have been defined as (0, 2, 4). Also it thinks that shifting to the right means that bits are moving to the left.
If the encoding is right-shifting by 1, then decoding should also right-shift by 1 to extract the QoS from the correct bit positions. The shift directions in encode and decode operations should be inverse operations.
Euhm... what? Since when is the inverse of a right-shift another right-shift? The lower 2 bits of a Subscribe message encode the QoS, which is one of the values (0, 1, 2). Upon receiving such a message, shifting those bits to the left gives us back the Pekko QoS constants (0, 2, 4).
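To make the round trip concrete, here is a tiny standalone check (hypothetical helper names, not the library API): encoding shifts the library constants 0, 2, 4 right by one to get the wire values 0, 1, 2, and decoding shifts left by one to get the constants back.

object SubscribeQosRoundTripSketch {
  val pekkoQosConstants = Seq(0, 2, 4) // QoSAtMostOnce, QoSAtLeastOnce, QoSExactlyOnce

  def encodeRequestedQos(flags: Int): Int = flags >> 1 // 0, 2, 4 -> 0, 1, 2 (wire)
  def decodeRequestedQos(wire: Int): Int = wire << 1   // 0, 1, 2 -> 0, 2, 4 (flags)

  def main(args: Array[String]): Unit =
    pekkoQosConstants.foreach { qos =>
      val wire = encodeRequestedQos(qos)
      println(s"flags $qos -> wire $wire -> flags ${decodeRequestedQos(wire)}")
      assert(decodeRequestedQos(wire) == qos) // << 1 undoes >> 1 for these values
    }
}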
So one option is that, like with the S3 connector, we use the ASF infra-provided AWS account and designate some tests to be run against AWS IoT (these tests would only run on main and not in PRs, but it's better than nothing).
@pjfanning thoughts?
@mdedetrich I'm happy enough with testing against AWS. This is not a requirement for this PR though - that is, I wouldn't expect @sbmpost to write the tests, because it would be easier for someone who has access to the AWS account that we test with to set up any resources we need for testing.
Oh agreed, @sbmpost should make sure that these PR changes work as expected on the AWS account that he is testing against, and then we can split out the test suites in the same way it's done with S3, so that we can run extra tests against a provided AWS test account from ASF infra.
I have made an issue for this.