
SmallRye Reactive Messaging "dead letter queue" on Confluent Kafka does not default to the Avro serializer for topics that use Avro deserializers

Open anbonifacio opened this issue 1 year ago • 7 comments

Describe the bug

I have a Kafka topic with an Avro schema, both on the original topic and on the associated dead letter queue topic. With Quarkus < 3.5 the following configuration worked fine (note that no serializer is specified for the dead letter queue value):

mp.messaging.incoming.my-topic.topic=my_topic
mp.messaging.incoming.my-topic.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
mp.messaging.incoming.my-topic.specific.avro.reader=true
mp.messaging.incoming.my-topic.failure-strategy=dead-letter-queue
mp.messaging.incoming.my-topic.dead-letter-queue.topic=my_topic_dlq

With Quarkus >= 3.5 the messages in the dead letter queue no longer contain the payload from the original message (the value was always null). The workaround is to specify the serializer for the dead letter queue value explicitly:

mp.messaging.incoming.my-topic.dead-letter-queue.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

Expected behavior

The dead letter queue value is serialized with an Avro serializer inferred from the deserializer configured on the original topic.

Actual behavior

The dead letter queue value is serialized with the wrong serializer and ends up null.

How to Reproduce?

  1. Configure a project with Quarkus < 3.5 that uses Confluent Kafka + Avro on the value
  2. Update to Quarkus >= 3.5
  3. Observe that the dead letter queue value is null

Output of uname -a or ver

Darwin M1-83300006.local 23.2.0 Darwin Kernel Version 23.2.0: Wed Nov 15 21:55:06 PST 2023; root:xnu-10002.61.3~2/RELEASE_ARM64_T6020 arm64

Output of java -version

openjdk version "21.0.1" 2023-10-17 LTS
OpenJDK Runtime Environment Temurin-21.0.1+12 (build 21.0.1+12-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.1+12 (build 21.0.1+12-LTS, mixed mode)

Quarkus version or git rev

3.5

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.9.6 (bc0240f3c744dd6b6ec2920b3cd08dcc295161ae)
Maven home: /Users/antoniobonifacio/.sdkman/candidates/maven/current
Java version: 21.0.1, vendor: Eclipse Adoptium, runtime: /Users/antoniobonifacio/.sdkman/candidates/java/21.0.1-tem
Default locale: it_IT, platform encoding: UTF-8
OS name: "mac os x", version: "14.2.1", arch: "aarch64", family: "mac"

Additional information

Probably related to https://github.com/quarkusio/quarkus/pull/36347

anbonifacio avatar Jan 18 '24 16:01 anbonifacio

/cc @Ladicek (smallrye), @alesj (kafka), @cescoffier (kafka), @gastaldi (m1), @jmartisk (smallrye), @ozangunalp (kafka), @phillip-kruger (smallrye), @radcortez (smallrye)

quarkus-bot[bot] avatar Jan 18 '24 16:01 quarkus-bot[bot]

I've seen this same issue with the Confluent Protobuf deserializer (same versions, >= 3.5.0). The debugger shows that dead-letter-queue.value.serializer is set to some synthetic class based on the value type. There is no such configuration in the application itself.

[Screenshot (Feb 9, 2024): debugger showing the synthetic dead-letter-queue.value.serializer class]

On Quarkus 3.4.x that method returns an empty optional.

pcasaes avatar Feb 10 '24 02:02 pcasaes

Setting the dead letter queue value serializer is a viable workaround:

dead-letter-queue.value.serializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer

pcasaes avatar Feb 10 '24 02:02 pcasaes

This is a head-scratcher for me too. What I think is happening is that schema auto-detection kicks in, detects that it is not an Avro/Protobuf type, and generates a deserializer. Since #36347 it also detects that you are using a DLQ and generates a serializer. Because you are configuring the KafkaAvroDeserializer yourself, this is not a problem for the main consumer, but the generated DLQ producer serializer gets picked up where it shouldn't...

What is the type you are consuming in both (Avro and Protobuf) cases?

ozangunalp avatar Feb 15 '24 12:02 ozangunalp

What is the type you are consuming in both (Avro and Protobuf) cases?

In my case (Avro), I am consuming a SpecificRecord generated from an avdl file:

@org.apache.avro.specific.AvroGenerated
public class MyRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { ... }

anbonifacio avatar Feb 16 '24 10:02 anbonifacio

And is MyRecord inside the app, or compiled in another module and added to your app as a dependency?

ozangunalp avatar Feb 19 '24 10:02 ozangunalp

And is MyRecord inside the app, or compiled in another module and added to your app as a dependency?

Hi, in my case the avdl files (and the generated classes) are in a separate Maven module in the same project, which is a dependency of the module containing the consumer.

anbonifacio avatar Feb 19 '24 10:02 anbonifacio

Ok, if you configure bean discovery for the module defining the avdl files (see https://quarkus.io/guides/cdi-reference#how-to-generate-a-jandex-index), both the main channel deserializer config and the DLQ serializer config should be auto-generated for you.
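For reference, one way to index such a dependency module is the Jandex Maven plugin described in the linked guide. This is a sketch only: the plugin version is illustrative and should be checked against the guide, and the snippet goes in the pom.xml of the module containing the generated Avro classes:

```xml
<!-- Sketch: generates META-INF/jandex.idx for this module so Quarkus
     bean discovery and serializer auto-detection can see its types.
     Version number is an assumption; check the CDI reference guide. -->
<build>
  <plugins>
    <plugin>
      <groupId>io.smallrye</groupId>
      <artifactId>jandex-maven-plugin</artifactId>
      <version>3.1.6</version>
      <executions>
        <execution>
          <id>make-index</id>
          <goals>
            <goal>jandex</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```

Per the same guide, adding an empty META-INF/beans.xml to the dependency is an alternative way to make it discoverable.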

ozangunalp avatar Feb 19 '24 13:02 ozangunalp

Ok, if you configure bean discovery for the module defining the avdl files (see https://quarkus.io/guides/cdi-reference#how-to-generate-a-jandex-index), both the main channel deserializer config and the DLQ serializer config should be auto-generated for you.

Hi, I removed all value.serializer and value.deserializer configs from my property file, and after adding the Jandex plugin as you suggested it now works 👍🏼
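With auto-detection working, the channel configuration from the original report can presumably shrink to something like the following. This is a sketch reusing the channel and topic names from the top of this issue; exact properties depend on your setup:

```properties
# No value.deserializer or dead-letter-queue.value.serializer needed:
# both are auto-detected from the Jandex-indexed Avro-generated type.
mp.messaging.incoming.my-topic.topic=my_topic
mp.messaging.incoming.my-topic.failure-strategy=dead-letter-queue
mp.messaging.incoming.my-topic.dead-letter-queue.topic=my_topic_dlq
```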

anbonifacio avatar Feb 19 '24 16:02 anbonifacio

My setup is similar. A proto file with generated java classes in a separate module. I'll try adding the jandex index.

pcasaes avatar Feb 21 '24 06:02 pcasaes

Configuring Jandex made no difference.

I even moved the proto files into the application module; no joy.

Disabling QUARKUS_REACTIVE_MESSAGING_KAFKA_SERIALIZER_GENERATION_ENABLED did resolve it.
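Assuming the standard MicroProfile Config mapping between environment variables and property names, that environment variable should correspond to the following application.properties entry (a sketch; verify the property name against the Quarkus Kafka reference):

```properties
# Disable automatic serializer/deserializer generation so only
# explicitly configured (de)serializers are used.
quarkus.reactive-messaging.kafka.serializer-generation.enabled=false
```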

pcasaes avatar Feb 21 '24 07:02 pcasaes