reactor-kafka icon indicating copy to clipboard operation
reactor-kafka copied to clipboard

Can you add spring boot + reactive kafka tutorials?

Open javaHelper opened this issue 3 years ago • 14 comments

javaHelper avatar May 26 '21 16:05 javaHelper

Have you looked at the samples ?

garyrussell avatar May 26 '21 19:05 garyrussell

@javaHelper there is an example of integration with Spring Boot here https://github.com/Kevded/example-reactive-spring-kafka-consumer-and-producer

ReactiveKafkaProducerTemplate & ReactiveKafkaConsumerTemplate.

Kevded avatar Jun 28 '21 13:06 Kevded

Hi @garyrussell , all, just out of curiosity I have a question, do you see any issue if we use spring-kafka without reactor-kafka on webflux (netty). I tried to run it, it and was looking fine , just creates its own producer and consumer threads. I know it will be blocking , but is there any other drawback apart from this.

Aniket-Singla avatar Sep 22 '21 13:09 Aniket-Singla

@Aniket-Singla Producer should be fine; I am not sure what you mean on the consumer side; please clarify.

garyrussell avatar Sep 23 '21 17:09 garyrussell

Hi Gary, on Consumer side too I am following the native approach (spring-kafka KafkaListener annotation )

@KafkaListener(topics = "topic-name", groupId = "group-id", containerFactory="kafkaListenerContainerFactory")
    public void consume(org.springframework.messaging.Message<Message<?>> message){
        log.info("message  received is "+ message.getPayload());
        }
    }

For creating consumer factory:

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message>
    kafkaListenerContainerFactoryForJson() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryForJsonMessage());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

@Bean
    public ConsumerFactory<String, Message> consumerFactoryForJsonMessage() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, orderStartOffset);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                new JsonDeserializer<>(Message.class));
    }

I can confirm that messages are getting consumed on webflux (netty) as well. I was wondering if there can be any issue if I do not migrate to reactor-kafka.

Thanks.

Aniket-Singla avatar Sep 24 '21 04:09 Aniket-Singla

I can confirm that messages are getting consumed on webflux (netty) as well.

It is still not clear; sorry; the @KafkaListener has no interaction with netty, it uses its own task executor to create threads.

garyrussell avatar Sep 24 '21 13:09 garyrussell

Yes, that is true, "it creates its own task executor to create threads".

By saying :

I can confirm that messages are getting consumed on webflux (netty) as well.

I meant my spring boot application is running on netty , but spring-kafka is creating its own executor. I just wanted to make sure that going with @KafkaListner threads in webflux is limited to the below issue:

  • Blocking Reactor's Threads in @KafkaListener threads whenever service calls are made ( Not recommended to block Reactor netty's eventloop thread, but for the time being I will be blocking them in the listener until I move to reactor-kafka ).

Thanks

Aniket-Singla avatar Sep 24 '21 14:09 Aniket-Singla

But, the @KafkaListener threads are NOT reactor netty threads.

garyrussell avatar Sep 24 '21 15:09 garyrussell

But, the @KafkaListener threads are NOT reactor netty threads.

I already got your point thanks.

By saying

Blocking Reactor's Threads in @KafkaListener threads whenever service calls are made ( Not recommended to block Reactor netty's eventloop thread, but for the time being I will be blocking them in the listener until I move to reactor-kafka ).

I mean, @KafkaListener's threads will be blocking netty's threads because my @KafkaListener's threads will internally call logic that runs on Reactor's threads. I think I am clear now.

Thanks for helping till here.

Aniket-Singla avatar Sep 24 '21 16:09 Aniket-Singla

@garyrussell Can you please confirm whether the last comment made by @Aniket-Singla is correct or not?

mohit24 avatar Dec 14 '21 18:12 mohit24

I don't know - I don't understand what (s)he's trying to say. If the task is handed off to a netty thread spring-kafka is out of the picture.

garyrussell avatar Dec 14 '21 19:12 garyrussell

Waiting for this

lynch19 avatar Dec 18 '21 16:12 lynch19

https://github.com/spring-projects/spring-boot/issues/29080 is related to here.

roger751 avatar Dec 18 '21 17:12 roger751

spring webflux + reactive Kafka checkout this article.

uday-utronics avatar Jun 02 '23 08:06 uday-utronics