logstash-codec-avro
undefined method `type_sym' for nil:NilClass
I am consuming Kafka messages that are in Avro format, and the following errors are reported at runtime:
avro-logstash_1 | [2019-12-13T15:38:13,239][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=kafka_avro_dev-2, groupId=kafka_avro_dev] Setting newly assigned partitions:
avro-logstash_1 | [2019-12-13T15:38:13,239][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=kafka_avro_dev-3, groupId=kafka_avro_dev] Successfully joined group with generation 46
avro-logstash_1 | [2019-12-13T15:38:13,240][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=kafka_avro_dev-3, groupId=kafka_avro_dev] Setting newly assigned partitions:
avro-logstash_1 | [2019-12-13T15:38:13,249][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=kafka_avro_dev-0, groupId=kafka_avro_dev] Setting newly assigned partitions: teacher_roll_call-0
avro-logstash_1 | [2019-12-13T15:38:13,265][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=kafka_avro_dev-0, groupId=kafka_avro_dev] Setting offset for partition teacher_roll_call-0 to the committed offset FetchPosition{offset=102568, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=10.1.64.243:9093 (id: 1 rack: null), epoch=0}}
avro-logstash_1 | [2019-12-13T15:38:34,575][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=kafka_avro_dev-0, groupId=kafka_avro_dev] Member kafka_avro_dev-0-8c8460a0-7e81-48e5-94db-6582900b1bcf sending LeaveGroup request to coordinator 10.1.64.243:9093 (id: 2147483646 rack: null)
avro-logstash_1 | warning: thread "Ruby-0-Thread-15: :1" terminated with exception (report_on_exception is true):
avro-logstash_1 | NoMethodError: undefined method `type_sym' for nil:NilClass
avro-logstash_1 | match_schemas at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/schema_compatibility.rb:36
avro-logstash_1 | match_schemas at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:240
avro-logstash_1 | read_data at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:257
avro-logstash_1 | read_union at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:355
avro-logstash_1 | read_data at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:286
avro-logstash_1 | read at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:252
avro-logstash_1 | decode at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77
avro-logstash_1 | thread_runner at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:258
avro-logstash_1 | thread_runner at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:257
avro-logstash_1 | [2019-12-13T15:38:34,599][ERROR][logstash.javapipeline ] A plugin had an unrecoverable error. Will restart this plugin.
avro-logstash_1 | Pipeline_id:main
avro-logstash_1 | Plugin: <LogStash::Inputs::Kafka codec=><LogStash::Codecs::Avro schema_uri=>"/rds-dts-record.avsc", id=>"40a4c414-d851-475f-b435-20dc6aca7f94", enable_metric=>true, tag_on_failure=>false>, auto_offset_reset=>"latest", group_id=>"kafka_avro_dev", topics=>["teacher_roll_call"], consumer_threads=>4, id=>"ed822f74235bdf59b1ee471d9053997e1e7ab1143de365181cfd66cf250243d3", type=>"kafka_avro_dev", bootstrap_servers=>"10.1.64.243:9093", client_id=>"kafka_avro_dev", decorate_events=>true, enable_metric=>true, auto_commit_interval_ms=>"5000", enable_auto_commit=>"true", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI">
avro-logstash_1 | Error: undefined method `type_sym' for nil:NilClass
avro-logstash_1 | Exception: NoMethodError
avro-logstash_1 | Stack: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/schema_compatibility.rb:36:in `match_schemas'
avro-logstash_1 | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:240:in `match_schemas'
avro-logstash_1 | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:257:in `read_data'
avro-logstash_1 | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:355:in `read_union'
avro-logstash_1 | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:286:in `read_data'
avro-logstash_1 | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:252:in `read'
avro-logstash_1 | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'
avro-logstash_1 | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:258:in `block in thread_runner'
avro-logstash_1 | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:257:in `block in thread_runner'
avro-logstash_1 | [2019-12-13T15:38:34,599][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method `type_sym' for nil:NilClass>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/schema_compatibility.rb:36:in `match_schemas'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:240:in `match_schemas'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:257:in `read_data'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:355:in `read_union'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:286:in `read_data'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:252:in `read'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:258:in `block in thread_runner'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:257:in `block in thread_runner'"]}
avro-logstash_1 | [2019-12-13T15:38:34,656][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
My avsc file is from here: https://github.com/LioRoger/subscribe_example/blob/master/avro/Record.avsc
My logstash configuration file is as follows:
input {
  kafka {
    bootstrap_servers => "10.1.64.243:9093"
    consumer_threads => 4
    auto_offset_reset => "latest"
    #key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    #value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => avro {
      schema_uri => "/rds-dts-record.avsc"
    }
    topics => ["teacher_roll_call"]
    type => "kafka_avro_dev"
    decorate_events => "true"
    client_id => "kafka_avro_dev"
    group_id => "kafka_avro_dev"
  }
}

#filter {
#  json {
#    source => "message"
#  }
#  mutate {
#    add_field => {
#      "kafka" => "%{[@metadata][kafka]}"
#    }
#  }
#}

output {
  if [type] == "kafka_avro_dev" {
    logservice {
      codec => "json"
      endpoint => "cn-xxxxx-xxxxxx.log.aliyuncs.com"
      project => "xmkafka"
      logstore => "dev-dts"
      topic => "ALL"
      source => ""
      access_key_id => "xxxxxxx"
      access_key_secret => "xxxxxxxx"
      max_send_retry => 10
    }
  }
}
I experienced the same in combination with Confluent's Schema Registry and messages that were encoded in Confluent's Avro wire format. Using https://github.com/revpoint/logstash-codec-avro_schema_registry instead of the default plugin solved this problem for me.
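For reference, a minimal sketch of that workaround applied to the input block above. The `endpoint` option name comes from the avro_schema_registry codec's documentation, and I believe the codec expects the raw bytes, hence `ByteArrayDeserializer`; the registry URL is a placeholder:

```
input {
  kafka {
    bootstrap_servers => "10.1.64.243:9093"
    topics => ["teacher_roll_call"]
    # Codec that resolves the schema ID embedded in Confluent-framed messages
    codec => avro_schema_registry {
      endpoint => "http://schema-registry:8081"
    }
    # Hand the codec untouched bytes rather than a decoded string
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }
}
```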
I'm using the AWS Glue schema registry, so my Kafka records are serialized with com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer. Trying to process those records with this Logstash Avro codec fails with the same NoMethodError: undefined method `type_sym' for nil:NilClass error mentioned in this issue.
From what I understand, both the Confluent schema registry serializer and the AWS Glue schema registry serializer don't output the raw, standard Avro bytes alone; they prepend some vendor-specific framing bytes as well:
- Confluent: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
- Amazon Glue: https://github.com/awslabs/aws-glue-schema-registry/blob/master/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/SerializationDataEncoder.java#L54
These additional bytes cause this Logstash Avro codec to fail to parse the data.
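To illustrate what those extra bytes look like in the Confluent case: per Confluent's wire-format documentation, each message starts with a zero magic byte followed by a 4-byte big-endian schema ID, and only then the Avro binary. A hedged Ruby sketch (the helper name is mine, not part of any plugin) of stripping that framing before handing the payload to an Avro reader:

```ruby
# Strip the Confluent wire-format header from a Kafka message value.
# Layout: 1 magic byte (0x00) + 4-byte big-endian schema ID + Avro binary.
def strip_confluent_header(bytes)
  magic, schema_id = bytes.unpack('CN') # C = unsigned 8-bit, N = 32-bit big-endian
  raise ArgumentError, "unexpected magic byte #{magic}" unless magic == 0
  # Remaining bytes are plain Avro binary that a standard datum reader can decode.
  [schema_id, bytes.byteslice(5, bytes.bytesize - 5)]
end

framed = [0, 42].pack('CN') + 'plain-avro-bytes'
schema_id, avro_bytes = strip_confluent_header(framed)
```

The Glue serializer's framing is different (a header version byte, a compression byte, and a 16-byte schema-version UUID, per the SerializationDataEncoder source linked above), so a Glue-aware variant would need to skip a different prefix.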
I guess adding Confluent and AWS Glue specific configurations to this Avro-specific plugin would add bloat. The Elastic Kafka input plugin already has some Confluent schema registry support (https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-kafka.html#plugins-inputs-kafka-schema_registry_url), but I don't see anything for Glue.