cerk
cerk copied to clipboard
MQTT Port: Support for QoS 1/2
The MQTT Port currently only supports QoS 0 (At Most Once Delivery Guarantee). The reason is that the used MQTT library doesn't allow the delay of the PUBACK
message until the message is processed (see https://github.com/eclipse/paho.mqtt.c/issues/522).
We tried to patch the C
and rust
library (see https://github.com/eclipse/paho.mqtt.c/compare/master...ce-rust:feat/ack-after-processing and https://github.com/eclipse/paho.mqtt.rust/compare/master...ce-rust:feat/ack-after-processing-0.8.0).
This patch allowed us to block in the message callback till the message was successfully delivered over another port.
But then we run into another problem: The C
library uses the same threads and mutex for all the connections. In the case there are multiple MQTT Ports, the blocking of the message handler results in the blocking of the other connections => Dead Lock.
We then tried to fork the process before opening the connections. This way the different ports would not share the same threads and mutex. We tried to implement that in the port crate (see https://github.com/ce-rust/cerk/tree/feat/mqtt-port-subprocess) but unfortunately, forking the process in that stage of the router lifecycle results in unexpected behavior (see https://thorstenball.com/blog/2014/10/13/why-threads-cant-fork/).
Possible solutions:
- find MQTT library that supports the delay of
PUBACK
- write a MQTT library that supports the delay of
PUBACK
- implement a
Scheduler
component for the router that is not based on threads but on processes. (requires major changes incerk
, e.g.BrokerEvent
must be serializable.