spring-kafka

Support kafka parallel-consumer

Open · bojanv55 opened this issue 2 years ago · 21 comments

Expected Behavior

Enable parallel processing of messages in a single consumer.

Current Behavior

Single-threaded processing of the messages.

Context

Are there any plans to include ParallelStreamProcessor (https://github.com/confluentinc/parallel-consumer) as an option in spring-kafka? It should automatically handle multiple threads, acknowledgments and other concerns (e.g. synchronization on the same key if needed).

Currently I can configure this manually, but I guess it would be easier if I only had to implement void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction); and Spring took care of the rest.

bojanv55, Aug 24 '22
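To make "synchronization on the same key" concrete, here is a stdlib-only Java sketch of key-ordered parallel processing: records that share a key are handled one after another, while different keys may run on different threads. It is a simplified model with hypothetical names (KeyOrderedDispatcher, fixed hash-selected single-threaded "lanes"); it is not how the parallel-consumer library schedules work, and it is not that library's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical sketch: per-key ordering via "lane striping". Each key is hashed onto
// one single-threaded executor, so same-key records run sequentially while different
// keys can run concurrently. Not the parallel-consumer implementation.
final class KeyOrderedDispatcher<K, V> implements AutoCloseable {

    private final List<ExecutorService> lanes = new ArrayList<>();
    private final BiConsumer<K, V> handler;

    KeyOrderedDispatcher(int concurrency, BiConsumer<K, V> handler) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be >= 1");
        }
        for (int i = 0; i < concurrency; i++) {
            // A single-threaded executor runs its tasks one at a time, in submission order.
            lanes.add(Executors.newSingleThreadExecutor());
        }
        this.handler = handler;
    }

    // Same key -> same lane -> sequential processing. In this sketch all null keys share one lane.
    void dispatch(K key, V value) {
        int lane = Math.floorMod(Objects.hashCode(key), lanes.size());
        lanes.get(lane).execute(() -> handler.accept(key, value));
    }

    // Stop accepting work and wait for queued records to finish.
    @Override
    public void close() throws InterruptedException {
        lanes.forEach(ExecutorService::shutdown);
        for (ExecutorService lane : lanes) {
            lane.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    // Dispatches `records` values spread over `keys` keys and returns, per key,
    // the order in which the values were actually handled.
    static Map<String, List<Integer>> runDemo(int concurrency, int records, int keys)
            throws InterruptedException {
        Map<String, List<Integer>> handled = new ConcurrentHashMap<>();
        try (KeyOrderedDispatcher<String, Integer> d = new KeyOrderedDispatcher<>(concurrency,
                (k, v) -> handled.computeIfAbsent(k, x -> Collections.synchronizedList(new ArrayList<>())).add(v))) {
            for (int i = 0; i < records; i++) {
                d.dispatch("key" + (i % keys), i);
            }
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        // Each key's list comes out in ascending order even though several threads are used.
        System.out.println(runDemo(4, 20, 3));
    }
}
```

The trade-off of fixed lanes is that one slow record delays every other key that hashes to the same lane.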

No plans currently; but contributions are welcome!

garyrussell, Aug 25 '22

That said, it's easy enough to use it from Spring: just set up a @KafkaListener as normal, set autoStartup to "false", and then invoke the listener from the parallel consumer:

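import java.util.List;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListener;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParallelStreamProcessor;

@SpringBootApplication
public class Kgh2381Application {

	private static final Logger log = LoggerFactory.getLogger(Kgh2381Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Kgh2381Application.class, args);
	}

	// autoStartup = "false": the container never polls Kafka; the parallel consumer calls this listener instead
	@KafkaListener(id = "kgh2381", topics = "kgh2381", autoStartup = "false")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	public NewTopic topic() {
		return TopicBuilder.name("kgh2381").partitions(1).replicas(1).build();
	}

	@Bean
	ApplicationRunner runner(KafkaListenerEndpointRegistry registry, ConsumerFactory<String, String> cf,
			KafkaTemplate<String, String> template) {

		return args -> {
			// The listener adapter Spring created for listen(); it is invoked directly below
			MessageListener messageListener = (MessageListener) registry.getListenerContainer("kgh2381")
					.getContainerProperties().getMessageListener();
			// A separate consumer (group "group") that the parallel consumer owns
			Consumer<String, String> consumer = cf.createConsumer("group", "");
			var options = ParallelConsumerOptions.<String, String>builder()
					.ordering(ProcessingOrder.KEY) // records with the same key are processed in order
					.consumer(consumer)
					.maxConcurrency(10) // up to 10 records in flight at once
					.build();
			ParallelStreamProcessor<String, String> processor = ParallelStreamProcessor
					.createEosStreamProcessor(options);
			processor.subscribe(List.of("kgh2381"));
			// Each record is handed to the Spring listener on a parallel-consumer worker thread
			processor.poll(context -> messageListener.onMessage(context.getSingleConsumerRecord(), null, consumer));
			// Send some test records
			IntStream.range(0, 10).forEach(i -> template.send("kgh2381", "foo" + i));
		};
	}

}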

garyrussell, Aug 25 '22

I am trying to implement the parallel consumer in Spring Boot using the approach above. I have the main class in a separate Java file and the rest of the methods (KafkaListener, ApplicationRunner) in another Java file.

I have also created a ConsumerProp class with the following content:

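import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class ConsumerProp {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");

        return new DefaultKafkaConsumerFactory<>(configs);
    }
}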

But the Spring Boot application just exists without any error. Could you please help, or share a working sample that I can build on?

dixitsingla, Jun 29 '23

I assume you mean exits (not exists).

You must have done something wrong; the example above is a complete working example using Spring Boot. This is the output when running it...


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.3)

2023-06-29 14:28:45,673 [main] Starting Kgh2381Application using Java 11.0.11 on grussell3MD6R.vmware.com with PID 83586 (/Users/grussell/Development/stsws43/kgh2381/target/classes started by grussell in /Users/grussell/Development/stsws43/kgh2381)
2023-06-29 14:28:45,675 [main] No active profile set, falling back to 1 default profile: "default"
2023-06-29 14:28:46,176 [main] AdminClientConfig values: 
...
2023-06-29 14:28:51,533 [pc-broker-poll] pl.tlinkowski.unij.service.api.collect.UnmodifiableMapFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableMapFactory (priority=40)
2023-06-29 14:28:53,520 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Successfully joined group with generation Generation{generationId=4, memberId='consumer-group-1-8ebc709a-c397-4585-89d2-51fc00808899', protocol='range'}
2023-06-29 14:28:53,522 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Finished assignment for group at generation 4: {consumer-group-1-8ebc709a-c397-4585-89d2-51fc00808899=Assignment(partitions=[kgh2381-0])}
2023-06-29 14:28:53,527 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Successfully synced group in generation Generation{generationId=4, memberId='consumer-group-1-8ebc709a-c397-4585-89d2-51fc00808899', protocol='range'}
2023-06-29 14:28:53,527 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Notifying assignor about the new Assignment(partitions=[kgh2381-0])
2023-06-29 14:28:53,529 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Adding newly assigned partitions: kgh2381-0
2023-06-29 14:28:53,529 [pc-broker-poll] Assigned 1 total (1 new) partition(s) [kgh2381-0]
2023-06-29 14:28:53,540 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Setting offset for partition kgh2381-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2023-06-29 14:28:53,559 [pc-pool-1-thread-1] foo0
2023-06-29 14:28:53,560 [pc-pool-1-thread-1] pl.tlinkowski.unij.service.api.collect.UnmodifiableListFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableListFactory (priority=40)
2023-06-29 14:28:53,562 [pc-pool-1-thread-2] foo1
2023-06-29 14:28:53,564 [pc-pool-1-thread-3] foo2
2023-06-29 14:28:53,566 [pc-pool-1-thread-4] foo3
2023-06-29 14:28:53,567 [pc-pool-1-thread-5] foo4
2023-06-29 14:28:53,569 [pc-pool-1-thread-6] foo5
2023-06-29 14:28:53,571 [pc-pool-1-thread-7] foo6
2023-06-29 14:28:53,573 [pc-pool-1-thread-8] foo7
2023-06-29 14:28:53,574 [pc-pool-1-thread-9] foo8
2023-06-29 14:28:53,576 [pc-pool-1-thread-10] foo9

garyrussell, Jun 29 '23

This is its application.properties

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

... rest of the methods(KafkaListener, ApplicationRunner) in another java file.

That file must be annotated with @Configuration so that the beans are registered.

You don't need your own consumer factory bean, just use Boot's.

garyrussell, Jun 29 '23

Thanks for correcting the typo (exits).

I was able to run the application by adding the @Autowired annotation to the ApplicationRunner runner method. I also tried the @Configuration annotation, and that works as well.

I am using my own consumer factory bean because I need to provide a few properties dynamically (in some cases from environment variables), such as the bootstrap servers.

If that's okay with you I really would like you to share the sample project you just ran above.

Thanks again for all the help.

dixitsingla, Jun 30 '23
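If the only goal is to take the bootstrap servers and similar settings from environment variables, Spring Boot's relaxed binding may already cover it: an environment variable such as SPRING_KAFKA_BOOTSTRAP_SERVERS overrides spring.kafka.bootstrap-servers, so Boot's own consumer factory can be kept. If a custom factory is still preferred, the sketch below shows one way to build its config map from the environment with fallbacks. The class ConsumerConfigFromEnv and the variables KAFKA_BOOTSTRAP_SERVERS and KAFKA_GROUP_ID are hypothetical; the map uses Kafka's plain property names, which are the values behind the ConsumerConfig constants used in ConsumerProp.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the same settings as ConsumerProp, but reads the
// bootstrap servers and group id from an environment map, with fallbacks.
final class ConsumerConfigFromEnv {

    static Map<String, Object> consumerConfigs(Map<String, String> env) {
        Map<String, Object> configs = new HashMap<>();
        // Plain Kafka property names (the values of the ConsumerConfig.*_CONFIG constants).
        configs.put("bootstrap.servers", env.getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"));
        configs.put("group.id", env.getOrDefault("KAFKA_GROUP_ID", "group_1"));
        // Kafka accepts deserializer class names as strings as well as Class objects.
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Mirrors spring.kafka.consumer.enable-auto-commit=false from the application.properties
        // above; the parallel consumer manages offset commits itself.
        configs.put("enable.auto.commit", false);
        return configs;
    }

    public static void main(String[] args) {
        // In a real bean: return new DefaultKafkaConsumerFactory<>(consumerConfigs(System.getenv()));
        System.out.println(consumerConfigs(System.getenv()));
    }
}
```

Passing the environment in as a Map (rather than calling System.getenv() inside) keeps the helper easy to test.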

If that's okay with you I really would like you to share the sample project you just ran above.

There's nothing more to share; that is the entire app (together with the properties above).

garyrussell, Jul 03 '23

As you stated earlier: 'You don't need your own consumer factory bean, just use Boot's.'

I have just noticed that the runner method is only using Spring Boot's consumer factory; the one I created is not being used. Could you please tell me what I need to do to use the consumer factory I created?

dixitsingla, Jul 03 '23

Since you define your own consumer factory bean, Boot will detect it and not declare its own; yours will be injected into the runner instead.

garyrussell, Jul 03 '23

KafkaAutoConfiguration:

@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(

Your @Configuration class must be in the same (or child) package as the @SpringBootApplication.

garyrussell, Jul 03 '23
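The two rules above (Boot's kafkaConsumerFactory is @ConditionalOnMissingBean(ConsumerFactory.class), and a separate @Configuration class must live in the @SpringBootApplication's package or below) together explain why a ConsumerProp bean can be silently ignored. The toy model below, with hypothetical names and a hypothetical base package com.example.demo, only mimics those two rules; it is not Spring's component-scanning or condition-evaluation code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of two Spring Boot rules (hypothetical names, not Spring's real code):
// 1. @SpringBootApplication scans its own package and sub-packages only.
// 2. Auto-configured beans marked "on missing bean" are added only if the user did not define one.
final class BeanOverrideModel {

    // True if className lives in basePackage or one of its sub-packages.
    static boolean isScanned(String basePackage, String className) {
        int lastDot = className.lastIndexOf('.');
        String pkg = lastDot < 0 ? "" : className.substring(0, lastDot);
        // Compare whole segments: "com.example.appconfig" is not inside "com.example.app".
        return pkg.equals(basePackage) || pkg.startsWith(basePackage + ".");
    }

    // Returns which class ends up providing the ConsumerFactory bean.
    static String resolveConsumerFactory(String basePackage, List<String> userConfigClasses) {
        Map<String, String> beans = new LinkedHashMap<>();
        for (String cls : userConfigClasses) {
            if (isScanned(basePackage, cls)) {
                beans.put("ConsumerFactory", cls); // user configuration is processed first
            }
        }
        // Auto-configuration runs afterwards; putIfAbsent mimics @ConditionalOnMissingBean.
        beans.putIfAbsent("ConsumerFactory", "KafkaAutoConfiguration");
        return beans.get("ConsumerFactory");
    }

    public static void main(String[] args) {
        String base = "com.example.demo"; // hypothetical package of the @SpringBootApplication class
        // prints com.example.demo.config.ConsumerProp
        System.out.println(resolveConsumerFactory(base, List.of("com.example.demo.config.ConsumerProp")));
        // prints KafkaAutoConfiguration (ConsumerProp is outside the scanned packages)
        System.out.println(resolveConsumerFactory(base, List.of("com.example.kafka.ConsumerProp")));
    }
}
```

The segment-wise comparison matters: a class in com.example.appconfig is not picked up by an application in com.example.app, even though the strings share a prefix.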

For now, I am trying Spring Boot's consumer factory.

My application is stuck at this log line:

2023-07-03 22:19:38.423 INFO 40836 --- [ pc-broker-poll] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-group-1, groupId=group] Resetting offset for partition topic_1-0 to position FetchPosition{offset=6, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 1 rack: null)], epoch=0}}

I don't see any consumed messages, although there are 6 messages in the topic. It seems the listen method is not called at all.


@KafkaListener(id = "group_1", topics = "topic_1", autoStartup = "false")
void listen(String record) {
    log.info(record);
}

I am not very experienced with Spring Boot yet; sorry for the basic questions.

dixitsingla, Jul 03 '23
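One factor worth checking in the log line quoted above: "Resetting offset for partition topic_1-0 to position FetchPosition{offset=6, ...}" is what the Kafka consumer logs when its group has no committed offset for the partition and auto.offset.reset chooses the start position (compare Gary's run, which logs "Setting offset ... to the committed offset"). With 6 messages in the topic, offset 6 is presumably the end of the log, which would fit the default latest policy: the consumer only waits for new records and never sees the existing six. Gary's application.properties sets spring.kafka.consumer.auto-offset-reset=earliest. The sketch below models only that decision, with hypothetical names; it ignores the none policy and out-of-range committed offsets and is not Kafka client code.

```java
import java.util.OptionalLong;

// Simplified, hypothetical model of how a consumer picks its starting offset for a partition.
final class StartOffset {

    enum ResetPolicy { EARLIEST, LATEST }

    static long startingOffset(OptionalLong committed, ResetPolicy policy, long logStart, long logEnd) {
        if (committed.isPresent()) {
            // "Setting offset ... to the committed offset": the group resumes where it left off.
            return committed.getAsLong();
        }
        // "Resetting offset ...": no committed offset, so auto.offset.reset decides.
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // Topic holding offsets 0..5 and a brand-new group:
        System.out.println(startingOffset(OptionalLong.empty(), ResetPolicy.LATEST, 0, 6));   // prints 6
        System.out.println(startingOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, 0, 6)); // prints 0
    }
}
```

Under latest, a new group starting at offset 6 will only see records produced after it joined.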

autoStartup = "false"

You have to start the container to consume.

garyrussell, Jul 03 '23

Sorry, I don't understand what you mean by container.

I am running the application in IntelliJ IDEA, and Kafka is up and running.

dixitsingla, Jul 03 '23

Do you mean setting autoStartup = "true"? But you said earlier that autoStartup should be set to "false" so that the listener is invoked from the parallel consumer.

dixitsingla, Jul 03 '23

Could you please help me understand why the listen method is not called, and what needs to be done to make it work?

Ideally, it should call the listen method as soon as there is a message to consume, as defined here:

processor.poll(context -> messageListener.onMessage(context.getSingleConsumerRecord(), null, consumer));

dixitsingla, Jul 03 '23

You have to start the container to consume.

After adding the line below, I was able to consume the messages, but each message is consumed twice. Could you please help me with that?

processor.poll(context -> messageListener.onMessage(context.getSingleConsumerRecord(), null, consumer));
registry.getListenerContainer("group_1").start();

What is the right way, and the right place, to start the container?

Output:

2023-07-03 23:47:42.584  INFO 1724 --- [  group_1-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group_1-2, groupId=group_1] Cluster ID: bfy4mk8aSlW6HhYc1lbXeQ
2023-07-03 23:47:42.584  INFO 1724 --- [ pc-broker-poll] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group-1, groupId=group] Cluster ID: bfy4mk8aSlW6HhYc1lbXeQ
2023-07-03 23:47:42.586  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Discovered group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null)
2023-07-03 23:47:42.586  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Discovered group coordinator 127.0.0.1:9092 (id: 2147483646 rack: null)
2023-07-03 23:47:42.590  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] (Re-)joining group
2023-07-03 23:47:42.590  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] (Re-)joining group
2023-07-03 23:47:42.606  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Request joining group due to: need to re-join with the given member-id
2023-07-03 23:47:42.606  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Request joining group due to: need to re-join with the given member-id
2023-07-03 23:47:42.606  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] (Re-)joining group
2023-07-03 23:47:42.606  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] (Re-)joining group
2023-07-03 23:47:45.612  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Successfully joined group with generation Generation{generationId=11, memberId='consumer-group_1-2-b9e9bb1c-35c6-4367-8e69-cef757677d49', protocol='range'}
2023-07-03 23:47:45.617  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Finished assignment for group at generation 11: {consumer-group_1-2-b9e9bb1c-35c6-4367-8e69-cef757677d49=Assignment(partitions=[topic_1-0])}
2023-07-03 23:47:45.626  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Successfully synced group in generation Generation{generationId=11, memberId='consumer-group_1-2-b9e9bb1c-35c6-4367-8e69-cef757677d49', protocol='range'}
2023-07-03 23:47:45.627  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Notifying assignor about the new Assignment(partitions=[topic_1-0])
2023-07-03 23:47:45.631  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Adding newly assigned partitions: topic_1-0
2023-07-03 23:47:45.646  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Setting offset for partition topic_1-0 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 1 rack: null)], epoch=0}}
2023-07-03 23:47:45.646  INFO 1724 --- [  group_1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group_1: partitions assigned: [topic_1-0]
2023-07-03 23:47:54.713  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : kj
2023-07-03 23:48:02.901  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : hello
2023-07-03 23:48:06.078  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : dixit
2023-07-03 23:48:09.379  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : singla
2023-07-03 23:48:12.273  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : bye
2023-07-03 23:48:14.904  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : bye
2023-07-03 23:48:16.157  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Successfully joined group with generation Generation{generationId=9, memberId='consumer-group-1-c26fe7a4-a7a0-4c35-95b8-f14fa04ab499', protocol='range'}
2023-07-03 23:48:16.157  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Finished assignment for group at generation 9: {consumer-group-1-c26fe7a4-a7a0-4c35-95b8-f14fa04ab499=Assignment(partitions=[topic_1-0])}
2023-07-03 23:48:16.162  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Successfully synced group in generation Generation{generationId=9, memberId='consumer-group-1-c26fe7a4-a7a0-4c35-95b8-f14fa04ab499', protocol='range'}
2023-07-03 23:48:16.163  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Notifying assignor about the new Assignment(partitions=[topic_1-0])
2023-07-03 23:48:16.165  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Adding newly assigned partitions: topic_1-0
2023-07-03 23:48:16.165  INFO 1724 --- [ pc-broker-poll] c.p.i.AbstractParallelEoSStreamProcessor : Assigned 1 total (1 new) partition(s) [topic_1-0]
2023-07-03 23:48:16.188  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Setting offset for partition topic_1-0 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 1 rack: null)], epoch=0}}
2023-07-03 23:48:16.215  INFO 1724 --- [pool-1-thread-1] icis.kafka.consumer.ParallelConumer      : kj
2023-07-03 23:48:16.226  INFO 1724 --- [pool-1-thread-1] pl.tlinkowski.unij.api.UniJLoader        : pl.tlinkowski.unij.service.api.collect.UnmodifiableListFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableListFactory (priority=40)
2023-07-03 23:48:16.228  INFO 1724 --- [pool-1-thread-2] icis.kafka.consumer.ParallelConumer      : hello
2023-07-03 23:48:16.233  INFO 1724 --- [pool-1-thread-3] icis.kafka.consumer.ParallelConumer      : dixit
2023-07-03 23:48:16.236  INFO 1724 --- [pool-1-thread-4] icis.kafka.consumer.ParallelConumer      : singla
2023-07-03 23:48:16.238  INFO 1724 --- [pool-1-thread-5] icis.kafka.consumer.ParallelConumer      : bye
2023-07-03 23:48:16.239  INFO 1724 --- [ pc-broker-poll] pl.tlinkowski.unij.api.UniJLoader        : pl.tlinkowski.unij.service.api.collect.UnmodifiableMapFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableMapFactory (priority=40)
2023-07-03 23:48:16.239  INFO 1724 --- [pool-1-thread-6] icis.kafka.consumer.ParallelConumer      : bye
2023-07-03 23:48:31.663  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : uy
2023-07-03 23:48:31.664  INFO 1724 --- [pool-1-thread-7] icis.kafka.consumer.ParallelConumer      : uy
2023-07-03 23:48:52.102  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : 1
2023-07-03 23:48:52.103  INFO 1724 --- [pool-1-thread-8] icis.kafka.consumer.ParallelConumer      : 1
2023-07-03 23:48:53.929  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : 2
2023-07-03 23:48:53.931  INFO 1724 --- [pool-1-thread-9] icis.kafka.consumer.ParallelConumer      : 2
2023-07-03 23:48:55.887  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : 3
2023-07-03 23:48:55.888  INFO 1724 --- [ool-1-thread-10] icis.kafka.consumer.ParallelConumer      : 3
2023-07-03 23:48:57.709  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : 4
2023-07-03 23:48:57.710  INFO 1724 --- [ool-1-thread-11] icis.kafka.consumer.ParallelConumer      : 4

If you don't mind, could you please send your working example project to my email address '[email protected]' for reference? Or you could upload it to GitHub so that it helps others.

dixitsingla, Jul 03 '23

Sorry, I forgot this was about the parallel consumer.

You should not start the container.

It looks like you are seeing data.

As I said, there is nothing else to share, just copy/paste the code into a class named Kgh2381Application and the properties into application.properties (in src/main/resources).

garyrussell, Jul 03 '23
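The duplicates in the output above come from two consumer groups reading the same topic: the log shows the manually started listener container consuming as groupId=group_1 (a @KafkaListener id doubles as the group id unless idIsGroup is false or a groupId is set), while the parallel consumer, created with cf.createConsumer("group", ""), consumes as groupId=group. Kafka delivers every record to each group, so each message was logged once on group_1-0-C-1 and once on a pool-1-thread-N thread; leaving the container stopped, as Gary says, removes the second copy. The stdlib sketch below models per-group offsets with hypothetical names; it is not Kafka code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of consumer groups (hypothetical, not Kafka's implementation):
// each group keeps its own committed offset, so every group independently
// receives every record in the log.
final class ConsumerGroupsModel {

    private final List<String> log = new ArrayList<>();
    private final Map<String, Integer> committed = new HashMap<>();

    void produce(String value) {
        log.add(value);
    }

    // Returns records this group has not seen yet, then "commits" past them.
    List<String> poll(String groupId) {
        int from = committed.getOrDefault(groupId, 0); // new groups start at 0, like auto-offset-reset=earliest
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        committed.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        ConsumerGroupsModel topic = new ConsumerGroupsModel();
        List.of("uy", "1", "2").forEach(topic::produce);
        System.out.println("group_1 -> " + topic.poll("group_1")); // the listener container's group
        System.out.println("group   -> " + topic.poll("group"));   // the parallel consumer's group
    }
}
```

Both polls return all three records, which is the same pattern as the duplicated "uy", "1", "2" lines in the log.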

kgh2381.tgz

Previous .tgz was missing the .mvn directory.

garyrussell, Jul 03 '23

Thanks a lot for sharing the project.

@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(

And for the custom consumer factory (don't want to use the spring boot's config), what changes do I need to make in the ConsumerProp class?

This is the current ConsumerProp class. I just want to use the DefaultKafkaConsumerFactory below instead of Spring Boot's default consumer factory.

@Configuration
public class ConsumerProp {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");

        return new DefaultKafkaConsumerFactory<>(configs);
    }
}

dixitsingla, Jul 04 '23

don't want to use the spring boot's config

Why?

Just add the bean and it will replace Boot's. If it's in a separate file (ConsumerProp), it must be in the same package hierarchy as the @SpringBootApplication.

garyrussell, Jul 04 '23

Hi @garyrussell, I saw your comment "No plans currently; but contributions are welcome!", and it motivated me! For this feature, I created a PR with skeleton code. When you have free time, could you please take a look? (PR: https://github.com/spring-projects/spring-kafka/pull/3161)

chickenchickenlove, Mar 24 '24