Kafka Avro Deserializer is not able to deserialize timestamps with "logicalType": "timestamp-micros"
I am on Confluent 5.4, Avro 1.9.1, and the Commercehub Gradle plugin 0.20.0.
I have an Avro schema with the following field:
{
  "name": "CRTE_TMS",
  "type": [
    "null",
    {
      "type": "long",
      "logicalType": "timestamp-micros"
    }
  ],
  "default": null
}
In the Avro-generated POJO, the type of CRTE_TMS is java.time.Instant.
I created the consumer as described in the tutorial: https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/
@KafkaListener(topics = "users", groupId = "group_id")
public void consume(ConsumerRecord<String, User> record) {
    log.info(String.format("Consumed message -> %s", record.value()));
}
When I run my application, I get the following exception:
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
at com.sample.User.put(User.java:229) ~[main/:na]
at org.apache.avro.generic.GenericData.setField(GenericData.java:795) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[avro-1.9.1.jar:1.9.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.9.1.jar:1.9.1]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:287) ~[kafka-avro-serializer-5.4.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:102) ~[kafka-avro-serializer-5.4.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:81) ~[kafka-avro-serializer-5.4.0.jar:na]
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-5.4.0.jar:na]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1091) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1047) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_241]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_241]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241]
I can successfully convert the Kafka Avro message to a GenericRecord.
I have tried every source I could find on the internet, and it looks like this is either a problem with the Kafka Avro Deserializer or I am missing something.
I am having a similar issue, but when trying to convert an Avro record to a Kafka Connect schema like this:
final var avroData = new AvroData(new AvroDataConfig(
    Map.of("schema.registry.url", "http://localhost:8081",
           "enhanced.avro.schema.support", true)));
avroData
    .toConnectData(VesselPosition.getClassSchema(), record)
    .schema();
And I see this as output:
Invalid type for INT64: class java.time.Instant
I actually don't know whether this is a schema-registry bug or an Avro bug, because as far as I can tell the type in the schema is declared as long:
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
}
In Java, however, it is represented as an Instant, which is not obvious (to me at least), because under the hood the Avro schema classes appear to make a simple cast to long in this case.
I think this is an Avro issue.
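To illustrate the mismatch described above: on the wire a timestamp-millis value is a plain long, while the generated class exposes it as an Instant. The following is a minimal sketch using only java.time, with a hypothetical helper class TimestampMillis (not part of Avro or Confluent), that normalizes either representation to epoch milliseconds. Whether AvroData accepts a record prepared this way has not been verified here.

```java
import java.time.Instant;

// Hypothetical helper, not part of Avro or the Confluent libraries.
public final class TimestampMillis {
    private TimestampMillis() {}

    // Accepts either representation of a timestamp-millis value
    // (Instant from the generated class, or the raw Long from the wire)
    // and returns epoch milliseconds.
    public static long toEpochMillis(Object value) {
        if (value instanceof Instant) {
            return ((Instant) value).toEpochMilli();
        }
        if (value instanceof Long) {
            return (Long) value;
        }
        throw new IllegalArgumentException("Expected Instant or Long but got "
                + (value == null ? "null" : value.getClass().getName()));
    }

    // Inverse direction: epoch milliseconds back to an Instant.
    public static Instant fromEpochMillis(long millis) {
        return Instant.ofEpochMilli(millis);
    }
}
```

For example, TimestampMillis.toEpochMillis(record.getTimestamp()) would yield a long regardless of which type the getter returns; getTimestamp() is an assumed accessor name.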
In my case, CRTE_TMS was created as a union-type field (it can contain null or a long value) in the Avro schema. When avro-tools generated the POJO class, it declared CRTE_TMS as an Instant field based on the field's logical type. This all makes sense.
But when the Kafka client tries to deserialize the message, it fails with a ClassCastException, because avro-tools generated no conversion for this field in the POJO class.
To get rid of this error, I manually added those casts/conversions to the generated POJO class, and it worked. I am not sure whether this is the right thing to do.
You can find details of what I am talking about in this question: https://stackoverflow.com/questions/63642870/avro-is-not-able-to-deserialize-union-with-logical-types-in-fields
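The kind of manual conversion described above might look roughly like the sketch below. It is not the code avro-tools generates and not necessarily what was added to the POJO; TimestampMicros is a hypothetical helper that turns the raw value of a nullable timestamp-micros field (null, Long, or an already converted Instant) into an Instant, using only java.time.

```java
import java.time.Instant;

// Hypothetical helper, not part of Avro; a sketch of a manual conversion
// for a ["null", timestamp-micros] union field.
public final class TimestampMicros {
    private static final long MICROS_PER_SECOND = 1_000_000L;
    private static final long NANOS_PER_MICRO = 1_000L;

    private TimestampMicros() {}

    // Converts the decoded field value to an Instant.
    // null stays null (the "null" branch of the union).
    public static Instant toInstant(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Instant) {
            return (Instant) value; // already converted
        }
        if (value instanceof Long) {
            long micros = (Long) value;
            // floorDiv/floorMod keep pre-1970 (negative) values correct.
            long seconds = Math.floorDiv(micros, MICROS_PER_SECOND);
            long nanos = Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICRO;
            return Instant.ofEpochSecond(seconds, nanos);
        }
        throw new ClassCastException(
                value.getClass().getName() + " cannot be converted to java.time.Instant");
    }

    // Inverse direction: Instant back to microseconds since the epoch.
    public static Long toMicros(Instant instant) {
        if (instant == null) {
            return null;
        }
        return Math.addExact(
                Math.multiplyExact(instant.getEpochSecond(), MICROS_PER_SECOND),
                instant.getNano() / NANOS_PER_MICRO);
    }
}
```

In a generated put(int, Object) method, the failing cast for this field could then be replaced with something like CRTE_TMS = TimestampMicros.toInstant(value). Hand edits to generated code are overwritten on regeneration, so this is a stopgap at best.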
We just ran into this issue as well. We don't have a fix yet; we decided to work around it by removing the null type from the union.
https://github.com/apache/avro/pull/1721 might fix this.
We stumbled over this issue yesterday on prod.
"type" : ["null", {"type" : "long", "logicalType" : "timestamp-millis"}]
With Avro serializer version 7.5.0 this resulted in a proper Instant; with 7.5.1 we get a Long, followed by a ClassCastException. Even using Avro 1.11.3 has not fixed that, so we have to stay on 7.5.0 until this problem is fixed.
I just bumped into the same issue with timestamp-millis. Reverting to 7.5.0 fixes the issue.
Is this fixed in 7.6.x? Sadly, this one seems to have gone unnoticed.