MQTT consumer plugin disconnects frequently and can not reconnect successfully
Relevant telegraf.conf
[[inputs.mqtt_consumer]]
topics = [
"myTenant/#",
]
data_format= "value"
data_type= "string"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "+/devices/+/+"
measurement = "measurement/_/_/_"
tags = "tenant/_/device/field"
[[processors.pivot]]
order = 1
tag_key = "field"
value_key = "value"
[[processors.converter]]
order = 2
tagpass = { topic = ["myTenant*"] }
[processors.converter.fields]
float = ["temperature", "humidity"]
integer = ["pressure", "rssi"]
Logs from Telegraf
2024-12-08T22:49:30Z D! [inputs.mqtt_consumer] [pinger] ping check60.000303436
2024-12-08T22:49:30Z D! [inputs.mqtt_consumer] [pinger] keepalive sending ping
2024-12-08T22:49:34Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics
2024-12-08T22:49:35Z D! [inputs.mqtt_consumer] [pinger] ping check4.9995263
2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [pinger] ping check10.00021076
2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [pinger] pingresp not received, disconnecting
2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [client] internalConnLost called
2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [client] stopCommsWorkers called
2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [client] internalConnLost waiting on workers
2024-12-08T22:49:40Z D! [inputs.mqtt_consumer] [client] stopCommsWorkers waiting for workers
2024-12-08T22:49:44Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] Connecting [tcp://127.0.0.1:1883]
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] Connect()
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [store] memorystore initialized
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] about to write new connect msg
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] socket connected to broker
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] Using MQTT 3.1.1 protocol
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] connect started
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] received connack
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] startCommsWorkers called
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] client is connected/reconnected
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] incoming started
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms started
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] outgoing started
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startComms started
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] startCommsWorkers done
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [store] memorystore wiped
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] exit startClient
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [pinger] keepalive starting
2024-12-08T22:49:50Z I! [inputs.mqtt_consumer] Connected [tcp://127.0.0.1:1883]
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: inboundFromStore complete
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] enter SubscribeMultiple
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] sending subscribe message, topics:[topic1/# topic2/#]
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] exit SubscribeMultiple
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] obound priority msg to write, type*packets.SubscribePacket
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] outgoing waiting for an outbound message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [store] memorystore del: message1not found
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received suback, id:1
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: granted qoss[0 0]
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] logic waiting for msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncoming Received Message
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: got msg on ibound
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [net] startIncomingComms: received publish, msgId:0
2024-12-08T22:49:54Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics
2024-12-08T22:49:55Z D! [inputs.mqtt_consumer] [pinger] ping check5.000816128
2024-12-08T22:50:00Z D! [inputs.mqtt_consumer] [pinger] ping check10.000605322
2024-12-08T22:50:04Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics
etc (goes on forever like this)
System info
Telegraf 1.32.3 (git: HEAD@2fd5bf4f) on Debian 12 (bookworm)
Docker
not using docker
Steps to reproduce
- start mosquitto, influxdb and telegraf
- publish some data
Expected behavior
- Data is read and transformed by telegraf and stored in influxdb
- if connection to mosquitto is lost, telegraf reconnects and continues to process data
Actual behavior
- Data is read and transformed by telegraf and stored in influxdb
- if connection to mosquitto is lost, telegraf reconnects successfully but fails to receive any data (including ping resp)
I can confirm the described behavior (at least what I understood is meant). Here is a post I published in the Influx community forum, maybe it can add some insights.
I’m using the inputs.mqtt_consumer plugin to retrieve some data from a LoRaWAN networkserver (TTN). I noticed recently that no more data where written into my InfluxDB.
I restarted telegraf and data were written again.
I looked into telegraf’s log-files and found error entries for all concerned mqtt_consumer instances, e.g.
E! [inputs.mqtt_consumer::ttn_consumer_ow] Error in plugin: connection lost: read tcp 172.19.0.3:42600->52.212.223.226:1883: read: connection reset by peer
E! [inputs.mqtt_consumer::ttn_consumer_ow] Error in plugin: network Error : read tcp 172.19.0.3:33402->63.34.215.128:1883: i/o timeout
I guess it can happen that the connection between the server running telegraf and the mqtt-server is temporarily interrupted. But why does that cause the telgraf input plugins to quit working?
Here are some more observations:
- telegraf is running in a docker container
- other input- and output plugins of the same telegraf instance continued working without issue
- As you can see in the example above it seemed that the IP address for the configured mqtt-server changed. Maybe that’s just a normal process (e.g. for something like load-balancing?). Maybe that causes the telegraf issue? On the other hand there are also error entries in the log that have the same IP address for both error messages (read: connection reset by peer / i/o timeout).
- when I restarted the telegraf container data started immediately to flow again from the mqtt-server to the InfluxDB
This is not the same issue please file a separate request.
E! [inputs.mqtt_consumer::ttn_consumer_ow] Error in plugin: connection lost: read tcp 172.19.0.3:42600->52.212.223.226:1883: read: connection reset by peer E! [inputs.mqtt_consumer::ttn_consumer_ow] Error in plugin: network Error : read tcp 172.19.0.3:33402->63.34.215.128:1883: i/o timeout
I do not have a timeout issue, as you can see im my log, the reconnection is successful:
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] client is connected/reconnected
...
2024-12-08T22:49:50Z I! [inputs.mqtt_consumer] Connected [tcp://127.0.0.1:1883]
This bug is that after the successful reconnection:
- messages are not received / processed
- after sending a ping, the pingresp is not received / processed
I think the bug was triggered after an upgrade from telegraf 1.32.0-1 to 1.32.3-1 (or from 1.31.2-1, 1.32.0-1). I'll try to downgrade to 1.32.0-1 and check if I hit this bug
Is it possibly a bug in github.com/eclipse/paho.mqtt.golang v1.5.0 ? I see in mosquitto logs that sometimes, the connection only lasts for 10 seconds, so long before the connection timeout. I don't understand the implication of https://github.com/eclipse-paho/paho.mqtt.golang/commit/6801721ef78fe9bcbb6dddecbc2968aea5e18204 , could this have anything to do with it?
I can confirm that the bug is:
I suspect it's either the paho.mqtt.golang dependency update or #15486. Less likely #15528, for this would probably not cause the ping resp to be missed.
WORKAROUND: downgrade to v1.31.3 and prevent any upgrades until this is fixed.
Any news about a fix, besides a downgrade? I can still experience this in Telegraf v. 1.33.1.
As a workaround, I tried adding these parameters to my telegraf.conf
persistent_session = true
qos = 1
connection_timeout = "60s"
max_undelivered_messages = 1000
Will keep you posted if that changes anything.
Any news about a fix, besides a downgrade? I can still experience this in Telegraf v. 1.33.1.
As a workaround, I tried adding these parameters to my telegraf.conf
persistent_session = true qos = 1 connection_timeout = "60s" max_undelivered_messages = 1000Will keep you posted if that changes anything.
It didn't for me
@tom-ch1 I've tested with our integration test and if the server is down and up again, the plugin reconnects and is also able to receive messages. In your log, it seems like the ping response does not arrive within the configured interval (10s) and thus a reconnect is initiated with the next gather cycle...
Do you have some reliable way to reproduce the issue?
Hello! I am closing this issue due to inactivity. I hope you were able to resolve your problem, if not please try posting this question in our Community Slack or Community Forums or provide additional details in this issue and reqeust that it be re-opened. Thank you!
I gave you quite a detailed description of the issue, including:
- detailed system info
- configuration files
- steps to reproduce
- log files
- exact version in which bug is introduced
- most likely commits and dependencies which introduced the bug
I'm sorry, I can't give you more than that. Just closing the issue won't resolve it, not for me and not for the many who are probably also experience it.
Did you investigate on the 2 paho.mqtt.golang dependency updates I indicated?
I gave you quite a detailed description of the issue, including:
* detailed system info * configuration files * steps to reproduce * log files * exact version in which bug is introduced * most likely commits and dependencies which introduced the bugI'm sorry, I can't give you more than that. Just closing the issue won't resolve it, not for me and not for the many who are probably also experience it.
Did you investigate on the 2 paho.mqtt.golang dependency updates I indicated?
I just want to support what @tom-ch1 has said.
I am sorry that I am newbie to this and probably not able to provide as much information as he has but I have seen the same behavior in 1.32.0.
Switched now back to 1.31.3 hoping the issue will get resolved!
@tom-ch1 our bot closes issues if nobody responds for 2 weeks after the "waiting for response" label was set. Reopening as you still seem responsive. ;-)
Did you investigate on the 2 paho.mqtt.golang dependency updates I indicated?
My problem is that I cannot reproduce the issue locally and thus it's impossible for me to find out what's going on or if rolling back the dependency solves the issue. That's why I asked if you have an easy way to reproduce it... What I can offer is to roll back the two dependency updates you pointed out and you test if this fixes the issue. We then would need to find out what exactly causes the issue in the paho lib. Would that be ok for you?
I also want to support @tom-ch1's observation.
Although he assumed that my issue is different (maybe I haven't posted enough log messages) and I can't look into the software as deep as he did, I still see the following:
I have two Telegraf instance running (in Docker containers).
Telegraf 1.31.2 : I have never observed any issue, data received via MQTT are just written to InfluxDB all the time.
Telegraf 1.33.3 : frequent stops of data written to InfluxDB, about 1 ... 2 times a week. Which is quite annoying and rules out this version of Telegraf and the MQTT plugin for any serious use case!
What I can offer is to roll back the two dependency updates you pointed out and you test if this fixes the issue. We then would need to find out what exactly causes the issue in the paho lib. Would that be ok for you?
@srebhan : that would be perfect! I can test it.
We're observing apparently the same issue on 1.34.0. What we see in the telegraf logs is repeated Connecting/Connected messages about every 80 seconds or so. But no metrics were written out.
2025-03-28 08:40:30.004 2025-03-28T15:40:30Z I! [inputs.mqtt_consumer] Connected [tcp://mosquitto:1883]
2025-03-28 08:41:50.001 2025-03-28T15:41:50Z D! [inputs.mqtt_consumer] Connecting [tcp://mosquitto:1883]
2025-03-28 08:41:50.005 2025-03-28T15:41:50Z I! [inputs.mqtt_consumer] Connected [tcp://mosquitto:1883]
2025-03-28 08:43:10.004 2025-03-28T15:43:10Z D! [inputs.mqtt_consumer] Connecting [tcp://mosquitto:1883]
2025-03-28 08:43:10.006 2025-03-28T15:43:10Z I! [inputs.mqtt_consumer] Connected [tcp://mosquitto:1883]
2025-03-28 08:44:30.005 2025-03-28T15:44:30Z D! [inputs.mqtt_consumer] Connecting [tcp://mosquitto:1883]
EDIT: Downgrading to v1.31.3 fixes the issue for us as well.
On version 1.34.2, experienced this issue multiple times. Seems to only happen with a small network disconnects or we update/restart mqtt/ect.
I was running into the same issue. However, increasing the maximum_undelivered_messages does seem to fix the issue. I found that, despite my batch size on the Influx-output was larger, the maximum size of each batch was capped at 1000. After a little search I came across that setting in the MQTT-Consumer.
I also downgraded to v1.31.3. There the disconnect also happens, but a lot less frequently than in v1.34.2.
What I can offer is to roll back the two dependency updates you pointed out and you test if this fixes the issue. We then would need to find out what exactly causes the issue in the paho lib. Would that be ok for you?
@srebhan : that would be perfect! I can test it.
Can I somewhere get the rolled-back version so I can test it?
We are having the same problem with collecting data from TTN - I've rolled back to 1.31.2 (from dockerhub) - was telegraf::latest I'll report back if we have any more problems. Errors:
2025-05-01T13:43:58Z E! [inputs.mqtt_consumer] Error in plugin: connection lost: read tcp 172.22.x.x:34452->63.34.215.128:1883: read: connection reset by peer
2025-05-01T13:44:00Z I! [inputs.mqtt_consumer] Connected [mqtt://eu1.cloud.thethings.network:1883]
2025-05-01T14:35:58Z E! [inputs.mqtt_consumer] Error in plugin: connection lost: read tcp 172.22.x.x:35908->52.212.223.226:1883: read: connection reset by peer
2025-05-01T14:36:10Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:36:20Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:36:30Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:36:40Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:36:50Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:37:00Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:37:00Z E! [inputs.mqtt_consumer] Error in plugin: network Error : read tcp 172.22.x.x:37834->52.212.223.226:1883: i/o timeout
2025-05-01T14:37:20Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:37:30Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:37:40Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:37:50Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:38:00Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:38:10Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:38:10Z E! [inputs.mqtt_consumer] Error in plugin: network Error : read tcp 172.22.x.x:49190->52.212.223.226:1883: i/o timeout
2025-05-01T14:38:30Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:38:40Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
2025-05-01T14:38:45Z I! [inputs.mqtt_consumer] Connected [mqtt://eu1.cloud.thethings.network:1883]
2025-05-01T14:55:10Z I! [inputs.mqtt_consumer] Connected [mqtt://eu1.cloud.thethings.network:1883]
2025-05-01T14:56:30Z I! [inputs.mqtt_consumer] Connected [mqtt://eu1.cloud.thethings.network:1883]
2025-05-01T14:57:50Z I! [inputs.mqtt_consumer] Connected [mqtt://eu1.cloud.thethings.network:1883]
<repeats...>
We have been experiencing a similar issue: Telegraf client disconnects from our MQTT broker, it is able to reconnect, but it then doesn't subscribe to the topics it is supposed to (i.e. those defined in its configuration file). At least the WebUI of our MQTT broker doesn't list any topics for the affected Telegraf client.
I can see
2024-12-08T22:49:50Z D! [inputs.mqtt_consumer] [client] sending subscribe message, topics:[topic1/# topic2/#]
in Tom's log, I don't know if 'sending subscribe message...' is the same as 'successfully subscribed to...'
Perhaps the missing topics can help with troubleshooting.
We're experiencing similar issues in our production environment (Ubuntu 24 LTS, Telegraf 1.33.3 running in Docker via docker-compose). This is the second occurrence in two months for us. The first incident happened after maintenance tasks (OS update of the MQTT server), and this most recent one occurred following a network outage by our provider.
In both cases, the pattern was identical: the MQTT connection was successfully re-established, but Telegraf stopped processing messages entirely. We also observed frequent reconnects to the broker approximately every 120 seconds, which failed to restore message processing. The only resolution was manually restarting the Docker container running our Telegraf service, after which reconnects ceased and message processing resumed normally.
We plan to update to version 1.34 first and then implement monitoring of Telegraf service logs to receive alerts when this issue occurs again so we do not lose as much data as this time.
I have attached some log output:
docker-telegraf | 2025-05-26T03:17:01Z E! [inputs.mqtt_consumer] Error in plugin: connection lost: EOF
docker-telegraf | 2025-05-26T03:17:20Z W! [inputs.mqtt_consumer] Collection took longer than expected; not complete after interval of 10s
docker-telegraf | 2025-05-26T03:17:21Z E! [inputs.mqtt_consumer] Error in plugin: network Error : dial tcp ***.***.***.***:1234: connect: connection refused
docker-telegraf | 2025-05-26T03:17:30Z I! [inputs.mqtt_consumer] Connected [ssl://***.***.***.***:1234]
docker-telegraf | 2025-05-26T03:19:00Z I! [inputs.mqtt_consumer] Connected [ssl://***.***.***.***:1234]
docker-telegraf | 2025-05-26T03:20:20Z I! [inputs.mqtt_consumer] Connected [ssl://***.***.***.***:1234]
[... pattern continues every ~120 seconds for over 14 hours ...]
docker-telegraf | 2025-05-26T17:29:30Z I! [inputs.mqtt_consumer] Connected [ssl://***.***.***.***:1234]
docker-telegraf | 2025-05-26T17:30:50Z I! [inputs.mqtt_consumer] Connected [ssl://***.***.***.***:1234]
docker-telegraf | 2025-05-26T17:32:10Z I! [inputs.mqtt_consumer] Connected [ssl://***.***.***.***:1234]
docker-telegraf | 2025-05-26T17:32:35Z I! [agent] Hang on, flushing any cached metrics before shutdown
docker-telegraf | 2025-05-26T17:32:35Z I! [agent] Stopping running outputs
We hope our additional data points help give some credence to this issue.
Hi all. After reading the thread above, I made a test where i run both v1.34.4 and v1.31.3 together in parallel in the same virtual machine but writing to different influxdb instances.
Both are listening to the same sources. My office occasionally has network issues that results in connection disruption. The key point here is that:
v1.34.4 - try to re-establish connection but no longer process data once network error is solved
v1.31.3 - able to re-establish connection and continue processing data once network is back to normal.
Here are some comparisons. Hope this helps with the fixing of this issue. In the meantime we will use v1.31.3 as it is reliable.
v1.34.4 - data dropped
2025-07-01T16:23:11+08:00 D! [inputs.mqtt_consumer::fake_water_loadtest] Connecting [tls://172.16.2.162:8883]
2025-07-01T16:23:11+08:00 I! [inputs.mqtt_consumer::fake_water_loadtest] Connected [tls://172.16.2.162:8883]
2025-07-01T16:23:16+08:00 I! [inputs.mqtt_consumer::air_quality] Connected [ssl://******.emqxsl.com:8883]
2025-07-01T16:23:16+08:00 E! [inputs.mqtt_consumer::fake_electrical_meter] Error in plugin: network Error : dial tcp: lookup *****: read udp 172.16.2.35:58948->8.8.8.8:53: i/o timeout
2025-07-01T16:23:30+08:00 W! [inputs.mqtt_consumer::fake_water_meter] Collection took longer than expected; not complete after interval of 10s
2025-07-01T16:23:30+08:00 E! [inputs.mqtt_consumer::fake_water_meter] Error in plugin: network Error : dial tcp: lookup n2689bbb.ala.eu-central-1.emqxsl.com on 8.8.8.8:53: read udp 172.16.2.35:39642->8.8.8.8:53: i/o timeout
2025-07-01T16:23:41+08:00 E! [inputs.mqtt_consumer::fake_electrical_meter] Error in plugin: network Error : dial tcp: lookup ******: read udp 172.16.2.35:54205->8.8.8.8:53: i/o timeout
2025-07-01T16:23:41+08:00 D! [inputs.mqtt_consumer::fake_water_meter] Previous collection has not completed; scheduled collection skipped
2025-07-01T16:23:46+08:00 E! [inputs.mqtt_consumer::fake_electrical_loadtest] Error in plugin: connection lost: read tcp 172.16.2.35:54820->172.16.2.96:8883: read: connection reset by peer
2025-07-01T16:23:46+08:00 D! [inputs.mqtt_consumer::fake_electrical_loadtest] Disconnected [tls://172.16.2.96:8883]
2025-07-01T16:23:50+08:00 D! [inputs.mqtt_consumer::fake_water_meter] Previous collection has not completed; scheduled collection skipped
2025-07-01T16:23:51+08:00 W! [inputs.mqtt_consumer::fake_water_meter] Collection took longer than expected; not complete after interval of 10s
2025-07-01T16:23:51+08:00 E! [inputs.mqtt_consumer::fake_water_meter] Error in plugin: network Error : dial tcp: lookup ********.emqxsl.com on 8.8.8.8:53: read udp 172.16.2.35:39531->8.8.8.8:53: i/o timeout
2025-07-01T16:23:51+08:00 D! [inputs.mqtt_consumer::fake_electrical_meter] Connecting [tls://***********.emqxsl.com:8883]
q
2025-07-01T16:23:51+08:00 D! [inputs.mqtt_consumer::fake_electrical_loadtest] Connecting [tls://172.16.2.96:8883]
2025-07-01T16:23:51+08:00 I! [inputs.mqtt_consumer::fake_electrical_loadtest] Connected [tls://172.16.2.96:8883]
Internal metrics - observe the rise of gather errors
Transformed Data Loss
v1.31.3 - able to recover by itself once network is back to normal
2025-07-01T16:39:30+08:00 D! [outputs.file::fake_electrical_meter] Wrote batch of 2 metrics in 84.991µs
2025-07-01T16:39:30+08:00 D! [outputs.file::fake_electrical_meter] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:39:31+08:00 W! [inputs.mqtt_consumer::fake_electrical_loadtest] Collection took longer than expected; not complete after interval of 10s
2025-07-01T16:39:31+08:00 D! [inputs.mqtt_consumer::fake_water_loadtest] Connecting [tls://172.16.2.162:8883]
2025-07-01T16:39:31+08:00 I! [inputs.mqtt_consumer::fake_water_loadtest] Connected [tls://172.16.2.162:8883]
2025-07-01T16:39:31+08:00 D! [inputs.mqtt_consumer::fake_electrical_loadtest] Previous collection has not completed; scheduled collection skipped
2025-07-01T16:39:33+08:00 D! [outputs.file::air_quality] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:39:35+08:00 D! [outputs.file::others] Wrote batch of 66 metrics in 162.618µs
2025-07-01T16:39:35+08:00 D! [outputs.file::others] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:39:38+08:00 D! [outputs.influxdb_v2::influxdb_v3] Wrote batch of 71 metrics in 5.184757ms
2025-07-01T16:39:38+08:00 D! [outputs.influxdb_v2::influxdb_v3] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:39:40+08:00 D! [inputs.mqtt_consumer::fake_electrical_loadtest] Previous collection has not completed; scheduled collection skipped
2025-07-01T16:39:41+08:00 W! [inputs.mqtt_consumer::fake_electrical_loadtest] Collection took longer than expected; not complete after interval of 10s
2025-07-01T16:39:41+08:00 D! [outputs.file::fake_water_meter] Wrote batch of 2 metrics in 54.581µs
2025-07-01T16:39:41+08:00 D! [outputs.file::fake_water_meter] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:39:50+08:00 D! [inputs.mqtt_consumer::fake_electrical_loadtest] Previous collection has not completed; scheduled collection skipped
2025-07-01T16:39:51+08:00 D! [outputs.health] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:39:51+08:00 W! [inputs.mqtt_consumer::fake_electrical_loadtest] Collection took longer than expected; not complete after interval of 10s
2025-07-01T16:39:51+08:00 E! [inputs.mqtt_consumer::fake_electrical_loadtest] Error in plugin: network Error : dial tcp 172.16.2.96:8883: i/o timeout
2025-07-01T16:39:51+08:00 D! [outputs.file::fake_electrical_meter] Wrote batch of 2 metrics in 67.89µs
2025-07-01T16:39:51+08:00 D! [outputs.file::fake_electrical_meter] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:39:51+08:00 D! [outputs.file::people_counter] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:39:55+08:00 D! [outputs.file::air_quality] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:39:56+08:00 D! [outputs.file::others] Wrote batch of 66 metrics in 159.791µs
2025-07-01T16:39:56+08:00 D! [outputs.file::others] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:39:58+08:00 D! [outputs.influxdb_v2::influxdb_v3] Wrote batch of 70 metrics in 3.757782ms
2025-07-01T16:39:58+08:00 D! [outputs.influxdb_v2::influxdb_v3] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:40:00+08:00 D! [inputs.mqtt_consumer::fake_electrical_loadtest] Connecting [tls://172.16.2.96:8883]
2025-07-01T16:40:03+08:00 D! [outputs.file::fake_water_meter] Wrote batch of 2 metrics in 59.608µs
2025-07-01T16:40:03+08:00 D! [outputs.file::fake_water_meter] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:40:07+08:00 I! [inputs.mqtt_consumer::fake_electrical_loadtest] Connected [tls://172.16.2.96:8883]
2025-07-01T16:40:12+08:00 D! [outputs.health] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:40:13+08:00 D! [outputs.file::people_counter] Wrote batch of 1 metrics in 74.318µs
2025-07-01T16:40:13+08:00 D! [outputs.file::people_counter] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:40:13+08:00 D! [outputs.file::fake_electrical_meter] Wrote batch of 3 metrics in 50.65µs
2025-07-01T16:40:13+08:00 D! [outputs.file::fake_electrical_meter] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:40:16+08:00 D! [outputs.file::air_quality] Wrote batch of 1 metrics in 116.011µs
2025-07-01T16:40:16+08:00 D! [outputs.file::air_quality] Buffer fullness: 0 / 100000 metrics
2025-07-01T16:40:17+08:00 D! [outputs.file::others] Wrote batch of 66 metrics in 173.03µs
2025-07-01T16:40:17+08:00 D! [outputs.file::others] Buffer fullness: 0 / 100000 metrics
Internal Gather Error - There are still Gather Errors, but no long drops in data
Despite going through the same network error - no obvious drop in data
My suspicion is that the underlying library did change something. Let me try to downgrade the MQTT library and provide a PR for testing...
Having the same problem using telegraf 1.34.4
telegraf[1112842]: 2025-07-10T07:02:41Z E! [inputs.mqtt_consumer] Error in plugin: connection lost: EOF
telegraf[1112842]: 2025-07-10T07:02:57Z E! [inputs.mqtt_consumer] Error in plugin: network Error : dial tcp 000.000.000.000:1883: connect: connection refused
telegraf[1112842]: 2025-07-10T07:03:00Z E! [inputs.mqtt_consumer] Error in plugin: network Error : dial tcp 000.000.000.000:1883: connect: connection refused
telegraf[1112842]: 2025-07-10T07:03:10Z I! [inputs.mqtt_consumer] Connected [tcp://mqtt.example.com:1883]
telegraf[1112842]: 2025-07-10T07:29:50Z I! [inputs.mqtt_consumer] Connected [tcp://mqtt.example.com:1883]
telegraf[1112842]: 2025-07-10T07:31:10Z I! [inputs.mqtt_consumer] Connected [tcp://mqtt.example.com:1883]
telegraf[1112842]: 2025-07-10T07:32:30Z I! [inputs.mqtt_consumer] Connected [tcp://mqtt.example.com:1883]
telegraf[1112842]: 2025-07-10T07:33:50Z I! [inputs.mqtt_consumer] Connected [tcp://mqtt.example.com:1883]
Happy to help testing or provide more information if necessary.
I can also observe the same issue in 1.34.4 and am also happy to assist in solving this issue.
@tom-ch1 I've tested with our integration test and if the server is down and up again, the plugin reconnects and is also able to receive messages. In your log, it seems like the ping response does not arrive within the configured interval (10s) and thus a reconnect is initiated with the next gather cycle...
Do you have some reliable way to reproduce the issue?
I also see this. In terms of how to reproduce, the easiest method I have found is to run telegraf in a VM and disconnect the uplink from the underlying host.
I had that issue as well on 1.36.1. As suggested I added
persistent_session = true
qos = 1
connection_timeout = "60s"
max_undelivered_messages = 1000
but also
client_id = "some_string"
Data has been flowing for a day now, across multiple broker re-connections.
Data has been flowing for a day now, across multiple broker re-connections.
Is it still working for you?
Data has been flowing for a day now, across multiple broker re-connections.
Is it still working for you?
The configuration stopped working when the publisher stopped publishing. I set persistent_session = false 4 days ago and it has been working since.
I'm also experiencing this problem. After several days (most recently 3 and then 6), the mqtt_consumer plugin gets stuck in a state where it no longer outputs any metrics. Like the logs upthread, it successfully reconnects to the MQTT server. I ran Wireshark and confirmed that the plugin is successfully re-subscribing to the correct topics, and the plugin is receiving newly published messages. However, they never end up at the output. After 80 or 90s (almost exactly a multiple of 10s), the client simply closes the (working) TCP connection, and starts reconnecting again.
I actually wonder if this is a bug in the plugin's use of tracking metrics - maybe it's leaking tokens and eventually it doesn't have any tokens left to process any incoming messages. If it helps to diagnose this, both the generic metrics_gathered metric and the mqtt_consumer messages_received metric stop incrementing when the plugin is in its wedged state (even though Wireshark shows that it is still receiving MQTT messages).
Same exact issue encountered in v1.36.3. Will try downgrading to v1.31.3 to see if that helps.
Edit: it works fine with after downgrading.