logstash-codec-avro

undefined method `type_sym' for nil:NilClass

Open fonzie1006 opened this issue 5 years ago • 2 comments

I'm consuming Kafka messages that are in Avro format, and the following errors are reported when Logstash runs.

avro-logstash_1  | [2019-12-13T15:38:13,239][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=kafka_avro_dev-2, groupId=kafka_avro_dev] Setting newly assigned partitions:
avro-logstash_1  | [2019-12-13T15:38:13,239][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=kafka_avro_dev-3, groupId=kafka_avro_dev] Successfully joined group with generation 46
avro-logstash_1  | [2019-12-13T15:38:13,240][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=kafka_avro_dev-3, groupId=kafka_avro_dev] Setting newly assigned partitions:
avro-logstash_1  | [2019-12-13T15:38:13,249][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=kafka_avro_dev-0, groupId=kafka_avro_dev] Setting newly assigned partitions: teacher_roll_call-0
avro-logstash_1  | [2019-12-13T15:38:13,265][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=kafka_avro_dev-0, groupId=kafka_avro_dev] Setting offset for partition teacher_roll_call-0 to the committed offset FetchPosition{offset=102568, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=10.1.64.243:9093 (id: 1 rack: null), epoch=0}}
avro-logstash_1  | [2019-12-13T15:38:34,575][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=kafka_avro_dev-0, groupId=kafka_avro_dev] Member kafka_avro_dev-0-8c8460a0-7e81-48e5-94db-6582900b1bcf sending LeaveGroup request to coordinator 10.1.64.243:9093 (id: 2147483646 rack: null)
avro-logstash_1  | warning: thread "Ruby-0-Thread-15: :1" terminated with exception (report_on_exception is true):
avro-logstash_1  | NoMethodError: undefined method `type_sym' for nil:NilClass
avro-logstash_1  |   match_schemas at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/schema_compatibility.rb:36
avro-logstash_1  |   match_schemas at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:240
avro-logstash_1  |       read_data at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:257
avro-logstash_1  |      read_union at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:355
avro-logstash_1  |       read_data at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:286
avro-logstash_1  |            read at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:252
avro-logstash_1  |          decode at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77
avro-logstash_1  |   thread_runner at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:258
avro-logstash_1  |   thread_runner at /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:257
avro-logstash_1  | [2019-12-13T15:38:34,599][ERROR][logstash.javapipeline    ] A plugin had an unrecoverable error. Will restart this plugin.
avro-logstash_1  |   Pipeline_id:main
avro-logstash_1  |   Plugin: <LogStash::Inputs::Kafka codec=><LogStash::Codecs::Avro schema_uri=>"/rds-dts-record.avsc", id=>"40a4c414-d851-475f-b435-20dc6aca7f94", enable_metric=>true, tag_on_failure=>false>, auto_offset_reset=>"latest", group_id=>"kafka_avro_dev", topics=>["teacher_roll_call"], consumer_threads=>4, id=>"ed822f74235bdf59b1ee471d9053997e1e7ab1143de365181cfd66cf250243d3", type=>"kafka_avro_dev", bootstrap_servers=>"10.1.64.243:9093", client_id=>"kafka_avro_dev", decorate_events=>true, enable_metric=>true, auto_commit_interval_ms=>"5000", enable_auto_commit=>"true", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI">
avro-logstash_1  |   Error: undefined method `type_sym' for nil:NilClass
avro-logstash_1  |   Exception: NoMethodError
avro-logstash_1  |   Stack: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/schema_compatibility.rb:36:in `match_schemas'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:240:in `match_schemas'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:257:in `read_data'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:355:in `read_union'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:286:in `read_data'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:252:in `read'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:258:in `block in thread_runner'
avro-logstash_1  | /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:257:in `block in thread_runner'
avro-logstash_1  | [2019-12-13T15:38:34,599][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method `type_sym' for nil:NilClass>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/schema_compatibility.rb:36:in `match_schemas'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:240:in `match_schemas'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:257:in `read_data'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:355:in `read_union'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:286:in `read_data'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/avro-1.9.1/lib/avro/io.rb:252:in `read'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:258:in `block in thread_runner'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/inputs/kafka.rb:257:in `block in thread_runner'"]}
avro-logstash_1  | [2019-12-13T15:38:34,656][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

My .avsc file is from here: https://github.com/LioRoger/subscribe_example/blob/master/avro/Record.avsc

My logstash configuration file is as follows:

input {
  kafka {
    bootstrap_servers => "10.1.64.243:9093"
    consumer_threads => 4
    auto_offset_reset => "latest"
    #key_deserializer_class => "org.apache.kafka.common.serialization.ByteArraySerializer"
    #value_deserializer_class => "org.apache.kafka.common.serialization.ByteArraySerializer"
    codec => avro {
      schema_uri => "/rds-dts-record.avsc"
    }
    topics => ["teacher_roll_call"]
    type => "kafka_avro_dev"
    decorate_events => "true"
    client_id => "kafka_avro_dev"
    group_id => "kafka_avro_dev"
  }
}

#filter {
#        json {
#                source => "message"
#        }
#        mutate {
#            add_field => {
#                "kafka" => "%{[@metadata][kafka]}"
#            }
#        }
#}

output {
  if [type] == "kafka_avro_dev" {
    logservice {
      codec => "json"
      endpoint => "cn-xxxxx-xxxxxx.log.aliyuncs.com"
      project => "xmkafka"
      logstore => "dev-dts"
      topic => "ALL"
      source => ""
      access_key_id => "xxxxxxx"
      access_key_secret => "xxxxxxxx"
      max_send_retry => 10
    }
  }
}

fonzie1006 avatar Dec 13 '19 08:12 fonzie1006

I experienced the same in combination with Confluent's Schema Registry and messages that were encoded in Confluent's Avro wire format. Using https://github.com/revpoint/logstash-codec-avro_schema_registry instead of the default plugin solved this problem for me.
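For reference, a minimal sketch of that workaround (the `endpoint` option name is taken from that plugin's README; the registry URL and topic are placeholders, so check the plugin's docs for your version):

```
input {
  kafka {
    bootstrap_servers => "10.1.64.243:9093"
    topics => ["teacher_roll_call"]
    codec => avro_schema_registry {
      endpoint => "http://schema-registry.example:8081"
    }
  }
}
```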

netsplit avatar Dec 09 '20 10:12 netsplit

I'm using the AWS Glue schema registry, so my Kafka records are serialized with com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer. Trying to process those records with this Logstash Avro codec fails with the same NoMethodError: undefined method `type_sym' for nil:NilClass error mentioned in this issue.

From what I understand, both the Confluent schema registry serializer and the AWS Glue schema registry serializer don't output just the raw, standard Avro bytes; they prepend some additional vendor-specific bytes as well:

  • Confluent: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
  • Amazon Glue: https://github.com/awslabs/aws-glue-schema-registry/blob/master/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/SerializationDataEncoder.java#L54

These additional bytes cause this Logstash Avro codec to fail to parse the data.
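To illustrate the mismatch: per Confluent's documented wire format, each message carries a 5-byte header (a magic byte 0x00 followed by a 4-byte big-endian schema ID) before the Avro payload. A hypothetical helper (not part of any plugin) that splits that header off might look like:

```ruby
# Hypothetical helper: split a Confluent wire-format message into the
# registry schema ID and the raw Avro payload a plain Avro decoder expects.
# Header layout (documented by Confluent):
#   byte 0      magic byte, always 0x00
#   bytes 1..4  schema ID, big-endian unsigned 32-bit integer
def strip_confluent_header(bytes)
  raise ArgumentError, "not Confluent wire format" unless bytes.getbyte(0) == 0
  schema_id = bytes[1, 4].unpack1("N") # "N" = 32-bit big-endian unsigned
  [schema_id, bytes[5..-1]]
end
```

The plain Avro codec never strips these bytes, so the Avro reader consumes them as payload. That plausibly matches the stack trace above: read_union reads a bogus union-branch index out of the header bytes, looks up a branch schema that doesn't exist, and calls type_sym on nil.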

I guess adding Confluent and AWS Glue specific configurations to this Avro-specific plugin would add bloat. The Elastic Kafka input plugin already has some Confluent schema registry support (https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-kafka.html#plugins-inputs-kafka-schema_registry_url), but I don't see anything for Glue.
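For the Confluent case, a minimal sketch using the Kafka input's own registry support (the schema_registry_url option comes from the linked docs; the URL is a placeholder). As I read those docs, the input then deserializes the Avro values itself, so no avro codec is configured:

```
input {
  kafka {
    bootstrap_servers => "10.1.64.243:9093"
    topics => ["teacher_roll_call"]
    schema_registry_url => "http://schema-registry.example:8081"
  }
}
```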

bencody avatar Jan 27 '22 12:01 bencody