sbt-avrohugger icon indicating copy to clipboard operation
sbt-avrohugger copied to clipboard

question re IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord

Open nicolaemarasoiu opened this issue 4 years ago • 0 comments


Our setup (sbt plugins, build definition, and the producer code) looks like this:

// project/plugins.sbt — one plugin per line (sbt requires settings to be separate expressions).
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0-RC22")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.2")
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.3")

// build.sbt — basic project coordinates and compiler flags.
scalaVersion := "2.12.10"
name := "global-topic-conveyor"
organization := ""
version := "1.0"
scalacOptions ++= Seq(
  "-deprecation",
  "-feature",
  "-unchecked",
  "-language:higherKinds",
  // "-Xfatal-warnings",
  "-Xlint:infer-any",
  "-Xlint:unused"
)

// Extra repository for the io.confluent artifacts and the "-ccs" Kafka builds used below.
// NOTE(review): the URL was empty in the pasted build; this is the public Confluent Maven
// repository — confirm it matches the one the project actually uses.
resolvers += "confluent" at "https://packages.confluent.io/maven/"

// The Kafka version is derived from the Confluent Platform version by appending "-ccs".
val confluentVersion = "5.4.0"
val kafkaVersion = confluentVersion + "-ccs"

// Runtime dependencies.
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion
libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % confluentVersion
libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "com.typesafe" % "config" % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "3.0.4"
libraryDependencies += "io.circe" %% "circe-parser" % "0.13.0"

// Test-only dependencies.
libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % kafkaVersion % "test"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % "test"
libraryDependencies += "com.github.tomakehurst" % "wiremock" % "2.25.1" % "test"

// Feed the schemas under src/main/resources/avro to sbt-avrohugger and wire its
// avroScalaGenerate task into compilation as a source generator.
avroSourceDirectories in Compile += (sourceDirectory in Compile).value / "resources" / "avro"
sourceGenerators in Compile += (avroScalaGenerate in Compile).taskValue

// Name of the fat jar produced by sbt-assembly.
assemblyJarName in assembly := "global-topic-conveyor.jar"

// Run test suites sequentially.
parallelExecution in Test := false

// Second source generator: writes one file of avro4s implicits for the schemas into the
// managed-sources folder and registers it for compilation.
// NOTE(review): the two logging calls lost their receiver when the snippet was pasted;
// `streams.value.log.info(s"...")` is the conventional sbt form — confirm against the real build.
sourceGenerators in Compile += Def.task {
  val schemasPath = (sourceDirectory in Compile).value / "resources" / "avro"
  streams.value.log.info(s"Parsing schemas from $schemasPath")
  val managedFolder = (sourceManaged in Compile).value
  val outFile = CreateImplicitsForSchemas.writeImplicitsForSchemas(schemasPath, managedFolder)
  streams.value.log.info(s"Generated implicits for Avro4s: $outFile")
  Seq(outFile)
}.taskValue

// Fail compilation on the "unsafe" warts (minus Wart.Any) plus Wart.Nothing.
wartremoverErrors in (Compile, compile) ++= Warts.unsafe.filterNot(_ == Wart.Any) ++ Seq(
  Wart.Nothing
)

// Do not lint generated sources.
wartremoverExcluded += sourceManaged.value

assemblyMergeStrategy in assembly := {
  // TODO: module-info.class is conflicting for com.fasterxml.jackson after importing the new kafka libraries
  // This has been addressed by discarding the file from the fat jar (as per the MergeStrategy.discard)
  case "module-info.class" => MergeStrategy.discard
  case x =>
    // Everything else falls back to the previously configured strategy.
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

// Since the uberjar is deployed by the CI process and tests are performed before the jar is built
// There's no need for testing again before assembling, hence the next line
test in assembly := {}

package com.ovoenergy.globaltopics.pipelines.orion

import java.time.{Instant, LocalDate} import java.util.{Properties, UUID}

import com.ovoenergy.kafka.common.event.Metadata import com.ovoenergy.kafka.identity.event.{ CreatedOrUpdate, Deleted, FormattedAddress, User, UserEvent } import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroSerializer} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.serialization.StringSerializer

