logstash-input-mqtt
Error when too many messages arrive
I'm doing a lot of tests using MQTT (Mosquitto) and this plugin with Logstash. Almost everything works fine, but I've found one scenario that produces strange behavior.
If I submit 10,000 messages while Logstash is down, then when it comes up all the messages seem to be processed, but after the last one I get this error, which kills my Logstash process:
[2018-05-02T17:43:31,164][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method `error' for nil:NilClass>, :backtrace=>["/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/subscriber.rb:69:in `add_subscription'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/handler.rb:126:in `handle_suback'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/handler.rb:57:in `handle_packet'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/handler.rb:47:in `receive_packet'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/client.rb:163:in `block in loop_read'", "org/jruby/RubyFixnum.java:299:in `times'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/client.rb:161:in `loop_read'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/client.rb:175:in `mqtt_loop'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/client.rb:128:in `block in daemon_mode'"]}
[2018-05-02T17:43:31,415][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: org.jruby.exceptions.RaiseException: (SystemExit) exit
I'm running Logstash with the -f parameter.
Here is my configuration sample:
input {
  mqtt {
    topic => "/teste"
    host => "localhost"
    username => "aaa"
    password => "bbb"
    client_id => "teste"
    clean_session => false
    qos => 1
  }
}
output {
  stdout { codec => rubydebug }
}
Any thoughts?
Thanks in advance!
Hi @casmeiron,
I have a couple of questions:
- How did you submit (and store) messages on Mosquitto? I suppose that you used QoS; what level?
- Could you please share your Mosquitto config and the scripts for sending messages?
- What is the reproducibility rate of this issue?
- Are you able to perform the same test with the underlying MQTT client, https://github.com/RubyDevInc/paho.mqtt.ruby/ ?
Hi @jurek7,
Sure, I can answer these questions. Thanks for writing back.
How did you submit (and store) messages on Mosquitto? I suppose that you used QoS; what level?
I'm using the eclipse-mosquitto Docker image (latest). The messages are published by a simple Java program (Paho client).
void publish( ) throws Exception {
    MqttClient mqttClient = new MqttClient( "tcp://localhost:1883", "java-teste" );
    mqttClient.setCallback( this );
    MqttConnectOptions options = new MqttConnectOptions( );
    options.setAutomaticReconnect( true );
    options.setCleanSession( false );
    options.setUserName( "aaa" );
    options.setPassword( "bbb".toCharArray( ) );
    mqttClient.connect( options );
    MqttMessage msg;
    // Publish 10,000 small JSON payloads at QoS 1.
    for ( int i = 0; i < 10000; i++ ) {
        msg = new MqttMessage( ( "{\"data\": " + i + "}" ).getBytes( ) );
        msg.setQos( 1 );
        mqttClient.publish( "/teste", msg );
    }
    mqttClient.disconnect( );
    mqttClient.close( false );
}
As you can see in the code, I'm using QoS 1.
Could you please share your Mosquitto config and the scripts for uploading messages?
Sure, my Mosquitto configuration file is attached (I had to rename it to .txt so that GitHub would allow the attachment): mosquitto.txt
What is the reproducibility rate of this issue?
100%. I just need to send 10,000 messages (with my consumer, Logstash, turned off) and then start it.
Are you able to perform the same test with the underlying MQTT client, https://github.com/RubyDevInc/paho.mqtt.ruby/ ?
I'm not a Ruby programmer, but I could try that.
It looks like there is an error in how the PahoMqtt library references its global logger.
In paho-mqtt-1.0.7/lib/paho_mqtt/subscriber.rb, line 69 reads: @logger.error("The packet id is invalid, already used.") if PahoMqtt.logger? It should instead call PahoMqtt.logger.error(...
That explains the message "undefined method `error'". It started to occur because the last commit enabled the logger. So for now we have to disable the logger: in lib/logstash/inputs/mqtt.rb, line 47, comment out PahoMqtt.logger = @logfile.
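The failure mode described above can be reproduced in isolation. The following is only a minimal sketch: FakeMqtt, BuggySubscriber, and FixedSubscriber are hypothetical stand-ins invented for illustration, not the real PahoMqtt classes. It assumes the library keeps a module-level logger, and that the subscriber's own @logger instance variable is never assigned.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in for a library with a module-level logger.
# This is NOT the real PahoMqtt code, only an illustration of the pattern.
module FakeMqtt
  class << self
    attr_accessor :logger

    # True when a module-level logger has been configured.
    def logger?
      !logger.nil?
    end
  end

  class BuggySubscriber
    # @logger is never assigned in this class, so it evaluates to nil.
    # The guard checks the module-level logger, but the call uses @logger.
    def report_invalid_id
      @logger.error("The packet id is invalid, already used.") if FakeMqtt.logger?
    end
  end

  class FixedSubscriber
    # Guard and call both use the module-level logger.
    def report_invalid_id
      FakeMqtt.logger.error("The packet id is invalid, already used.") if FakeMqtt.logger?
    end
  end
end

log_output = StringIO.new
FakeMqtt.logger = Logger.new(log_output)

begin
  FakeMqtt::BuggySubscriber.new.report_invalid_id
rescue NoMethodError => e
  puts "buggy version raised #{e.class}"
end

FakeMqtt::FixedSubscriber.new.report_invalid_id
puts log_output.string
```

In this sketch the buggy version raises only while a module-level logger is set. With the logger set to nil, the guard is false and the faulty call is never reached, which is consistent with disabling the logger working as a stopgap.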
I've updated the plugin to disable the logger by default. Please verify how your case behaves now. There may still be an issue, but logged differently; that would be an issue inside the https://github.com/RubyDevInc/paho.mqtt.ruby/ library. So if it's still not working, you could report the problem there.
Perfect, I will give it a shot today and let you guys know what happened!
Best Regards.
I just ran the same test. It failed (as expected) with the following error:
[2018-05-08T10:46:51,267][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: org.jruby.exceptions.RaiseException: (PacketException) PahoMqtt::PacketException
Should I open an issue on paho.mqtt.ruby library?
@casmeiron yes, please report it to paho.mqtt.ruby as well. Please also add the details from @tomc78's comment, https://github.com/jurek7/logstash-input-mqtt/issues/5#issuecomment-387022738, since it seems that he found the root cause of this issue. Thanks.
Did that, thanks!
Hi @jurek7, @tomc78. They have updated the paho.mqtt.ruby library to version 1.0.10, which should fix the problem. Could you guys release a new version of your Logstash plugin that uses it?
Thanks in advance!
I'd prefer to wait a bit. See: https://github.com/RubyDevInc/paho.mqtt.ruby/issues/38
Hi @jurek7 ,
You're right, they've created another problem with the implemented workaround for our issue.
Let's wait.
Hi @casmeiron,
Could you please redo your tests on the latest version (1.0.5)? The paho.mqtt library was updated to 1.0.12.
Hey @jurek7 ,
I repeated the test, but it wasn't successful. Instead, I got a bunch of errors:
E, [2018-06-05T16:03:31.977759 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.981117 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.984160 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.987298 #2077] ERROR -- : Writing queue is full, slowing down
[2018-06-05T16:03:31,986][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x54af63f5@/Users/casmeiron/Developer/tools/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:247 run>"}
E, [2018-06-05T16:03:31.990343 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.993383 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.996457 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.999872 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:32.003279 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:32.005930 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:32.010806 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:32.013682 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:32.016814 #2077] ERROR -- : Writing queue is full, slowing down
This is the first time I've seen this error. A total of 10,000 messages were sent, and the error started appearing after message number 991.
By the way, I waited about 10 minutes and the error didn't go away.
Could you please attach your Logstash config and logstash.yml?
Hi @jurek7 ,
Sure, here is my configuration:
input {
  mqtt {
    topic => "/test"
    host => "localhost"
    username => "aaa"
    password => "bbb"
    client_id => "test"
    clean_session => false
    qos => 1
  }
}
output {
  stdout { codec => rubydebug }
}
The logstash.yml is untouched:
# Settings file in YAML
# Settings can be specified either in hierarchical form, e.g.:
# pipeline:
# batch:
# size: 125
# delay: 5
# Or as flat keys:
# pipeline.batch.size: 125
# pipeline.batch.delay: 5
# ------------ Node identity ------------
# Use a descriptive name for the node:
# node.name: test
# If omitted the node name will default to the machine's host name
# ------------ Data path ------------------
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
# path.data:
# ------------ Pipeline Settings --------------
# The ID of the pipeline.
# pipeline.id: main
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
# This defaults to the number of the host's CPU cores.
# pipeline.workers: 2
# How many events to retrieve from inputs before sending to filters+workers
# pipeline.batch.size: 125
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
# pipeline.batch.delay: 50
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
# WARNING: enabling this can lead to data loss during shutdown
# pipeline.unsafe_shutdown: false
# ------------ Pipeline Configuration Settings --------------
# Where to fetch the pipeline configuration for the main pipeline
# path.config:
# Pipeline configuration string for the main pipeline
# config.string:
# At startup, test if the configuration is valid and exit (dry run)
# config.test_and_exit: false
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
config.reload.automatic: true
# How often to check if the pipeline configuration has changed (in seconds)
config.reload.interval: 3s
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
# config.debug: false
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
# config.support_escapes: false
# ------------ Module Settings ---------------
# Define modules here. Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
# modules:
# - name: MODULE_NAME
# Module variable names must be in the format of
# modules:
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
# ------------ Queuing Settings --------------
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
# queue.type: memory
# If using queue.type: persisted, the directory path where the data files will be stored.
# Default is path.data/queue
# path.queue:
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
# queue.page_capacity: 64mb
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
# queue.max_events: 0
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
# queue.max_bytes: 1024mb
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
# queue.checkpoint.acks: 1024
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
# queue.checkpoint.writes: 1024
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
# queue.checkpoint.interval: 1000
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
# dead_letter_queue.enable: false
# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb
# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
# path.dead_letter_queue:
# ------------ Metrics Settings --------------
# Bind address for the metrics REST endpoint
# http.host: ""
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
# http.port: 9600-9700
# ------------ Debugging Settings --------------
# Options for log.level:
# * fatal
# * error
# * warn
# * info (default)
# * debug
# * trace
# log.level: info
# path.logs:
# ------------ Other Settings --------------
# Where to find custom plugins
# path.plugins: []
Hi @casmeiron,
I reproduced your issue.
@casmeiron From the log, my guess is that the Logstash pipeline is ending while MQTT handling operations are still running. I am currently checking how I could improve that in paho.
I think that the terminating pipeline is some kind of side effect. The question is about these log lines:
E, [2018-06-05T16:03:31.977759 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.981117 #2077] ERROR -- : Writing queue is full, slowing down
It prints this message at very short intervals.
This message is printed because the internal writing buffer is full. I think it is due to an acknowledgment packet. I think it should be logged as a warning rather than an error. I am still thinking about this issue.
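To make the "writing buffer is full" condition concrete, here is a rough sketch of a bounded write queue built on Ruby's standard SizedQueue. BoundedWriter and its methods are hypothetical names chosen for illustration; this is not how paho-mqtt is actually implemented. It only assumes that a full buffer makes the writer back off and retry.

```ruby
require "thread"

# Hypothetical bounded write buffer; NOT paho-mqtt's actual implementation.
class BoundedWriter
  attr_reader :warnings

  def initialize(capacity)
    @queue = SizedQueue.new(capacity)
    @warnings = 0
  end

  # Try to enqueue without blocking. When the queue is full, count a
  # warning, sleep briefly, and retry up to max_retries more times.
  # Returns true once the packet is queued, false if it never fit.
  def enqueue(packet, retry_delay: 0.001, max_retries: 3)
    attempts = 0
    begin
      @queue.push(packet, true) # non_block = true raises ThreadError when full
      true
    rescue ThreadError
      @warnings += 1 # stands in for "Writing queue is full, slowing down"
      attempts += 1
      return false if attempts > max_retries
      sleep(retry_delay)
      retry
    end
  end

  # Remove one packet, as the writer side would; returns nil when empty.
  def drain_one
    @queue.pop(true)
  rescue ThreadError
    nil
  end

  def size
    @queue.size
  end
end
```

In a design like this, the warning repeats for as long as nothing drains the queue. That would fit the symptom reported in this thread, where the message keeps printing and no further MQTT messages are consumed, though whether the same mechanism applies inside the library is an assumption.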
I see, but the problem is that after this happens, MQTT messages are no longer consumed.
Hi @jurek7 ,
They have updated the paho.mqtt ruby implementation to version 1.0.12. Could you help me to test with this release? Let me know the steps I must perform to try it.
The same happens on 1.0.12.