kafka-go
kafka-go copied to clipboard
Fix: #1314 partition watcher doesn't reacts on partition number changing
Suggestion to fix issue https://github.com/segmentio/kafka-go/issues/1314
Steps to reproduce the issue:
- Create topic
- Simultaneously connect to this topic with consumer group
- At some time, if Kafka didn't assigned partitions to topic (so topic is exists, but has zero partitions), you won't receive topic to consumer group because of empty assignment. So Kafka after that assigning partitions to topic
- At the start of partitionWatcher it scrapes assigned number of partitions and starts loop where monitors number of partitions assigned to topic
After all we have empty consumer group, and partitionWatcher monitoring for changing partition number with non-zero start number of partitions. So we have to increase partitions to receive topic into consumer group.
Solution: We need to assign partitions number to topic in assignTopicPartitions function. So we run partitionWatcher with assigned in assignTopicPartition function number of partitions and don't scrape it inside partitionWatcher
P.S. In this PR I also changed partitionWatcher. Now it runs only if consumer is leader of consumer group. Kafka says this is correct
@erikdw @petedannemann Sorry for pinging, but I noticed that you reviewed the last few PRs that were merged. Could you please consider my proposal to improve the work of the consumer group?
@petedannemann I did everything as you said, could you please take a look?
Thanks)
any updates on this? kind of crucial fix for us when we read from a non-existent topic
any updates on this? kind of crucial fix for us when we read from a non-existent topic
need to review from @petedannemann or somebody else code is ready, i think