spring-boot
Kafka Config for consumer/producer by topic Enhancements
We recently inherited a project with a lot of Kafka. Standing on the shoulders of the Spring giants, and with the recent updates, we removed as much boilerplate as possible and tried to solve whatever we could via configuration.
Delegating Serializer and Deserializer
It took us quite a while to figure out why our config was not picked up. Obviously a beginner's mistake; the solution is here: if you use the `spring.kafka.value.serialization.bytopic.config` Kafka property, you must set `value-deserializer` to `org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer`.
A working config:

```yaml
spring:
  kafka:
    ...
    consumer:
      ...
      key-deserializer: "org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer"
      value-deserializer: "org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer"
      properties:
        ...
        spring.kafka.key.serialization.bytopic.config: "${kafka.consumer.topic_one}:org.apache.kafka.common.serialization.StringDeserializer"
        spring.kafka.value.serialization.bytopic.config: "${kafka.consumer.topic_one}:com.example.custom.KafkaSomethiungDeserializer"
```
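To make the format concrete: each entry in that property value is a topic-name regex and a deserializer class name joined by a colon, with entries separated by commas. A minimal, self-contained sketch of how such a value decomposes (illustrative only — this is not Spring Kafka's actual parsing code):

```kotlin
import java.util.regex.Pattern

// Illustrative only: split a "topicRegex:className,topicRegex:className"
// value into a map of compiled patterns to deserializer class names.
fun parseByTopicConfig(value: String): Map<Pattern, String> =
    value.split(",").associate { entry ->
        val (regex, className) = entry.trim().split(":", limit = 2)
        Pattern.compile(regex) to className
    }

// Pick the class name whose pattern matches a record's topic, if any.
fun delegateFor(topic: String, config: Map<Pattern, String>): String? =
    config.entries.firstOrNull { it.key.matcher(topic).matches() }?.value
```

With entries like `topic-one:...StringDeserializer,topic-.*:com.example.Other`, a record on `topic-one` resolves to the first class and any other `topic-*` record to the second — which is also why the list quickly becomes hard to read as it grows.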
- Any chance that Spring Boot can warn about this, or make those properties its own so they can be auto-configured?
- On the same topic: the value of `spring.kafka.value.serialization.bytopic.config` is a comma-separated list of `"topicregex:some.com.package.hard.to.read.and.maintain.if.there.is.more.than.one.serializer"` entries. This list becomes hard to read/maintain. Being able to provide it as a "list", or even as a map, via YAML would be nice.
- To add some type safety: a bean of `ConsumerTopicToDeserializer`, or something similar, which auto-configuration picks up to do it right and save us fools some time :-)
We used the customizer to add it before we found the solution above:
```kotlin
import java.util.regex.Pattern

import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer
import org.springframework.context.annotation.Bean
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
import org.springframework.kafka.support.serializer.JsonDeserializer

@Bean
fun customizeKafkaConsumerFactory(): DefaultKafkaConsumerFactoryCustomizer {
    return DefaultKafkaConsumerFactoryCustomizer {
        @Suppress("UNCHECKED_CAST")
        val factory = it as DefaultKafkaConsumerFactory<String, Any>
        factory.setKeyDeserializer(ErrorHandlingDeserializer(StringDeserializer()))
        factory.setValueDeserializer(
            ErrorHandlingDeserializer(
                DelegatingByTopicDeserializer(
                    mapOf(
                        Pattern.compile("SomeTopic") to SomerDeserializer(),
                    ),
                    // fallback for topics that match no pattern
                    JsonDeserializer<Any>(),
                ),
            ),
        )
    }
}
```
- How about some configuration support for `ErrorHandlingDeserializer`:

```yaml
spring:
  kafka:
    ...
    consumer:
      ...
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        ...
        spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
        spring.kafka.key.serialization.bytopic.config: "${kafka.consumer.topic_one}:org.apache.kafka.common.serialization.StringDeserializer"
        spring.kafka.value.serialization.bytopic.config: "${kafka.consumer.topic_one}:com.example.custom.KafkaSomethiungDeserializer"
```
Maybe you see some things which can be addressed in the documentation and/or auto-configuration.
Thanks for the great work!
Thanks for the suggestions.
> Any chance that spring boot can warn about this or make those properties its own so they can be auto-configured?
Boot's `spring.kafka.consumer.key-deserializer` maps to Kafka's `key.deserializer` property. The properties within `spring.kafka.consumer.properties` mean nothing to Spring Boot and are passed straight through. Ultimately, these properties all end up in the `Map<String, Object>` that's used to create Spring Kafka's `DefaultKafkaConsumerFactory(Map<String, Object>)`.
Given the above, I'm not sure that Boot is the right place to encode knowledge about the relationship between the `key.deserializer` property and the `spring.kafka.key.serialization.bytopic.config` property. It feels to me like Spring Kafka is better placed to do this, as it's the project with the knowledge that you have to use a particular deserializer for `spring.kafka.key.serialization.bytopic.config` to have an effect. Furthermore, Spring Kafka performing this validation would mean that it benefits everyone, not just those using Spring Boot.
WDYT, @artembilan?
> On the same topic, the value of the `spring.kafka.value.serialization.bytopic.config` is a comma-separated list [that] becomes hard to read/maintain. Being able to provide this list as "list" or even as map via yaml would be nice.
It's Spring Kafka's `DelegatingByTopicSerialization` subclasses that process this property. They appear to support that value being either a `Map` or a `String`. The `Map` case opens up some possibilities for richer configuration. You could implement something today in your app using a `DefaultKafkaConsumerFactoryCustomizer` and `DefaultKafkaConsumerFactory.updateConfigs(Map<String, Object>)` to configure the `spring.kafka.key.serialization.bytopic.config` and `spring.kafka.value.serialization.bytopic.config` properties with `Map` values.
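A sketch of that approach, assuming the `Map` support described above; the topic pattern and deserializer class name below are placeholders, not real classes:

```kotlin
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer
import org.springframework.context.annotation.Bean

// Sketch only: supply the bytopic config as a Map instead of a
// comma-separated String, via updateConfigs on the auto-configured factory.
@Bean
fun byTopicMapCustomizer(): DefaultKafkaConsumerFactoryCustomizer =
    DefaultKafkaConsumerFactoryCustomizer { factory ->
        factory.updateConfigs(
            mapOf(
                "spring.kafka.value.serialization.bytopic.config" to mapOf(
                    // placeholder pattern-to-deserializer pairing
                    "topic-one" to "com.example.custom.KafkaSomethiungDeserializer",
                ),
            ),
        )
    }
```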
We could also look at providing dedicated configuration properties for this in Boot that we'd map to `Map` values for `spring.kafka.key.serialization.bytopic.config` and `spring.kafka.value.serialization.bytopic.config`. When set, this would then open up the possibility of also setting `key.deserializer` to a sensible default unless the user has also configured `spring.kafka.consumer.key-deserializer` and/or `spring.kafka.consumer.value-deserializer`. Alternatively, we could map this to the `DefaultKafkaConsumerFactory` constructor that takes the deserializers as constructor arguments.
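For illustration, such dedicated properties might look something like this. The property names below are hypothetical, invented only to sketch the shape; nothing like them exists in Boot today:

```yaml
spring:
  kafka:
    consumer:
      # hypothetical property, shown only to illustrate the idea
      value-serialization-bytopic:
        "topic-one": com.example.custom.KafkaSomethiungDeserializer
        "topic-two": org.apache.kafka.common.serialization.StringDeserializer
```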
> To add some typesafety a Bean of ConsumerTopicToDeserializer or something similar which autoconfiguration picks up to do it right and save us fools some time
That's an interesting idea. It certainly feels preferable to configuring class names in YAML or properties, which is always error-prone.
> How about some configured ErrorHandlingDeserializer
I'm not sure about this one as I think it could get rather complicated. You'd need to somehow configure a delegate everywhere that the error handling deserializer may be used. With some of the other improvements discussed above, I suspect there may be less of a need for this as it would be easier to perform the error-handling decoration in your own code.
OK, I see what is going on. You're trying to push coding from Java back into YAML, which is indeed error-prone. I agree that simple, one-stop-shop properties are OK to provide via YAML, but when we get to something more complicated or tied together, it is better to control it from code, where it is harder to make a mistake.
That `spring.kafka.value.serialization.bytopic.config` property is read only by the `DelegatingByTopicDeserializer`, so there is no way to warn you that something is off. The same goes for the `ErrorHandlingDeserializer` and its `spring.deserializer.value.delegate.class`. That's simply because all those `spring.kafka.consumer.properties` are outside of configuration-properties binding in Spring Boot.
What I would suggest is something like a top-level, end-user `Deserializer` bean to be injected into the `KafkaAnnotationDrivenConfiguration`, where we would propagate it down to the `DefaultKafkaConsumerFactory` constructor:

```java
public DefaultKafkaConsumerFactory(Map<String, Object> configs,
		@Nullable Deserializer<K> keyDeserializer,
		@Nullable Deserializer<V> valueDeserializer) {
```
Although from here it is not clear how to distinguish between `key` and `value` deserializer beans...

This way you would not need that `DefaultKafkaConsumerFactoryCustomizer`, but rather would concentrate on the `Deserializer` configuration in the familiar Java style.
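In the meantime, that three-argument constructor can already be driven by hand when defining your own factory bean. A sketch, reusing the example deserializers from earlier in the thread (`SomerDeserializer` is the thread's placeholder class, and `buildConsumerProperties()` is assumed here to be available on the injected `KafkaProperties`):

```kotlin
import java.util.regex.Pattern

import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
import org.springframework.kafka.support.serializer.JsonDeserializer

// Sketch only: pass the deserializers straight into the constructor
// instead of mutating the auto-configured factory afterwards.
@Bean
fun consumerFactory(properties: KafkaProperties): ConsumerFactory<String, Any> =
    DefaultKafkaConsumerFactory(
        properties.buildConsumerProperties(),
        StringDeserializer(),
        ErrorHandlingDeserializer(
            DelegatingByTopicDeserializer(
                mapOf(Pattern.compile("SomeTopic") to SomerDeserializer()),
                JsonDeserializer<Any>(),
            ),
        ),
    )
```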
I also wonder if this YAML feature would make it a bit easier for you for the time being:
```yaml
properties:
  spring.kafka.value.serialization.bytopic.config:
    - ${kafka.consumer.topic_one}: com.example.custom.KafkaSomethiungDeserializer
    - ${kafka.consumer.topic_two}: org.apache.kafka.common.serialization.StringDeserializer
```
CC @sobychacko
+1 to the idea from @artembilan where users provide the deserializers as beans, and then Spring Boot injects them into `DefaultKafkaConsumerFactory`, giving users control over how they want to configure the deserializers via code. We need to devise a way to distinguish between key and value deserializers. Maybe we can look into some naming conventions or something similar? I also like what @wilkinsona suggested for providing dedicated configuration options in Boot to make this integration with `DefaultKafkaConsumerFactory` transparent. This way, users can drive this from properties rather than providing deserializer beans.
Hi, I'm interested in this feature but I'm having trouble trying to configure it.
Can you show what the `@KafkaListener` looks like for this?
In my case I have many topics; some use Avro values and some use JSON values, so I use the Confluent `KafkaAvroDeserializer` for one and the Spring `JsonDeserializer` for the other. However, since each topic maps to a different resulting class, I don't think it works.
Edit: I had a go at implementing this. I set the Avro deserializer as the bytopic default and ended up having to use custom deserializers for each JSON topic that just extend the Spring `JsonDeserializer` with a concrete type. I was glad to see that the `@KafkaListener` can just accept `ConsumerRecords<Object, Object>`.
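The per-topic JSON deserializers described above can be as small as a subclass that pins the target type; a sketch, where `Order` is a hypothetical payload class:

```kotlin
import org.springframework.kafka.support.serializer.JsonDeserializer

// Hypothetical payload type, for illustration only.
data class Order(val id: String)

// A delegate with a fixed target type, so it can be referenced by class
// name in the bytopic config.
class OrderJsonDeserializer : JsonDeserializer<Order>(Order::class.java)
```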
+1 to getting `spring.kafka.value.serialization.bytopic.config` configured as a map in YAML.