logstash-codec-avro
exception handling when an event doesn't adhere to the Avro schema
Please post all product and debugging questions on our forum. Your questions will reach our wider community members there, and if we confirm that there is a bug, then we can open a new issue here.
For all general issues, please provide the following details for fast resolution:
- Version: avro codec 3.2.3 / Logstash 6.5.3
- Operating System: Linux
- Config File (if you have sensitive info, please remove it):
output {
  kafka {
    codec => avro_schema_registry {
      endpoint => "http://hostname:8081/"
      subject_name => "Logstash.Logs-value"
      schema_version => 2
      register_schema => false
    }
    client_id => "dev.elk"
    topic_id => "Logstash.Logs"
    bootstrap_servers => "bootstrap01:9092, bootstrap02:9092, bootstrap03:9092"
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
- Sample Data:
- Steps to Reproduce: Use the Logstash Kafka output plugin with Avro encoding enabled. If an event comes in that doesn't adhere to the Avro schema, it is rejected with an error message in the Logstash logs:
[2019-03-20T23:56:03,993][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<Avro::IO::AvroTypeError: The datum {"type"=>"test", "message_size"=>"100", "input_type"=>"log", "message"=>"Bar was successful - 00000030 WSRdbManagedC W DSRA9510W: This is another line of extra data so that it can be put near 1000 characters - When she was discontented, she fancied herself nervous. The business of her life was to get her daughters married; its solace was visiting and news. connection...}", "offset"=>13552881, "source"=>"/apps/load-simulator/logs/SystemOut.log", "tags"=>["beats_input_codec_plain_applied"], "host"=>"devElk", "version"=>"1", "timestamp"=>"2019-03-08T18:08:02.117Z", "beat"=>{"version"=>"5.5.2", "name"=>"devElk", "hostname"=>"devElk"}, "fields"=>{"docType"=>"test", "cioName"=>"channels", "appCatCode"=>"121"}} is not an example of schema {"type":"record","name":"LogstashEntry","namespace":"com.bmo.logstash","fields":[{"name":"offset","type":"long"},{"name":"input_type","type":"string"},{"name":"source","type":"string"},{"name":"thread","type":["string","null"]},{"name":"epochMillis","type":["long","null"]},{"name":"message","type":"string"},{"name":"type","type":"string"},{"name":"message_size","type":"int"},{"name":"tags","type":{"type":"array","items":"string"}},{"name":"logstash","type":["null",{"type":"record","name":"Logstash","namespace":"com.bmo.logstash","fields":[{"name":"hostName","type":"string"},{"name":"location","type":"string"}]}]},{"name":"timestamp","type":"string"},{"name":"logLevel","type":["string","null"]},{"name":"version","type":"string"},{"name":"beat","type":{"type":"record","name":"Beat","namespace":"com.bmo.logstash","fields":[{"name":"name","type":"string"},{"name":"hostname","type":"string"},{"name":"version","type":"string"}]}},{"name":"host","type":"string"},{"name":"additionalInfo","type":["string","null"]},{"name":"inputSource","type":["string","null"]},{"name":"esTimestamp","type":["string","null"]},{"name":"fields","type":{"type":"record","name":"Fields","namespace":"com.bmo.logstash","fields":[{"name":"appCatCode","type":["string","long"]},{"name":"subType","type":["null","string"],"default":null},{"name":"cioName","type":"string"},{"name":"routeNifi","type":["null","string"],"default":null},{"name":"version","type":["null","string"],"default":null},{"name":"docType","type":"string"}]}},{"name":"elapsedTime","type":["string","null"]}]}>, :backtrace=>["/apps/elkapps/production/scc/logstash-6.5.3/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:547:in `write_data'", "/apps/elkapps/production/scc/logstash-6.5.3/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:542:in `write'", "/apps/elkapps/production/scc/logstash-6.5.3/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:252:in `encode'", "/apps/elkapps/production/scc/logstash-6.5.3/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.2.1/lib/logstash/outputs/kafka.rb:221:in `block in multi_receive'", "org/jruby/RubyArray.java:1734:in `each'", "/apps/elkapps/production/scc/logstash-6.5.3/vendor/bundle/jruby/2.3.0/gems/logstash-output-kafka-7.2.1/lib/logstash/outputs/kafka.rb:219:in `multi_receive'", "org/logstash/config/ir/compiler/OutputStrategyExt.java:114:in `multi_receive'", "org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:97:in `multi_receive'", "/apps/elkapps/production/scc/logstash-6.5.3/logstash-core/lib/logstash/pipeline.rb:373:in `block in output_batch'", 
"org/jruby/RubyHash.java:1343:in `each'", "/apps/elkapps/production/scc/logstash-6.5.3/logstash-core/lib/logstash/pipeline.rb:372:in `output_batch'", "/apps/elkapps/production/scc/logstash-6.5.3/logstash-core/lib/logstash/pipeline.rb:324:in `worker_loop'", "/apps/elkapps/production/scc/logstash-6.5.3/logstash-core/lib/logstash/pipeline.rb:286:in `block in start_workers'"]}
[2019-03-20T23:56:04,045][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
Logstash hangs and stops processing anything after that (java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit), and has to be restarted.
The problem arises in the avro codec: looking at the encode method, there is no exception handling around the write call that raises this error: https://github.com/logstash-plugins/logstash-codec-avro/blob/802e5f987851a3cfc55213daf750ea4e408d1b0e/lib/logstash/codecs/avro.rb#L92 Could exception handling be added to this codec so Logstash can continue to ingest data and ignore incorrectly formatted events instead of dying? A sketch of what that could look like follows.
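For illustration, here is a minimal sketch, loosely based on the current encode method; the exact body, the rescued exception class, and the decision to log and skip the event are assumptions on my side, not the plugin's actual code:

```ruby
# Hypothetical encode with a rescue around the Avro write: a schema mismatch
# is logged and the offending event skipped instead of crashing the pipeline.
def encode(event)
  dw = Avro::IO::DatumWriter.new(avro_schema)
  buffer = StringIO.new
  encoder = Avro::IO::BinaryEncoder.new(buffer)
  dw.write(event.to_hash, encoder)
  @on_event.call(event, Base64.strict_encode64(buffer.string))
rescue Avro::IO::AvroTypeError => e
  # Log the mismatch and move on; only this event is lost, not the pipeline.
  logger.error("Event does not conform to the Avro schema, skipping",
               :exception => e.class.name, :message => e.message)
end
```

Whether to drop, tag, or route such events could be made configurable, similar to how, for example, the json codec tags events it cannot decode.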
Please also consider this request: a possible solution is to catch the format exception specifically; with that indication, the event could, for example, be written to a dead letter queue topic instead of being lost, as sketched below.
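Building on the sketch above, and assuming the codec (or the enclosing output) can reach Logstash's dead letter queue writer (execution_context.dlq_writer is the hook output plugins use; whether a codec can call it directly is an assumption on my side), the rescue could hand the event over for later inspection and replay instead of dropping it:

```ruby
# Hypothetical variant: send the failed event to Logstash's dead letter queue
# (or, alternatively, produce it to a dedicated "dead letter" Kafka topic).
def encode(event)
  dw = Avro::IO::DatumWriter.new(avro_schema)
  buffer = StringIO.new
  encoder = Avro::IO::BinaryEncoder.new(buffer)
  dw.write(event.to_hash, encoder)
  @on_event.call(event, Base64.strict_encode64(buffer.string))
rescue Avro::IO::AvroTypeError => e
  reason = "Event does not conform to the Avro schema: #{e.message}"
  # Assumption: a DLQ writer is available in this context.
  execution_context.dlq_writer.write(event, reason)
  logger.warn("Avro encode failed, event written to the dead letter queue",
              :message => e.message)
end
```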