reactor-kafka
Can you add spring boot + reactive kafka tutorials?
Have you looked at the samples?
@javaHelper there is an example of integration with Spring Boot here https://github.com/Kevded/example-reactive-spring-kafka-consumer-and-producer
ReactiveKafkaProducerTemplate & ReactiveKafkaConsumerTemplate.
Hi @garyrussell, all. Just out of curiosity: do you see any issue with using spring-kafka without reactor-kafka on WebFlux (Netty)? I tried running it and it looked fine; it just creates its own producer and consumer threads. I know it will be blocking, but is there any other drawback apart from this?
@Aniket-Singla Producer should be fine; I am not sure what you mean on the consumer side; please clarify.
Hi Gary, on the consumer side too I am following the native approach (the spring-kafka @KafkaListener annotation):
    @KafkaListener(topics = "topic-name", groupId = "group-id", containerFactory = "kafkaListenerContainerFactory")
    public void consume(org.springframework.messaging.Message<Message<?>> message) {
        log.info("message received is " + message.getPayload());
    }
For creating consumer factory:
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message>
            kafkaListenerContainerFactoryForJson() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryForJsonMessage());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Message> consumerFactoryForJsonMessage() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, orderStartOffset);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                new JsonDeserializer<>(Message.class));
    }
I can confirm that messages are getting consumed on webflux (netty) as well. I was wondering if there can be any issue if I do not migrate to reactor-kafka.
Thanks.
> I can confirm that messages are getting consumed on webflux (netty) as well.
It is still not clear, sorry; the @KafkaListener has no interaction with netty, it uses its own task executor to create threads.
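The point about separate threads can be illustrated without Kafka at all. The following is a minimal JDK-only sketch, not spring-kafka or Netty code: the two single-thread executors and the thread names `listener-0` and `event-loop-0` are hypothetical stand-ins for the listener container's task executor and the Netty event loop.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only sketch: two independent thread pools, standing in for the
// listener container's executor and for Netty's event loop.
class ListenerThreadSketch {

    // Returns { thread that ran the "listener" task, thread that ran the "event loop" task }.
    static String[] threadNames() throws Exception {
        // Hypothetical thread names, chosen only for illustration.
        ExecutorService listenerPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "listener-0"));
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop-0"));
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            String listenerThread = listenerPool.submit(whoAmI).get();
            String eventLoopThread = eventLoop.submit(whoAmI).get();
            return new String[] {listenerThread, eventLoopThread};
        } finally {
            listenerPool.shutdown();
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] names = threadNames();
        System.out.println("listener task ran on:   " + names[0]);
        System.out.println("event-loop task ran on: " + names[1]);
    }
}
```

Each task only ever sees the thread of the pool it was submitted to, which is the sense in which a listener running on its own executor does not touch the event loop.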
Yes, that is true, "it creates its own task executor to create threads".
By saying:
> I can confirm that messages are getting consumed on webflux (netty) as well.
I meant that my Spring Boot application is running on Netty, but spring-kafka is creating its own executor. I just wanted to make sure that the drawback of going with @KafkaListener threads in WebFlux is limited to the issue below:
- Blocking Reactor's threads in @KafkaListener threads whenever service calls are made (it is not recommended to block Reactor Netty's event loop thread, but for the time being I will be blocking them in the listener until I move to reactor-kafka).
Thanks
But, the @KafkaListener threads are NOT reactor netty threads.
> But, the @KafkaListener threads are NOT reactor netty threads.
I already got your point thanks.
By saying:
> Blocking Reactor's threads in @KafkaListener threads whenever service calls are made (it is not recommended to block Reactor Netty's event loop thread, but for the time being I will be blocking them in the listener until I move to reactor-kafka).
I mean, @KafkaListener's threads will be blocking netty's threads because my @KafkaListener's threads will internally call logic that runs on Reactor's threads. I think I am clear now.
Thanks for helping till here.
@garyrussell Can you please confirm whether the last comment made by @Aniket-Singla is correct or not?
I don't know - I don't understand what (s)he's trying to say. If the task is handed off to a netty thread spring-kafka is out of the picture.
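One way to picture the hand-off described here is the JDK-only sketch below. It is an illustration under assumptions, not spring-kafka or Reactor code: the executors and the names `listener-0` and `event-loop-0` are hypothetical stand-ins, and `CompletableFuture.join()` stands in for any blocking wait made inside the listener.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only sketch of a "listener" that hands work to another pool and waits for it.
class HandOffSketch {

    // Returns { thread that blocked waiting, thread that did the handed-off work }.
    static String[] blockInListener() throws Exception {
        // Hypothetical stand-ins for the listener executor and the event loop.
        ExecutorService listenerPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "listener-0"));
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop-0"));
        try {
            return listenerPool.submit(() -> {
                // The handed-off work runs on the event-loop thread...
                CompletableFuture<String> handedOff = CompletableFuture.supplyAsync(
                        () -> Thread.currentThread().getName(), eventLoop);
                // ...while join() parks only the calling (listener) thread.
                String worker = handedOff.join();
                String blocked = Thread.currentThread().getName();
                return new String[] {blocked, worker};
            }).get();
        } finally {
            listenerPool.shutdown();
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] names = blockInListener();
        System.out.println("blocked while waiting: " + names[0]);
        System.out.println("did the work:          " + names[1]);
    }
}
```

In this sketch the waiting thread is the listener's own; the other pool stays free to complete the task, which is why the handed-off work can finish at all while the listener waits.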
Waiting for this
https://github.com/spring-projects/spring-boot/issues/29080 is related to this issue.
For Spring WebFlux + Reactive Kafka, check out this article.