
[SUPPORT] Kafka Avro Confluent Schema Registry version 7 compatibility issues

Open zachtrong opened this issue 1 year ago • 3 comments

Context

  • Have you gone through our FAQs? Yes

Problem description

The latest Apache Hudi release (0.14.0) bundles kafka-avro-serializer-5.3.4.jar, which causes deserialization failures when the Kafka Avro data source is used against a Confluent Schema Registry REST API of version 6 or 7.

Sample schema as returned by the Schema Registry REST API:

{"id":36,"subject":"test-value","version":12,"schema":"{\"type\":\"record\",\"name\":\"test\",\"namespace\":\"test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"mongo.bot.test.test\"}","references":[]}

The key problem is the "references": [] field in the response, which the bundled 5.3.4 client does not recognize.
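
For illustration, here is a minimal Jackson sketch of why that happens. It uses a simplified, hypothetical stand-in POJO rather than the real Confluent SchemaString class, but the mechanism is the same: the old entity class only declares a "schema" property, so an unknown "references" field fails deserialization.

import com.fasterxml.jackson.databind.ObjectMapper;

public class ReferencesFieldRepro {

    // Simplified, hypothetical stand-in for the old client's SchemaString entity,
    // which only declares a "schema" property (the real class lives in
    // io.confluent.kafka.schemaregistry.client.rest.entities).
    static class LegacySchemaString {
        public String schema;
    }

    public static void main(String[] args) throws Exception {
        // Trimmed-down response in the shape returned by Schema Registry 6/7;
        // older registry versions do not include the "references" field.
        String response = "{\"schema\": \"{}\", \"references\": []}";

        ObjectMapper mapper = new ObjectMapper();
        // FAIL_ON_UNKNOWN_PROPERTIES is enabled by default, so the unknown
        // "references" field throws UnrecognizedPropertyException here.
        LegacySchemaString parsed = mapper.readValue(response, LegacySchemaString.class);
        System.out.println(parsed.schema);
    }
}

Run with jackson-databind on the classpath, this throws the same UnrecognizedPropertyException seen in the stack trace below.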

Suggestion: upgrade the jar to io.confluent:kafka-avro-serializer:7.5.1 from the Confluent repository.

Reference: kafka-avro-serializer:7.5.1
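
As a rough diagnostic sketch (not part of Hudi itself), one way to confirm which Confluent jars a Spark/Hudi classpath actually loads, before and after an upgrade, is to print where the relevant classes come from:

import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

public class ConfluentJarCheck {
    public static void main(String[] args) {
        // Prints the jar each Confluent class was loaded from, e.g.
        // .../kafka-avro-serializer-5.3.4.jar for KafkaAvroDeserializer and
        // .../kafka-schema-registry-client-5.3.4.jar for SchemaString
        // (or a bundle jar if the classes are shaded into hudi-utilities).
        System.out.println(KafkaAvroDeserializer.class
                .getProtectionDomain().getCodeSource().getLocation());
        System.out.println(SchemaString.class
                .getProtectionDomain().getCodeSource().getLocation());
    }
}

If the printed locations still point at 5.3.x jars (or at a bundle that shades them), the upgraded serializer has not actually taken effect on that classpath.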

Expected behavior

Apache Hudi is able to parse the above Avro schema without error.

Environment Description

  • Hudi version: 0.14.0

  • Spark version: 3.4.1

  • Hive version: 3.1.3

  • Hadoop version: 3.3.4

  • Storage (HDFS/S3/GCS..): S3

  • Running on Docker? (yes/no): yes


Stacktrace

Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "references" (class io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString), not marked as ignorable (one known property: "schema"])
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2063] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString["references"])
        at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
        at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1132)
        at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2202)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1705)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1683)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:320)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)
        at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4730)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3722)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:221)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:495)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:488)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:177)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:256)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:235)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:107)
        at org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer.deserialize(KafkaAvroSchemaDeserializer.java:79)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:79)
        at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1386)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1617)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1453)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:686)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:637)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:207)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:136)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:40)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:39)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:219)
        at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:257)
        at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:225)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$SliceIterator.next(Iterator.scala:273)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
        at scala.collection.AbstractIterator.to(Iterator.scala:1431)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
        at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1462)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)

zachtrong avatar Nov 30 '23 05:11 zachtrong

@zachtrong Sorry for the delay here. Did you try upgrading the version in your setup, and did that work fine?

If it works and you don't see any issues with the upgraded version, can you create a PR for the same? I can help get it reviewed.

ad1happy2go avatar Dec 19 '23 12:12 ad1happy2go

I'm not quite able to build and fix Hudi yet; for now I applied a quick fix on the API side to return the old response format.

zachtrong avatar Dec 20 '23 16:12 zachtrong

Got it. Thanks. Let us know in case you need help on this.

ad1happy2go avatar Dec 22 '23 16:12 ad1happy2go