/** Manual smoke-test producer: publishes one hard-coded `UserEvent` to a local Kafka broker
  * using Confluent's `KafkaAvroSerializer` and a local schema registry.
  *
  * NOTE(review): the reported `IllegalArgumentException: Unsupported Avro type` is raised by
  * `KafkaAvroSerializer`, which (per its own message) only accepts null, primitives, String,
  * byte[] and `IndexedRecord`. `UserEvent` is presumably a plain case class generated by
  * `avroScalaGenerate` (standard format), i.e. not an `IndexedRecord` — confirm. Likely remedies:
  * generate SpecificRecord classes (`avroScalaGenerateSpecific`), or convert the value to a
  * `GenericRecord` (e.g. via avro4s `RecordFormat`) before sending.
  */
object Producer {

  /** Builds the producer, sends a single sample event and flushes it.
    *
    * @param args ignored
    */
  def main(args: Array[String]): Unit = {
    val topic = "identity_users_v1"
    val props = fromMap(
      Map(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer],
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer],
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"
      )
    )
    // Type arguments and constructor argument were stripped from the pasted snippet.
    val producer = new KafkaProducer[String, UserEvent](props)
    try {
      val userEventId = "userEventId__alsjbfahjlsbefhjashdfbjsaf"
      val userId = "userId__qowiuefg324uf3qhf3q4"
      val inputMessageValue = UserEvent(
        // NOTE(review): the createdAt value was lost in the paste; Instant.now() is assumed
        // because java.time.Instant is imported and otherwise unused — confirm.
        metadata = Metadata(
          eventId = userEventId,
          traceToken = "asdfkjasdf",
          createdAt = Instant.now()
        ),
        realm = "ovo-france",
        event = Left[CreatedOrUpdate, Deleted](
          CreatedOrUpdate(
            User(
              id = userId,
              title = Some("Mr"),
              givenName = "John",
              familyName = "Smith",
              // The address below was redacted by the page the snippet was copied from.
              emailAddress = Some("[email protected]"),
              birthdate = Some(LocalDate.of(1978, 6, 4)),
              phoneNumbers = List("072189736"),
              permissions = Nil,
              postalAddress = Some(List("Unused address, we use the formattedAddresses instead")),
              formattedAddresses = List(
                FormattedAddress(
                  addressType = "current",
                  country = "GB",
                  address = """ { "line1":"195 Radburn Close", "town":"Eastbourne", "postcode":"CM18 7EQ" } """
                )
              )
            )
          )
        )
      )
      discard(
        producer.send(
          new ProducerRecord[String, UserEvent](topic, UUID.randomUUID.toString, inputMessageValue)
        )
      )
      producer.flush()
    } finally {
      // Release network threads and buffers even when serialization or sending throws.
      producer.close()
    }
  }

  /** Evaluates its argument and throws the result away, so that non-Unit expressions can be
    * used in statement position.
    */
  def discard(evaluateForSideEffectOnly: Any): Unit = {
    val _: Any = evaluateForSideEffectOnly
    ()
  }

  /** Copies every entry of `properties` into a fresh `java.util.Properties`, prints both the
    * input map and the result, and returns the result.
    *
    * @param properties configuration entries to copy
    * @return a new `Properties` holding the same key/value pairs
    */
  def fromMap(properties: Map[String, Object]): Properties = {
    val result = properties.foldLeft(new Properties()) {
      case (props, (k, v)) =>
        // `put` returns the previous value; it is deliberately ignored.
        discard(props.put(k, v))
        props
    }
    println("For map=" + properties.toString() + ", properties=" + result.toString)
    result
  }
}



{"message":"[Producer clientId=producer-1] Cluster ID: sqjPEJjHQyuly4zXPuqdmA","logger":"org.apache.kafka.clients.Metadata","thread":"kafka-producer-network-thread | producer-1","severity":"INFO","timestamp":{"seconds":1587741874,"nanos":201000000}}

Exception in thread "main" java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord at io.confluent.kafka.serializers.AvroSchemaUtils.getSchema( at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize( at org.apache.kafka.common.serialization.Serializer.serialize( at org.apache.kafka.clients.producer.KafkaProducer.doSend( at org.apache.kafka.clients.producer.KafkaProducer.send( at org.apache.kafka.clients.producer.KafkaProducer.send( at com.ovoenergy.globaltopics.pipelines.orion.Producer$.main(Producer.scala:66) at com.ovoenergy.globaltopics.pipelines.orion.Producer.main(Producer.scala)

nicolaemarasoiu avatar Apr 24 '20 15:04 nicolaemarasoiu