KafkaCompanion does not configure Serializer/Deserializer
KafkaCompanion does not call configure on a registered Serde, Serializer, or Deserializer, which causes issues with certain implementations.
For example, when using Confluent's io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde (and its serializer/deserializer), configure is never called, which causes a test to fail with:
java.lang.AssertionError: Expected a completion event but got a failure: org.apache.kafka.common.errors.InvalidConfigurationException: You must configure() before serialize() or use serializer constructor with SchemaRegistryClient
This behaviour can be verified by adding a test case to SerdesTest:
@Test
void testRegisteredSerdeShouldBeConfigured() {
    // Custom Serde whose Serializer/Deserializer throw an exception if not configured
    companion.registerSerde(Person.class, new Serde<>() {
        // Custom Deserializer which throws an exception if not configured
        private PersonDeserializer personDeserializer = new PersonDeserializer() {
            private boolean isConfigured = false;

            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
                isConfigured = true;
            }

            @Override
            public Person deserialize(String s, byte[] bytes) {
                if (isConfigured) {
                    return super.deserialize(s, bytes);
                } else {
                    throw new SerializationException("Deserializer not configured");
                }
            }
        };

        // Custom Serializer which throws an exception if not configured
        private PersonSerializer personSerializer = new PersonSerializer() {
            private boolean isConfigured = false;

            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
                isConfigured = true;
            }

            @Override
            public byte[] serialize(String s, Person person) {
                if (isConfigured) {
                    return super.serialize(s, person);
                } else {
                    throw new SerializationException("Serializer not configured");
                }
            }
        };

        @Override
        public Serializer<Person> serializer() {
            return personSerializer;
        }

        @Override
        public Deserializer<Person> deserializer() {
            return personDeserializer;
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // Configure the serializer and deserializer
            personSerializer.configure(configs, isKey);
            personDeserializer.configure(configs, isKey);
        }
    });

    companion.produce(Person.class).fromRecords(
            new ProducerRecord<>(topic, new Person("1", 30)),
            new ProducerRecord<>(topic, new Person("2", 25)),
            new ProducerRecord<>(topic, new Person("3", 18))).awaitCompletion();

    ConsumerBuilder<String, Person> consumer = companion.consumeWithDeserializers(PersonDeserializer.class.getName());
    ConsumerTask<String, Person> task = consumer.fromTopics(topic, 3).awaitCompletion();

    assertThat(task.getRecords()).hasSize(3);
}
The test currently fails with:
java.lang.AssertionError: Expected a completion event but got a failure: org.apache.kafka.common.errors.SerializationException: Serializer not configured
KafkaCompanion should be updated to call configure on the Serde so that the Serializer and Deserializer are properly configured before a message is published or consumed.
Because of this issue, other serializers such as StringSerializer, UUIDSerializer, ListSerializer, etc. could also give unexpected results in tests.
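To make the point about serializers like StringSerializer concrete, here is a minimal, self-contained sketch. It does not use the Kafka classes: EncodingSerializer is a hypothetical stand-in, loosely modelled on how, as far as I know, StringSerializer defaults to UTF-8 and only switches encoding when configure receives a setting such as serializer.encoding. The class and helper names are assumptions made for illustration.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

class EncodingConfigSketch {

    /** Hypothetical stand-in for a serializer that falls back to a default when never configured. */
    static class EncodingSerializer {
        private Charset encoding = StandardCharsets.UTF_8; // default used when configure() is skipped

        void configure(Map<String, ?> configs, boolean isKey) {
            Object value = configs.get("serializer.encoding");
            if (value instanceof String) {
                encoding = Charset.forName((String) value);
            }
        }

        byte[] serialize(String data) {
            return data == null ? null : data.getBytes(encoding);
        }
    }

    /** Serializes "é" with or without calling configure() and returns the byte count. */
    static int serializedLength(boolean callConfigure) {
        EncodingSerializer serializer = new EncodingSerializer();
        if (callConfigure) {
            serializer.configure(Map.of("serializer.encoding", "ISO-8859-1"), false);
        }
        return serializer.serialize("\u00e9").length;
    }
}
```

In this sketch nothing throws when configure is skipped; the serializer silently keeps its default, so a test could pass or fail for reasons unrelated to the code under test.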
I have provided a fix in PR #2757.
@diversit Thank you for opening this detailed issue and the PR.
This was intentional, following the typical Kafka client behavior. If you pass the serde type name, configure will be called by the client constructor. I believe there was a javadoc note on that.
That being said, I think configure could have been called for the creators of registered serdes. Would that make sense to you?
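The behavior described above can be sketched without any Kafka dependency. In the sketch below, StrictSerializer and Client are hypothetical stand-ins, not the real Kafka or KafkaCompanion classes: a client built from a class name creates the instance itself and calls configure, while a client handed a ready-made instance assumes the caller already configured it.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

class ConfigureContractSketch {

    /** Hypothetical stand-in for a serializer that refuses to work until configured. */
    public static class StrictSerializer {
        private boolean configured = false;

        public StrictSerializer() {
        }

        public void configure(Map<String, ?> configs, boolean isKey) {
            configured = true;
        }

        public byte[] serialize(String value) {
            if (!configured) {
                throw new IllegalStateException("Serializer not configured");
            }
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical stand-in for a client; mirrors the two ways a serializer can be supplied. */
    public static class Client {
        private final StrictSerializer serializer;

        // Built from a class name: the client owns the instance, so it calls configure().
        public Client(Map<String, ?> configs, String serializerClassName) {
            try {
                this.serializer = (StrictSerializer) Class.forName(serializerClassName)
                        .getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot instantiate " + serializerClassName, e);
            }
            this.serializer.configure(configs, false);
        }

        // Given an instance: the client leaves it untouched and does NOT call configure().
        public Client(Map<String, ?> configs, StrictSerializer serializer) {
            this.serializer = serializer;
        }

        public byte[] send(String value) {
            return serializer.serialize(value);
        }
    }

    /** Returns true if sending through the client works, false if the serializer rejects the call. */
    public static boolean sendSucceeds(Client client) {
        try {
            client.send("hello");
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }
}
```

Under these assumptions, a caller who supplies an instance has to call configure on it first, which corresponds to configuring a serde yourself before registering it.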
@ozangunalp Thanks for your reply.
I understand your point. I was under the impression that the Serde would require configuration provided by the KafkaCompanion, and I thought getCommonClientConfig() would not be sufficient.
From the docs on registering custom serdes, I had not understood that I had to configure the Serde myself. I am also having trouble finding the Javadocs for the SmallRye projects, since they do not seem to be linked from the SmallRye site or the GitHub repos.
Could you please point me to the Javadoc note you mentioned?
After reading the docs of SpecificAvroSerde more carefully, I now understand that I have to configure it myself with the appropriate config before registering it with the KafkaCompanion.
Next challenge will then be how to get the url from the Schema Registry from Quarkus in dev/test modes.
So I guess this issue can be closed then.
You can access the Javadoc here: https://javadoc.io/doc/io.smallrye.reactive/smallrye-reactive-messaging-kafka-test-companion/latest/io/smallrye/reactive/messaging/kafka/companion/ConsumerBuilder.html
> Next challenge will then be how to get the url from the Schema Registry from Quarkus in dev/test modes.
If you use the KafkaCompanionResource, it is done here: https://github.com/quarkusio/quarkus/blob/50f2d3f1bd14b0b8f518d8d346422e0dc7199898/test-framework/kafka-companion/src/main/java/io/quarkus/test/kafka/KafkaCompanionResource.java#L28