spring-kafka
Support kafka parallel-consumer
Expected Behavior
Enable parallel processing of the messages in single consumer.
Current Behavior
Single-threaded processing of the messages.
Context
Are there any plans to include ParallelStreamProcessor
(https://github.com/confluentinc/parallel-consumer) as an option for spring-kafka? It automatically handles multiple threads, acking, and related concerns (such as synchronization on the same key when needed).
Currently I can configure this manually, but I guess it would be easier if I only needed to implement void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction);
and the rest were taken care of by Spring.
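For context, using the library on its own looks roughly like the sketch below; it is only an illustration based on the parallel-consumer API that also appears later in this thread, and the topic name, consumerProps map, and handle(...) method are placeholders rather than real code from the library or this issue.

// Plain parallel-consumer usage outside of Spring (sketch; placeholders marked).
Consumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps); // consumerProps: placeholder config map

var options = ParallelConsumerOptions.<String, String>builder()
        .ordering(ProcessingOrder.KEY)   // processing order is preserved per key
        .consumer(kafkaConsumer)
        .maxConcurrency(10)
        .build();

ParallelStreamProcessor<String, String> processor =
        ParallelStreamProcessor.createEosStreamProcessor(options);
processor.subscribe(List.of("my-topic"));                                     // placeholder topic
// The library manages the worker threads and offset commits; user code only supplies this callback.
processor.poll(context -> handle(context.getSingleConsumerRecord().value())); // handle(...): placeholder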
No plans currently; but contributions are welcome!
That said, it's easy enough to use it from Spring - just set up a @KafkaListener
as normal, set the auto startup to "false", and then invoke the listener from the parallel consumer:
@SpringBootApplication
public class Kgh2381Application {

    private static final Logger log = LoggerFactory.getLogger(Kgh2381Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Kgh2381Application.class, args);
    }

    @KafkaListener(id = "kgh2381", topics = "kgh2381", autoStartup = "false")
    void listen(String in) {
        log.info(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("kgh2381").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(KafkaListenerEndpointRegistry registry, ConsumerFactory<String, String> cf,
            KafkaTemplate<String, String> template) {

        return args -> {
            // Borrow the listener adapter from the (never started) @KafkaListener container.
            MessageListener messageListener = (MessageListener) registry.getListenerContainer("kgh2381")
                    .getContainerProperties().getMessageListener();
            Consumer<String, String> consumer = cf.createConsumer("group", "");
            var options = ParallelConsumerOptions.<String, String>builder()
                    .ordering(ProcessingOrder.KEY)
                    .consumer(consumer)
                    .maxConcurrency(10)
                    .build();
            ParallelStreamProcessor<String, String> processor = ParallelStreamProcessor
                    .createEosStreamProcessor(options);
            processor.subscribe(List.of("kgh2381"));
            // The parallel consumer drives the listener method on its own worker threads.
            processor.poll(context -> messageListener.onMessage(context.getSingleConsumerRecord(), null, consumer));
            IntStream.range(0, 10).forEach(i -> template.send("kgh2381", "foo" + i));
        };
    }

}
I am trying to implement the parallel consumer in Spring Boot using the above mentioned approach. I have the main class in a separate java file and the rest of the methods (KafkaListener, ApplicationRunner) in another java file.
Also, I have created a ConsumerProp class with the below content:
@EnableKafka
@Configuration
public class ConsumerProp {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        return new DefaultKafkaConsumerFactory<>(configs);
    }

}
But the Spring Boot app just exists without even any error. Could you please help, or share a sample working example which I can leverage for further use.
I assume you mean exits (not exists).
You must have done something wrong; the example above is a complete working example using Spring Boot. This is the output when running it...
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.3)
2023-06-29 14:28:45,673 [main] Starting Kgh2381Application using Java 11.0.11 on grussell3MD6R.vmware.com with PID 83586 (/Users/grussell/Development/stsws43/kgh2381/target/classes started by grussell in /Users/grussell/Development/stsws43/kgh2381)
2023-06-29 14:28:45,675 [main] No active profile set, falling back to 1 default profile: "default"
2023-06-29 14:28:46,176 [main] AdminClientConfig values:
...
2023-06-29 14:28:51,533 [pc-broker-poll] pl.tlinkowski.unij.service.api.collect.UnmodifiableMapFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableMapFactory (priority=40)
2023-06-29 14:28:53,520 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Successfully joined group with generation Generation{generationId=4, memberId='consumer-group-1-8ebc709a-c397-4585-89d2-51fc00808899', protocol='range'}
2023-06-29 14:28:53,522 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Finished assignment for group at generation 4: {consumer-group-1-8ebc709a-c397-4585-89d2-51fc00808899=Assignment(partitions=[kgh2381-0])}
2023-06-29 14:28:53,527 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Successfully synced group in generation Generation{generationId=4, memberId='consumer-group-1-8ebc709a-c397-4585-89d2-51fc00808899', protocol='range'}
2023-06-29 14:28:53,527 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Notifying assignor about the new Assignment(partitions=[kgh2381-0])
2023-06-29 14:28:53,529 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Adding newly assigned partitions: kgh2381-0
2023-06-29 14:28:53,529 [pc-broker-poll] Assigned 1 total (1 new) partition(s) [kgh2381-0]
2023-06-29 14:28:53,540 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Setting offset for partition kgh2381-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2023-06-29 14:28:53,559 [pc-pool-1-thread-1] foo0
2023-06-29 14:28:53,560 [pc-pool-1-thread-1] pl.tlinkowski.unij.service.api.collect.UnmodifiableListFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableListFactory (priority=40)
2023-06-29 14:28:53,562 [pc-pool-1-thread-2] foo1
2023-06-29 14:28:53,564 [pc-pool-1-thread-3] foo2
2023-06-29 14:28:53,566 [pc-pool-1-thread-4] foo3
2023-06-29 14:28:53,567 [pc-pool-1-thread-5] foo4
2023-06-29 14:28:53,569 [pc-pool-1-thread-6] foo5
2023-06-29 14:28:53,571 [pc-pool-1-thread-7] foo6
2023-06-29 14:28:53,573 [pc-pool-1-thread-8] foo7
2023-06-29 14:28:53,574 [pc-pool-1-thread-9] foo8
2023-06-29 14:28:53,576 [pc-pool-1-thread-10] foo9
This is its application.properties
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
... rest of the methods (KafkaListener, ApplicationRunner) in another java file.
That file must be annotated with @Configuration
so that the beans are registered.
You don't need your own consumer factory bean, just use Boot's.
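For illustration only, a minimal sketch of what that separate file might look like; the class name ListenerConfig is hypothetical, and the runner body is elided because the wiring would be identical to the full example above:

// Hypothetical file/class name; it must live in the same (or a child) package
// as the @SpringBootApplication class so that it is picked up by component scanning.
@Configuration
public class ListenerConfig {

    private static final Logger log = LoggerFactory.getLogger(ListenerConfig.class);

    @KafkaListener(id = "kgh2381", topics = "kgh2381", autoStartup = "false")
    void listen(String in) {
        log.info(in);
    }

    @Bean
    ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            ConsumerFactory<String, String> cf) {  // Boot's auto-configured consumer factory is injected here
        return args -> {
            // ... same parallel-consumer wiring as in the full example above ...
        };
    }

}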
Thanks for correcting the typo (exits).
I was able to run the application by adding the @Autowired
annotation to the ApplicationRunner runner(
method. I have also tried the @Configuration
annotation and it works as well.
I am using my own consumer factory bean because I need to provide a few properties dynamically (in some cases from environment variables), such as the bootstrap servers.
If that's okay with you, I really would like you to share the sample project you just ran above.
Thanks again for all the help.
If that's okay with you, I really would like you to share the sample project you just ran above.
There's nothing more to share; that is the entire app (together with the properties above).
As you stated earlier, 'You don't need your own consumer factory bean, just use Boot's.'
I have just observed that the runner method is using Spring Boot's consumer factory; the one I have created is not being used. Could you please tell me what I should do to use the consumer factory I have created?
Since you define your own consumer factory bean, Boot will detect it and not declare its own; yours will be injected into the runner instead.
KafkaAutoConfiguration
:
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(
Your @Configuration
class must be in the same (or child) package as the @SpringBootApplication
.
As of now I am trying with Spring Boot's consumer factory.
My application is stuck at
2023-07-03 22:19:38.423 INFO 40836 --- [ pc-broker-poll] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-group-1, groupId=group] Resetting offset for partition topic_1-0 to position FetchPosition{offset=6, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 1 rack: null)], epoch=0}}.
I don't see any consumed messages. There are 6 messages in the topic. It seems like the listen method is not called at all.
@KafkaListener(id = "group_1", topics = "topic_1", autoStartup = "false")
void listen(String record) {
    log.info(record);
}
I am not very good at Spring Boot yet; sorry for asking such basic questions.
autoStartup = "false"
You have to start the container to consume.
Sorry, I did not get what you mean by container.
I am running the application in IntelliJ IDEA and Kafka is up and running.
Do you mean setting autoStartup='true'?
But in that case, you said earlier that auto startup should be set to false so that the parallel consumer can invoke the listener.
Could you please help me understand why the listen
method is not called, and what needs to be done to make it work?
Ideally it should call the listen method as soon as there is a message to consume, as defined below.
processor.poll(context -> messageListener.onMessage(context.getSingleConsumerRecord(), null, consumer));
You have to start the container to consume.
After adding the line below, I was able to consume the messages, but each message is being consumed twice. Could you please help me with that?
processor.poll(context -> messageListener.onMessage(context.getSingleConsumerRecord(), null, consumer));
registry.getListenerContainer("group_1").start();
Also, could you please tell me what/where the right way to start the container is?
Output:
2023-07-03 23:47:42.584 INFO 1724 --- [ group_1-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-group_1-2, groupId=group_1] Cluster ID: bfy4mk8aSlW6HhYc1lbXeQ
2023-07-03 23:47:42.584 INFO 1724 --- [ pc-broker-poll] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-group-1, groupId=group] Cluster ID: bfy4mk8aSlW6HhYc1lbXeQ
2023-07-03 23:47:42.586 INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-1, groupId=group] Discovered group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null)
2023-07-03 23:47:42.586 INFO 1724 --- [ group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_1-2, groupId=group_1] Discovered group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null)
2023-07-03 23:47:42.590 INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-1, groupId=group] (Re-)joining group
2023-07-03 23:47:42.590 INFO 1724 --- [ group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_1-2, groupId=group_1] (Re-)joining group
2023-07-03 23:47:42.606 INFO 1724 --- [ group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_1-2, groupId=group_1] Request joining group due to: need to re-join with the given member-id
2023-07-03 23:47:42.606 INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-1, groupId=group] Request joining group due to: need to re-join with the given member-id
2023-07-03 23:47:42.606 INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-1, groupId=group] (Re-)joining group
2023-07-03 23:47:42.606 INFO 1724 --- [ group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_1-2, groupId=group_1] (Re-)joining group
2023-07-03 23:47:45.612 INFO 1724 --- [ group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_1-2, groupId=group_1] Successfully joined group with generation Generation{generationId=11, memberId='consumer-group_1-2-b9e9bb1c-35c6-4367-8e69-cef757677d49', protocol='range'}
2023-07-03 23:47:45.617 INFO 1724 --- [ group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_1-2, groupId=group_1] Finished assignment for group at generation 11: {consumer-group_1-2-b9e9bb1c-35c6-4367-8e69-cef757677d49=Assignment(partitions=[topic_1-0])}
2023-07-03 23:47:45.626 INFO 1724 --- [ group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_1-2, groupId=group_1] Successfully synced group in generation Generation{generationId=11, memberId='consumer-group_1-2-b9e9bb1c-35c6-4367-8e69-cef757677d49', protocol='range'}
2023-07-03 23:47:45.627 INFO 1724 --- [ group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_1-2, groupId=group_1] Notifying assignor about the new Assignment(partitions=[topic_1-0])
2023-07-03 23:47:45.631 INFO 1724 --- [ group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_1-2, groupId=group_1] Adding newly assigned partitions: topic_1-0
2023-07-03 23:47:45.646 INFO 1724 --- [ group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_1-2, groupId=group_1] Setting offset for partition topic_1-0 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 1 rack: null)], epoch=0}}
2023-07-03 23:47:45.646 INFO 1724 --- [ group_1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group_1: partitions assigned: [topic_1-0]
2023-07-03 23:47:54.713 INFO 1724 --- [ group_1-0-C-1] icis.kafka.consumer.ParallelConumer : kj
2023-07-03 23:48:02.901 INFO 1724 --- [ group_1-0-C-1] icis.kafka.consumer.ParallelConumer : hello
2023-07-03 23:48:06.078 INFO 1724 --- [ group_1-0-C-1] icis.kafka.consumer.ParallelConumer : dixit
2023-07-03 23:48:09.379 INFO 1724 --- [ group_1-0-C-1] icis.kafka.consumer.ParallelConumer : singla
2023-07-03 23:48:12.273 INFO 1724 --- [ group_1-0-C-1] icis.kafka.consumer.ParallelConumer : bye
2023-07-03 23:48:14.904 INFO 1724 --- [ group_1-0-C-1] icis.kafka.consumer.ParallelConumer : bye
2023-07-03 23:48:16.157 INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-1, groupId=group] Successfully joined group with generation Generation{generationId=9, memberId='consumer-group-1-c26fe7a4-a7a0-4c35-95b8-f14fa04ab499', protocol='range'}
2023-07-03 23:48:16.157 INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-1, groupId=group] Finished assignment for group at generation 9: {consumer-group-1-c26fe7a4-a7a0-4c35-95b8-f14fa04ab499=Assignment(partitions=[topic_1-0])}
2023-07-03 23:48:16.162 INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-1, groupId=group] Successfully synced group in generation Generation{generationId=9, memberId='consumer-group-1-c26fe7a4-a7a0-4c35-95b8-f14fa04ab499', protocol='range'}
2023-07-03 23:48:16.163 INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-1, groupId=group] Notifying assignor about the new Assignment(partitions=[topic_1-0])
2023-07-03 23:48:16.165 INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-1, groupId=group] Adding newly assigned partitions: topic_1-0
2023-07-03 23:48:16.165 INFO 1724 --- [ pc-broker-poll] c.p.i.AbstractParallelEoSStreamProcessor : Assigned 1 total (1 new) partition(s) [topic_1-0]
2023-07-03 23:48:16.188 INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-1, groupId=group] Setting offset for partition topic_1-0 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 1 rack: null)], epoch=0}}
2023-07-03 23:48:16.215 INFO 1724 --- [pool-1-thread-1] icis.kafka.consumer.ParallelConumer : kj
2023-07-03 23:48:16.226 INFO 1724 --- [pool-1-thread-1] pl.tlinkowski.unij.api.UniJLoader : pl.tlinkowski.unij.service.api.collect.UnmodifiableListFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableListFactory (priority=40)
2023-07-03 23:48:16.228 INFO 1724 --- [pool-1-thread-2] icis.kafka.consumer.ParallelConumer : hello
2023-07-03 23:48:16.233 INFO 1724 --- [pool-1-thread-3] icis.kafka.consumer.ParallelConumer : dixit
2023-07-03 23:48:16.236 INFO 1724 --- [pool-1-thread-4] icis.kafka.consumer.ParallelConumer : singla
2023-07-03 23:48:16.238 INFO 1724 --- [pool-1-thread-5] icis.kafka.consumer.ParallelConumer : bye
2023-07-03 23:48:16.239 INFO 1724 --- [ pc-broker-poll] pl.tlinkowski.unij.api.UniJLoader : pl.tlinkowski.unij.service.api.collect.UnmodifiableMapFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableMapFactory (priority=40)
2023-07-03 23:48:16.239 INFO 1724 --- [pool-1-thread-6] icis.kafka.consumer.ParallelConumer : bye
2023-07-03 23:48:31.663 INFO 1724 --- [ group_1-0-C-1] icis.kafka.consumer.ParallelConumer : uy
2023-07-03 23:48:31.664 INFO 1724 --- [pool-1-thread-7] icis.kafka.consumer.ParallelConumer : uy
2023-07-03 23:48:52.102 INFO 1724 --- [ group_1-0-C-1] icis.kafka.consumer.ParallelConumer : 1
2023-07-03 23:48:52.103 INFO 1724 --- [pool-1-thread-8] icis.kafka.consumer.ParallelConumer : 1
2023-07-03 23:48:53.929 INFO 1724 --- [ group_1-0-C-1] icis.kafka.consumer.ParallelConumer : 2
2023-07-03 23:48:53.931 INFO 1724 --- [pool-1-thread-9] icis.kafka.consumer.ParallelConumer : 2
2023-07-03 23:48:55.887 INFO 1724 --- [ group_1-0-C-1] icis.kafka.consumer.ParallelConumer : 3
2023-07-03 23:48:55.888 INFO 1724 --- [ool-1-thread-10] icis.kafka.consumer.ParallelConumer : 3
2023-07-03 23:48:57.709 INFO 1724 --- [ group_1-0-C-1] icis.kafka.consumer.ParallelConumer : 4
2023-07-03 23:48:57.710 INFO 1724 --- [ool-1-thread-11] icis.kafka.consumer.ParallelConumer : 4
If you don't mind, could you please send your running project example to my email id '[email protected]' for reference, or upload it on GitHub so that it could help others.
Sorry, I forgot this was about the parallel consumer.
You should not start the container.
It looks like you are seeing data.
As I said, there is nothing else to share, just copy/paste the code into a class named Kgh2381Application
and the properties into application.properties
(in src/main/resources).
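To make that concrete, a short sketch (repeating pieces of the example above, nothing new): the container exists only so its listener adapter can be borrowed; starting it creates a second, independent consumer of the same topic, which is why every record was logged twice.

// Inside the ApplicationRunner: do NOT start the @KafkaListener container.
// registry.getListenerContainer("group_1").start();   // <-- remove this; it caused the duplicates

// Only the parallel consumer subscribes and hands each record to the listener method:
processor.subscribe(List.of("topic_1"));
processor.poll(context -> messageListener.onMessage(context.getSingleConsumerRecord(), null, consumer));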
Thanks a lot for sharing the project.
@Bean @ConditionalOnMissingBean(ConsumerFactory.class) public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(
And with the custom consumer factory (I don't want to use Spring Boot's config), what changes do I need to make in the ConsumerProp class?
This is the current ConsumerProp class. I just want to use the DefaultKafkaConsumerFactory below instead of Spring Boot's default consumer factory.
@Configuration
public class ConsumerProp {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        return new DefaultKafkaConsumerFactory<>(configs);
    }

}
don't want to use the spring boot's config
Why?
Just add the bean and it will replace Boot's. If it's in a separate file (ConsumerProp
), it must be in the same package hierarchy as the @SpringBootApplication
.
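If the goal is just to supply a few values dynamically, a minimal sketch of such a bean follows; the KAFKA_BOOTSTRAP_SERVERS variable name and its fallback are assumptions for illustration, not something discussed above:

@Configuration
public class ConsumerProp {

    @Bean
    public ConsumerFactory<String, String> consumerFactory(
            // Hypothetical: resolve the servers from an environment variable, falling back to kafka:9092.
            // @Value is org.springframework.beans.factory.annotation.Value.
            @Value("${KAFKA_BOOTSTRAP_SERVERS:kafka:9092}") String bootstrapServers) {

        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        // Because this bean exists, Boot's @ConditionalOnMissingBean factory backs off
        // and this one is injected into the runner instead.
        return new DefaultKafkaConsumerFactory<>(configs);
    }

}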
Hi, @garyrussell
I saw your comment No plans currently; but contributions are welcome!
and it motivated me!
For this feature, I created a PR with skeleton code.
When you have free time, could you take a look, please?
(PR: https://github.com/spring-projects/spring-kafka/pull/3161)