logisland
logisland copied to clipboard
IllegalArgumentException: No enum constant com.hurence.logisland.record.FieldType.UNION
Expected behavior and actual behavior.
When deserializing an Avro record from Kafka topic, I got the following Java exception: java.lang.IllegalArgumentException: No enum constant com.hurence.logisland.record.FieldType.UNION at java.lang.Enum.valueOf(Unknown Source) at com.hurence.logisland.record.FieldType.valueOf(FieldType.java:23) at com.hurence.logisland.serializer.AvroSerializer.deserialize(AvroSerializer.java:143) at com.hurence.logisland.stream.spark.AbstractKafkaRecordStream$$anonfun$deserializeRecords$1.apply(AbstractKafkaRecordStream.scala:358) at com.hurence.logisland.stream.spark.AbstractKafkaRecordStream$$anonfun$deserializeRecords$1.apply(AbstractKafkaRecordStream.scala:354) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176) at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257) at scala.collection.AbstractIterator.toList(Iterator.scala:1157) at com.hurence.logisland.stream.spark.AbstractKafkaRecordStream.deserializeRecords(AbstractKafkaRecordStream.scala:368) at com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing$$anonfun$process$1$$anonfun$apply$1.apply(KafkaRecordStreamParallelProcessing.scala:147) at com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing$$anonfun$process$1$$anonfun$apply$1.apply(KafkaRecordStreamParallelProcessing.scala:126) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing$$anonfun$process$1.apply(KafkaRecordStreamParallelProcessing.scala:126) at com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing$$anonfun$process$1.apply(KafkaRecordStreamParallelProcessing.scala:95) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source)
Steps to reproduce the problem.
Configure a logisland processor to use the AvroSerializer class to deserialize the input topic records.
Ex:
parsing
- stream: parsing_stream
component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
type: stream
documentation: a processor that links
configuration:
kafka.input.topics: logisland_rawtest
kafka.output.topics: logisland_events
kafka.error.topics: logisland_errors
kafka.input.topics.serializer: com.hurence.logisland.serializer.AvroSerializer
kafka.output.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
avro.input.schema: >
{ "version": 1,
"name": "io.divolte.examples.record",
"type": "record",
"fields": [
{ "name": "timestamp", "type": "long" },
{ "name": "remoteHost", "type": "string"},
{ "name": "record_type", "type": ["null", "string"], "default": null },
{ "name": "record_id", "type": ["null", "string"], "default": null },
{ "name": "location", "type": ["null", "string"], "default": null },
{ "name": "localPath", "type": ["null", "string"], "default": null },
{ "name": "q", "type": ["null", "string"], "default": null },
{ "name": "n", "type": ["null", "int"], "default": null }
]
}
kafka.metadata.broker.list: sandbox:9092
kafka.zookeeper.quorum: sandbox:2181
kafka.topic.autoCreate: true
kafka.topic.default.partitions: 4
kafka.topic.default.replicationFactor: 1
Now generate an Avro record in the input Kafka topic. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic logisland_rawtest < /tmp/ex1.avro ex1.avro.txt