
Kafka Config for consumer/producer by topic Enhancements

Open monsdroid opened this issue 1 year ago • 6 comments

We recently inherited a project with a lot of Kafka. Standing on the shoulders of the Spring giants, and with the recent updates, we removed as much boilerplate as possible and tried to solve whatever we could via configuration.

Delegating Serializer and Deserializer

It took us quite a while to figure out why our config was not being picked up. Obviously a beginner's mistake; the solution is here: if you use spring.kafka.value.serialization.bytopic.config (a Kafka property), you must set value-deserializer to org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer.

A Working Config:

spring:
     kafka:
        ...
        consumer:
        ...
            key-deserializer: "org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer"
            value-deserializer: "org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer"
            properties:
            ...
                spring.kafka.key.serialization.bytopic.config: "${kafka.consumer.topic_one}:org.apache.kafka.common.serialization.StringDeserializer"
                spring.kafka.value.serialization.bytopic.config: "${kafka.consumer.topic_one}:com.example.custom.KafkaSomethiungDeserializer"
  1. Any chance that Spring Boot can warn about this, or make those properties its own so they can be auto-configured?

  2. On the same topic: the value of spring.kafka.value.serialization.bytopic.config is a comma-separated list of "topicregex:some.com.package.hard.to.read.and.maintain.if.there.is.more.than.one.serializer" entries, and this list becomes hard to read and maintain. Being able to provide it as a list, or even as a map, via YAML would be nice.

  3. To add some type safety, a bean of ConsumerTopicToDeserializer (or something similar) which auto-configuration picks up to do it right and save us fools some time :-)
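To illustrate point 2, here is a minimal, self-contained sketch (plain Java, no Spring classes) of the kind of parsing the flat comma-separated property implies. The helper name and split logic are assumptions for illustration only, not Spring Kafka's actual implementation; a native list/map binding would make this mapping explicit in YAML instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class ByTopicConfigSketch {

    // Hypothetical helper: split "topicregex:deserializerClass,..." into a
    // Pattern -> class-name map, roughly what a delegating deserializer has
    // to reconstruct from the flat string at runtime.
    static Map<Pattern, String> parse(String config) {
        Map<Pattern, String> result = new LinkedHashMap<>();
        for (String entry : config.split(",")) {
            int idx = entry.indexOf(':');
            result.put(Pattern.compile(entry.substring(0, idx).trim()),
                    entry.substring(idx + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        String flat = "topic-one:com.example.custom.KafkaSomethiungDeserializer,"
                + "topic-two:org.apache.kafka.common.serialization.StringDeserializer";
        parse(flat).forEach((pattern, clazz) ->
                System.out.println(pattern.pattern() + " -> " + clazz));
    }
}
```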

We used the customizer to add it before we found the solution above:

    @Bean
    fun customizeKafkaConsumerFactory(): DefaultKafkaConsumerFactoryCustomizer {
        return DefaultKafkaConsumerFactoryCustomizer {
            @Suppress("UNCHECKED_CAST")
            val factory = it as DefaultKafkaConsumerFactory<String, Any>
            factory.setKeyDeserializer(ErrorHandlingDeserializer(StringDeserializer()))
            factory.setValueDeserializer(
                ErrorHandlingDeserializer(
                    DelegatingByTopicDeserializer(
                        mapOf(
                            Pattern.compile("SomeTopic") to SomerDeserializer(),
                        ),
                        JsonDeserializer<Any>(),
                    )
                )
            )
        }
    }
  1. How about some configured ErrorHandlingDeserializer?
spring:
     kafka:
        ...
        consumer:
        ...
            key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            properties:
            ...
                spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
                spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer

                spring.kafka.key.serialization.bytopic.config: "${kafka.consumer.topic_one}:org.apache.kafka.common.serialization.StringDeserializer"
                spring.kafka.value.serialization.bytopic.config: "${kafka.consumer.topic_one}:com.example.custom.KafkaSomethiungDeserializer"

Maybe you see some things which can be addressed in the documentation and/or auto-configuration.

Thanks for the great work!

monsdroid avatar Jan 09 '24 10:01 monsdroid

Thanks for the suggestions.

Any chance that spring boot can warn about this or make those properties its own so they can be auto-configured?

Boot's spring.kafka.consumer.key-deserializer maps to Kafka's key.deserializer property. The properties within spring.kafka.consumer.properties mean nothing to Spring Boot and are passed straight through. Ultimately, these properties are all in the Map<String, Object> that's used to create the Spring Kafka DefaultKafkaConsumerFactory(Map<String, Object>).

Given the above, I'm not sure that Boot is the right place to encode knowledge about the relationship between the key.deserializer property and the spring.kafka.key.serialization.bytopic.config property. It feels to me like Spring Kafka is better placed to do this as it's the project with the knowledge that you have to use a particular deserializer for spring.kafka.key.serialization.bytopic.config to have an effect. Furthermore, Spring Kafka performing this validation would also mean that it would benefit everyone, not just those using Spring Boot.

WDYT, @artembilan?

On the same topic, the value of the spring.kafka.value.serialization.bytopic.config is a comma-separated list [that] becomes hard to read/maintain. Being able to provide this list as "list" or even as map via yaml would be nice.

It's Spring Kafka's DelegatingByTopicSerialization sub-classes that process this property. They appear to support that value being either a Map or a String. The Map case opens up some possibilities for richer configuration. You could implement something today in your app using a DefaultKafkaConsumerFactoryCustomizer and DefaultKafkaConsumerFactory.updateConfigs(Map<String, Object>) to configure the spring.kafka.key.serialization.bytopic.config and spring.kafka.value.serialization.bytopic.config properties with Map values.
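A rough, dependency-free sketch of that customizer idea, building the bytopic override as a Map value rather than a flat string. The Spring Kafka call is left as a comment because this snippet deliberately avoids Spring dependencies; whether `updateConfigs` propagates the Map value as described is an assumption based on the discussion above.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class ByTopicMapConfig {

    public static void main(String[] args) {
        // Build the bytopic config as a Map rather than a flat string.
        // Topic names and the custom deserializer class are placeholders.
        Map<Pattern, Object> byTopic = new LinkedHashMap<>();
        byTopic.put(Pattern.compile("topic-one"),
                "com.example.custom.KafkaSomethiungDeserializer");
        byTopic.put(Pattern.compile("topic-two"),
                "org.apache.kafka.common.serialization.StringDeserializer");

        Map<String, Object> overrides = new HashMap<>();
        overrides.put("spring.kafka.value.serialization.bytopic.config", byTopic);

        // Inside a DefaultKafkaConsumerFactoryCustomizer you would then call:
        // factory.updateConfigs(overrides);

        System.out.println(
                overrides.get("spring.kafka.value.serialization.bytopic.config") instanceof Map);
    }
}
```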

We could also look at providing dedicated configuration properties for this in Boot that we'd map to Map values for spring.kafka.key.serialization.bytopic.config and spring.kafka.value.serialization.bytopic.config. When set, this would then open up the possibility of also setting key.deserializer to a sensible default unless the user's also configured spring.kafka.consumer.key-deserializer and/or spring.kafka.consumer.value-deserializer. Alternatively, we could map this to the DefaultKafkaConsumerFactory constructor that takes the deserializers as constructor arguments.
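Purely as a strawman, such dedicated Boot properties might look something like the following. The property name `value-deserialization-by-topic` is invented for illustration and does not exist in Spring Boot:

```yaml
spring:
  kafka:
    consumer:
      value-deserialization-by-topic:
        "${kafka.consumer.topic_one}": com.example.custom.KafkaSomethiungDeserializer
        "${kafka.consumer.topic_two}": org.apache.kafka.common.serialization.StringDeserializer
```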

To add some type safety, a bean of ConsumerTopicToDeserializer or something similar which auto-configuration picks up to do it right and save us fools some time

That's an interesting idea. It certainly feels preferable to configuring class names in YAML or properties, which is always error-prone.

How about some configured ErrorHandlingDeserializer

I'm not sure about this one as I think it could get rather complicated. You'd need to somehow configure a delegate everywhere that the error handling deserializer may be used. With some of the other improvements discussed above, I suspect there may be less of a need for this as it would be easier to perform the error-handling decoration in your own code.

wilkinsona avatar Feb 09 '24 11:02 wilkinsona

OK, I see what is going on. You are trying to push coding from Java back to YAML, which is indeed error-prone. I agree that simple, one-stop-shop properties are fine to provide via YAML, but when we get to something more complicated or tied together, it is better to control it from code, where it is harder to make a mistake.

That spring.kafka.value.serialization.bytopic.config property is read only by the DelegatingByTopicDeserializer, so there is no way to warn you that something is off. The same goes for the ErrorHandlingDeserializer and its spring.deserializer.value.delegate.class: all those spring.kafka.consumer.properties are outside of Spring Boot's configuration-property binding.

What I would suggest is a top-level, end-user Deserializer bean to be injected into the KafkaAnnotationDrivenConfiguration, where we would propagate it down to the DefaultKafkaConsumerFactory constructor:

	public DefaultKafkaConsumerFactory(Map<String, Object> configs,
			@Nullable Deserializer<K> keyDeserializer,
			@Nullable Deserializer<V> valueDeserializer) {

Although from here it is not clear how to distinguish between key & value deserializer beans...

This way you would not need that DefaultKafkaConsumerFactoryCustomizer, but rather would concentrate on the Deserializer configuration in the familiar Java style.

I also wonder if this YAML feature would make it a bit easier for you for the time being:

    properties:
      spring.kafka.value.serialization.bytopic.config:
        - ${kafka.consumer.topic_one}: com.example.custom.KafkaSomethiungDeserializer
        - ${kafka.consumer.topic_two}: org.apache.kafka.common.serialization.StringDeserializer

CC @sobychacko

artembilan avatar Feb 09 '24 16:02 artembilan

+1 to the idea from @artembilan where users provide the deserializers as beans, and then Spring Boot injects them into DefaultKafkaConsumerFactory, giving users control over how they want to configure the deserializers via code. We need to devise a way to distinguish between key and value deserializers. Maybe we can look into some naming conventions or something similar? I also like what @wilkinsona suggested for providing dedicated configuration options in Boot to make this integration with DefaultKafkaConsumerFactory transparent. This way, the users can drive this from properties rather than providing deserializer beans.

sobychacko avatar Feb 09 '24 19:02 sobychacko

Hi, I'm interested in this feature but I'm having trouble trying to configure it.

Can you show what the @KafkaListener looks like for this?

In my case I have many topics; some use Avro values and some use JSON values, so I use the Confluent KafkaAvroDeserializer for one and the Spring JsonDeserializer for the other. However, since each topic maps to a different resulting class, I don't think it works.

Edit:

I had a go at implementing this. I set the Avro deserializer as the bytopic default and ended up having to use a custom deserializer for each JSON topic that just extends the Spring JsonDeserializer with a concrete type. I was glad to see that the @KafkaListener can just accept ConsumerRecords&lt;Object, Object&gt;.

+1 to getting spring.kafka.value.serialization.bytopic.config configured as a map in YAML

StephenFlavin avatar May 09 '24 08:05 StephenFlavin