Smallrye-Reactive-Messages "dead letter queue" on Confluent Kafka does not default to Avro serializer for topics that use Avro deserializers
Describe the bug
I have a Kafka topic with an Avro schema, both on the original topic and on the associated dead letter queue topic. Until Quarkus < 3.5 I used this configuration and it worked fine (note that there is no serialized specified on the dead letter queue value):
mp.messaging.incoming.my-topic.topic=my_topic
mp.messaging.incoming.my-topic.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
mp.messaging.incoming.my-topic.specific.avro.reader=true
mp.messaging.incoming.my-topic.failure-strategy=dead-letter-queue
mp.messaging.incoming.my-topic.dead-letter-queue.topic=my_topic_dlq
With Quarkus >= 3.5 the messages in dead letter do not have anymore the payload from the original message (value was always null).
The workaround is to specify the serializer for the dead letter queue value:
mp.messaging.incoming.my-topic.dead-letter-queue.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
Expected behavior
The dead letter queue is serialized with the Avro serializer based on the deserializer used on the original topic.
Actual behavior
The dead letter queue is serialized with the wrong serializer and the value is null
How to Reproduce?
- configure a project with Quarkus < 3.5 that uses Confluent Kafka + Avro on the value
- update Quarkus >= 3.5
- the dead letter queue value is null
Output of uname -a or ver
Darwin M1-83300006.local 23.2.0 Darwin Kernel Version 23.2.0: Wed Nov 15 21:55:06 PST 2023; root:xnu-10002.61.3~2/RELEASE_ARM64_T6020 arm64
Output of java -version
openjdk version "21.0.1" 2023-10-17 LTS OpenJDK Runtime Environment Temurin-21.0.1+12 (build 21.0.1+12-LTS) OpenJDK 64-Bit Server VM Temurin-21.0.1+12 (build 21.0.1+12-LTS, mixed mode)
Quarkus version or git rev
3.5
Build tool (ie. output of mvnw --version or gradlew --version)
Apache Maven 3.9.6 (bc0240f3c744dd6b6ec2920b3cd08dcc295161ae) Maven home: /Users/antoniobonifacio/.sdkman/candidates/maven/current Java version: 21.0.1, vendor: Eclipse Adoptium, runtime: /Users/antoniobonifacio/.sdkman/candidates/java/21.0.1-tem Default locale: it_IT, platform encoding: UTF-8 OS name: "mac os x", version: "14.2.1", arch: "aarch64", family: "mac"
Additional information
Probably related to https://github.com/quarkusio/quarkus/pull/36347
/cc @Ladicek (smallrye), @alesj (kafka), @cescoffier (kafka), @gastaldi (m1), @jmartisk (smallrye), @ozangunalp (kafka), @phillip-kruger (smallrye), @radcortez (smallrye)
I've seen this same issue with the confluent protobuf deser (same version >= 3.5.0). The debugger is showing that dead-letter-queue.value.serializer is set to some synthetic class based off of the value type. There is no such configuration in the application itself.
On Quarkus 3.4.x that method returns an empty optional.
setting the dead letter value serializer is a viable workaround
dead-letter-queue.value.serializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
This is a head-scratcher for me too. What I think is happening is the schema auto-detection kicks in and it detects that it is not a avro/protobuf type and it seems like it generates a deserializer. Since #36347 it also detects that you are using DLQ and generates a serializer. Because you are configuring the KafkaAvroDeserializer yourself it is not a problem for the main consumer, but the generated DLQ producer serializer will be picked up, where it shouldn't...
What is the type you are consuming in both (avro and protobuf) cases?
What is the type you are consuming in both (avro and protobuf) cases?
In my case (Avro), I am consuming a SpecificRecord generated from an avdl file:
@org.apache.avro.specific.AvroGenerated
public class MyRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { ... }
And is MyRecord inside the app or compiled in another module and you add to your app with a dependency ?
And is
MyRecordinside the app or compiled in another module and you add to your app with a dependency ?
Hi, in my case the avdl files (and generated classes) are in a separated maven module in the same project, and it's a dependency for the module with the consumer.
Ok if you configure the bean discovery for the module defining avdl files (see https://quarkus.io/guides/cdi-reference#how-to-generate-a-jandex-index) you should have both main channel deserializer config and the DLQ serializer config auto generated for you.
Ok if you configure the bean discovery for the module defining avdl files (see
https://quarkus.io/guides/cdi-reference#how-to-generate-a-jandex-index) you should have both main channel deserializer config and the DLQ serializer config auto generated for you.
Hi, I removed all value.serializer and value.deserializer configs from my property file and now it seems to work after the addition of the jandex plugin, as you suggested 👍🏼
My setup is similar. A proto file with generated java classes in a separate module. I'll try adding the jandex index.
Configuring jandex made no difference.
I even moved the proto files into the application module, no joy.
Disabling QUARKUS_REACTIVE_MESSAGING_KAFKA_SERIALIZER_GENERATION_ENABLED did resolve it