MQTT source connector skipping duplicate messages

Open GuillaumeMilani opened this issue 3 years ago • 2 comments

Hello, we are using the MQTT source connector to retrieve messages from our broker using QoS 1. We noticed that messages with the DUP flag set are skipped by the connector.

According to the MQTT specification, this flag means that the message has been re-sent, but it doesn't mean that the client (here, the connector) has already received it. In our case, this leads to data loss, because the duplicates are skipped even when the original message never reached the connector.
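To make the failure mode concrete, here is a minimal simulation of the scenario described above. It does not use the connector or any MQTT client library; `Publish`, `receive` and `wire` are hypothetical names invented for this illustration, and a lost packet is modelled simply as `None`.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Publish:
    """Hypothetical stand-in for an inbound MQTT PUBLISH packet."""
    payload: str
    dup: bool  # True when the sender is re-delivering the packet


def receive(packets: List[Optional[Publish]], skip_duplicates: bool) -> List[str]:
    """Return the payloads a client would pass on for processing."""
    accepted = []
    for packet in packets:
        if packet is None:
            # Lost in transit: this delivery never reached the client.
            continue
        if skip_duplicates and packet.dup:
            # Behaviour reported in this issue: drop anything flagged DUP.
            continue
        accepted.append(packet.payload)
    return accepted


# The first delivery of "reading-1" is lost, so the broker re-sends it with DUP set.
wire = [None, Publish("reading-1", dup=True), Publish("reading-2", dup=False)]

print(receive(wire, skip_duplicates=True))   # ['reading-2']
print(receive(wire, skip_duplicates=False))  # ['reading-1', 'reading-2']
```

With the DUP check in place, `reading-1` is never processed even though QoS 1 promises at-least-once delivery. Without the check, the consumer may occasionally see the same message twice, so downstream processing has to tolerate repeats.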

https://github.com/lensesio/stream-reactor/blob/43dcd01c22e8054b3189b51bf614dbbd04e2289e/kafka-connect-mqtt/src/main/scala/com/datamountaineer/streamreactor/connect/mqtt/source/MqttManager.scala#L103

Is there any reason why you decided to skip these messages? We plan to remove the condition because we use QoS 1 precisely to avoid data loss.

GuillaumeMilani, Apr 30 '21 09:04

@GuillaumeMilani I don't recall why. @stheppi, do you? I would suggest adding an extra config flag to allow duplicate messages to be processed.

andrewstevenson, Apr 30 '21 10:04

@GuillaumeMilani please see the linked change.

davidsloan, Jul 15 '22 14:07

This enhancement was released in 4.1.0.

There's now an extra configuration property:

connect.mqtt.process.duplicates=true
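For context, a sketch of where this property might sit in a source connector configuration. Only `connect.mqtt.process.duplicates` comes from this thread. The other keys follow the connector's documented naming as far as I recall it, and the connector name, host, client id, topic names and class package are placeholder assumptions to check against the documentation for your release.

```properties
# Hypothetical example values; verify key names against the docs for your version.
name=mqtt-source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.hosts=tcp://mqtt-broker:1883
connect.mqtt.client.id=example-source-client
connect.mqtt.service.quality=1
connect.mqtt.kcql=INSERT INTO example-kafka-topic SELECT * FROM /example/mqtt/topic
# From this thread: process messages even when the DUP flag is set.
connect.mqtt.process.duplicates=true
```

With this enabled, consumers of the Kafka topic should expect occasional repeated records, since QoS 1 is at-least-once.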

davidsloan, Mar 19 '23 14:03