poc-spring-boot-dynamic-controller

@KafkaListener not able to fire/trigger for a dynamic component

Open sunil31925 opened this issue 3 years ago • 10 comments

  • Using Byte Buddy, we create the component class as shown in the attachment.
  • We create a method that returns void, annotate it with @KafkaListener, and load the class. ISSUE: the Kafka consumer config is never loaded and the @KafkaListener method is never triggered.

KafkaListner_trigger_issue

sunil31925 avatar Sep 09 '20 12:09 sunil31925

Hi, thanks for opening the interesting issue!

Kafka Listener never load the Kafka Consumer Config and never trigger the @kafkaListener

That is because @EnableKafka and @KafkaListener are registered differently in Spring: they are processed by KafkaListenerAnnotationBeanPostProcessor, whose Javadoc says:

Bean post-processor that registers methods annotated with {@link KafkaListener} to be invoked by a Kafka message listener container created under the covers by a {@link org.springframework.kafka.config.KafkaListenerContainerFactory} according to the parameters of the annotation. Annotated methods can use flexible arguments as defined by {@link KafkaListener}. This post-processor is automatically registered by Spring's {@link EnableKafka} annotation. ...
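As a rough mental model of that Javadoc, here is a toy, JDK-only sketch of an annotation scan. It is not Spring's implementation: ToyListener, StaticBean, LateBean and scan are all hypothetical names. It only illustrates that a post-processor can register annotated methods solely on the objects that are handed to it, so a class that is loaded but never passes through the scan is never registered.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT Spring's implementation. All names here are hypothetical.
final class ToyListenerScan {

    // Stand-in for @KafkaListener; must be retained at runtime to be visible via reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface ToyListener {
        String topic();
    }

    // A bean that exists when the scan runs.
    static final class StaticBean {
        @ToyListener(topic = "my-topic")
        public void consume(String payload) { }
    }

    // A bean that is created only after the scan has finished.
    static final class LateBean {
        @ToyListener(topic = "late-topic")
        public void consume(String payload) { }
    }

    // Stand-in for a bean post-processor: it collects the topics of every
    // annotated method, but only on the beans it is given.
    static List<String> scan(List<Object> beans) {
        List<String> topics = new ArrayList<>();
        for (Object bean : beans) {
            for (Method method : bean.getClass().getDeclaredMethods()) {
                ToyListener listener = method.getAnnotation(ToyListener.class);
                if (listener != null) {
                    topics.add(listener.topic());
                }
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        List<Object> beans = new ArrayList<>();
        beans.add(new StaticBean());
        List<String> registered = scan(beans); // the scan happens here
        beans.add(new LateBean());             // added after the scan, so never seen
        System.out.println(registered);        // prints [my-topic]
    }
}
```

In this toy model, the only way to get late-topic registered is to make LateBean available before scan runs.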

This repository contains an example in which controllers are created dynamically, in UserDynamicControllerRegister.registerUserController(). That method is annotated with @PostConstruct, so it runs when the ApplicationContext has already been created. This works because Spring allows mappings to be registered after the ApplicationContext is created, using RequestMappingHandlerMapping.registerMapping.

To register a dynamic config and @KafkaListener, you can use a BeanFactoryPostProcessor, which registers their bean definitions before KafkaListenerAnnotationBeanPostProcessor is invoked. A simple example:

MyBeanFactoryPostProcessor implementation with a dynamic config and @KafkaListener:

package com.example;

import com.example.MyPayload;
import java.lang.reflect.Modifier;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.asm.MemberAttributeExtension;
import net.bytebuddy.description.annotation.AnnotationDescription;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.Argument;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.context.annotation.Configuration;

import static net.bytebuddy.matcher.ElementMatchers.named;

@Component
public class MyBeanFactoryPostProcessor implements BeanFactoryPostProcessor {

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        try {
            // registers dynamicKafkaListenerConfig bean
            Class<?> dynamicKafkaListenerConfig = generateDynamicKafkaListenerConfig();
            AbstractBeanDefinition dynamicKafkaListenerConfigBeanDefinition = BeanDefinitionBuilder
                .rootBeanDefinition(dynamicKafkaListenerConfig)
                .getBeanDefinition();
            ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(
                "dynamicKafkaListenerConfig", dynamicKafkaListenerConfigBeanDefinition);

            // registers dynamicKafkaListener bean
            Class<?> dynamicKafkaListener = generateDynamicKafkaListener();
            AbstractBeanDefinition dynamicKafkaListenerBeanDefinition = BeanDefinitionBuilder
                .rootBeanDefinition(dynamicKafkaListener)
                .getBeanDefinition();

            ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(
                "dynamicKafkaListener", dynamicKafkaListenerBeanDefinition);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // generates dynamicKafkaListenerConfig class
    private Class<?> generateDynamicKafkaListenerConfig() throws Exception {
        return new ByteBuddy()

            .subclass(Object.class)
            .name("DynamicKafkaListenerConfig")
            .annotateType(AnnotationDescription.Builder.ofType(Configuration.class)
                .build(), AnnotationDescription.Builder.ofType(EnableKafka.class)
                .build())

            .make()
            .load(getClass().getClassLoader())
            .getLoaded();
    }

    // generates dynamicKafkaListener class
    private Class<?> generateDynamicKafkaListener() throws Exception {
        return new ByteBuddy()

            .subclass(Object.class)
            .name("DynamicKafkaListener")
            .annotateType(AnnotationDescription.Builder.ofType(Component.class)
                .build())

            .defineMethod("consume", void.class, Modifier.PUBLIC)
            .withParameter(MyPayload.class, "myPayload")
            .annotateParameter(AnnotationDescription.Builder.ofType(Payload.class)
                .build())
            .intercept(MethodDelegation.to(DynamicKafkaConsumerIntercept.class))

            .visit(new MemberAttributeExtension.ForMethod()
                .annotateMethod(AnnotationDescription.Builder.ofType(KafkaListener.class)
                    .defineArray("topics", new String[]{"my-topic"})
                    .define("groupId", "my-groupId")
                    .build())
                .on(named("consume")))

            .make()
            .load(getClass().getClassLoader())
            .getLoaded();
    }

    // simple interceptor for DynamicKafkaConsumer
    public static class DynamicKafkaConsumerIntercept {

        public static void consume(@Argument(0) MyPayload myPayload) {
            System.out.println("Received myPayload: " + myPayload);
        }

    }

}

generated DynamicKafkaListenerConfig.class result:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

@Configuration("")
@EnableKafka
public class DynamicKafkaListenerConfig {
    public DynamicKafkaListenerConfig() {
    }
}

generated DynamicKafkaListener.class result:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

import com.example.MyPayload;
import com.example.MyBeanFactoryPostProcessor.DynamicKafkaConsumerIntercept;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component("")
public class DynamicKafkaListener {
    @KafkaListener(
        concurrency = "",
        topics = {"my-topic"},
        topicPartitions = {},
        errorHandler = "",
        autoStartup = "",
        topicPattern = "",
        id = "",
        groupId = "my-groupId",
        beanRef = "__listener",
        clientIdPrefix = "",
        containerGroup = "",
        containerFactory = "",
        idIsGroup = true
    )
    public void consume(@Payload(required = true,expression = "",value = "") MyPayload myPayload) {
        DynamicKafkaConsumerIntercept.consume(myPayload);
    }

    public DynamicKafkaListener() {
    }
}

In this case, the dynamic config and @KafkaListener will work, because both are processed by KafkaListenerAnnotationBeanPostProcessor.

does this solution solve your issue?

tsarenkotxt avatar Sep 09 '20 19:09 tsarenkotxt

What does the MyPayload class contain?

sunil31925 avatar Sep 09 '20 20:09 sunil31925

MyPayload is a simple POJO based on PracticalAdvice. I tested with @Payload PracticalAdvice payload: I removed all the existing listeners and generated the first one with MyBeanFactoryPostProcessor.

P.S. I've updated the previous answer - added @Configuration to the DynamicKafkaListenerConfig.class

tsarenkotxt avatar Sep 09 '20 23:09 tsarenkotxt

Kindly share the MyPayload POJO class if possible. This works great for me now.

Thank you

sunil31925 avatar Sep 10 '20 13:09 sunil31925

MyPayload class:

import com.fasterxml.jackson.annotation.JsonProperty;

public class MyPayload {

    private final String message;
    private final int identifier;

    public MyPayload(@JsonProperty("message") final String message,
                     @JsonProperty("identifier") final int identifier) {
        this.message = message;
        this.identifier = identifier;
    }

    public String getMessage() {
        return message;
    }

    public int getIdentifier() {
        return identifier;
    }

    @Override
    public String toString() {
        return "MyPayload{" +
            "message='" + message + '\'' +
            ", identifier=" + identifier +
            '}';
    }

}

you are welcome

tsarenkotxt avatar Sep 10 '20 20:09 tsarenkotxt

This will not work for us: our message is of type org.springframework.messaging.Message. We use @Payload Message<MyObjectType> message, and then call message.getHeaders() and message.getPayload().

sunil31925 avatar Sep 10 '20 20:09 sunil31925

I generated the @KafkaListener method with the following parameter and annotation:

  public void consume(@Payload MyPayload myPayload){
      ...
  }

In your case, you have to generate the method with a Message parameter and without the @Payload annotation:

  public void consume(Message<?> message){
     Object payload = message.getPayload();
     MessageHeaders headers = message.getHeaders();
     ...
  }

Since the payload is already inside the Message, this should work.

tsarenkotxt avatar Sep 10 '20 21:09 tsarenkotxt

does it work in your case?

tsarenkotxt avatar Sep 10 '20 21:09 tsarenkotxt

How do I make a database call from a BeanFactoryPostProcessor? I want to read metadata from the database and create many consumers based on what is defined there.

sunil31925 avatar Sep 11 '20 18:09 sunil31925

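If you are using an SQL database, you can just use JdbcTemplate and configure the DataSource manually, since a BeanFactoryPostProcessor runs before regular beans such as an auto-configured DataSource are instantiated (PostgreSQL example with the org.postgresql:postgresql:42.2.16 dependency):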

        DataSourceBuilder dataSourceBuilder = DataSourceBuilder.create();
        dataSourceBuilder.driverClassName("org.postgresql.Driver");
        dataSourceBuilder.url("jdbc:postgresql://127.0.0.1:5432/myDb");
        dataSourceBuilder.username("myUser");
        dataSourceBuilder.password("MyPassword");
        DataSource dataSource = dataSourceBuilder.build();

        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        List<Map<String, Object>> maps = jdbcTemplate.queryForList("select * from my_table");
        // ... do something for your case ...

to get property values for DataSourceBuilder you can use Environment:

       Environment environment = beanFactory.getBean(Environment.class);
       String url = environment.getProperty("db-url");
       String username = environment.getProperty("db-username");
       String password = environment.getProperty("db-password");
...
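For the "many consumers defined in the database" case, one possible next step is to turn the rows returned by queryForList into small listener specifications before generating any classes. The sketch below uses only the JDK and rests on assumptions: the column names topic and group_id, the ListenerSpec type, and the bean-name scheme are all hypothetical and do not come from this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: the column names "topic" and "group_id" are hypothetical.
final class ListenerSpecs {

    // One listener to generate: a unique bean name plus the @KafkaListener values.
    static final class ListenerSpec {
        final String beanName;
        final String topic;
        final String groupId;

        ListenerSpec(String beanName, String topic, String groupId) {
            this.beanName = beanName;
            this.topic = topic;
            this.groupId = groupId;
        }

        @Override
        public String toString() {
            return beanName + "[topic=" + topic + ", groupId=" + groupId + "]";
        }
    }

    // Converts rows shaped like the result of JdbcTemplate.queryForList(...)
    // into specs, skipping rows that lack a topic or a group id.
    static List<ListenerSpec> fromRows(List<Map<String, Object>> rows) {
        List<ListenerSpec> specs = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Object topic = row.get("topic");
            Object groupId = row.get("group_id");
            if (topic == null || groupId == null) {
                continue; // incomplete metadata: no listener for this row
            }
            // Bean names must be unique, so number them in order of acceptance.
            String beanName = "dynamicKafkaListener" + specs.size();
            specs.add(new ListenerSpec(beanName, topic.toString(), groupId.toString()));
        }
        return specs;
    }

    public static void main(String[] args) {
        Map<String, Object> complete = new HashMap<>();
        complete.put("topic", "orders");
        complete.put("group_id", "orders-group");

        Map<String, Object> incomplete = new HashMap<>();
        incomplete.put("topic", "payments"); // no group_id, so it is skipped

        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(complete);
        rows.add(incomplete);

        for (ListenerSpec spec : fromRows(rows)) {
            System.out.println(spec);
        }
    }
}
```

Each spec could then supply the topics and groupId values to the annotation builder in generateDynamicKafkaListener() and the bean name to registerBeanDefinition. Each generated class would probably need its own name in .name(...) as well.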

tsarenkotxt avatar Sep 11 '20 22:09 tsarenkotxt