stream-reactor
MQTT source connector skipping duplicate messages
Hello,
We are using the MQTT source connector to retrieve messages from our broker using QoS 1. We noticed that messages with the dup flag set are skipped by the connector.
According to the MQTT specification, this flag means that the message has been republished, but it doesn't mean that the client (here, the connector) has already received the message. In our case, this leads to data loss, because the duplicates are skipped even if the original message never reached the connector.
https://github.com/lensesio/stream-reactor/blob/43dcd01c22e8054b3189b51bf614dbbd04e2289e/kafka-connect-mqtt/src/main/scala/com/datamountaineer/streamreactor/connect/mqtt/source/MqttManager.scala#L103
Is there a reason you decided to skip these messages? We plan to remove the condition, because we use QoS 1 precisely to avoid data loss.
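To illustrate the failure mode described above, here is a minimal, self-contained sketch. It is not the connector's code: `Publish`, `accept`, and the `processDuplicates` parameter are hypothetical names invented for this example, and the scenario assumes the first delivery attempt was lost so that only the retry (dup flag set) reaches the client.

```java
import java.util.ArrayList;
import java.util.List;

public class DuplicateFilterSketch {

    /** Hypothetical stand-in for an MQTT PUBLISH packet; not the connector's or any client library's type. */
    static final class Publish {
        final String payload;
        final boolean dup; // dup flag: true when the sender is retrying delivery

        Publish(String payload, boolean dup) {
            this.payload = payload;
            this.dup = dup;
        }
    }

    /** Returns the payloads a consumer keeps from the packets that actually arrived. */
    static List<String> accept(List<Publish> arrived, boolean processDuplicates) {
        List<String> kept = new ArrayList<>();
        for (Publish p : arrived) {
            // Dropping on the dup flag alone assumes the first attempt was received.
            if (p.dup && !processDuplicates) {
                continue;
            }
            kept.add(p.payload);
        }
        return kept;
    }

    public static void main(String[] args) {
        // Assumed scenario: the first attempt (dup=false) was lost, so only the retry arrives.
        List<Publish> arrived = new ArrayList<>();
        arrived.add(new Publish("reading-1", true));

        System.out.println(accept(arrived, false)); // prints []
        System.out.println(accept(arrived, true));  // prints [reading-1]
    }
}
```

With the filter on, the only copy that ever arrived is discarded. With it off, the message is kept, at the cost of possibly seeing it twice when the first attempt did get through, which is the usual at-least-once trade-off of QoS 1.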
@GuillaumeMilani I don't recall why. @stheppi, do you? I would suggest adding an extra config flag to allow duplicate messages to be processed.
@GuillaumeMilani please see the linked change.
This enhancement was released in 4.1.0.
There's now an extra configuration property:
connect.mqtt.process.duplicates=true