kafka-go
Rebalancing the consumer via WatchPartitionChanges doesn't seem to work
Describe the bug
Rebalancing the consumer via WatchPartitionChanges doesn't seem to work as expected. After the number of partitions is increased, the consumers in the group do not rebalance to assign the new partition to one of them. Only restarting the consumers resolves the issue (workaround).
Kafka Version
- What version(s) of Kafka are you testing against? 3.2.1 (Commit:b172a0a94f4ebb9f) and 3.1.0 (Commit:37edeed0777bacb3)
- What version of kafka-go are you using? github.com/segmentio/kafka-go v0.4.40 and github.com/segmentio/kafka-go v0.4.38
To Reproduce
Resources to reproduce the behavior:
- create a topic with one partition "search-events"
- publish a message on it
- start the kafka-go consumer shown in the main function below
- increase the number of partitions to 2 for the topic "search-events"
---
# docker-compose.yaml
version: '3.7'
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    restart: always
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    restart: always
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
...
package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Consumer-group reader that should rejoin the group when the
	// partition count of the topic changes.
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:                []string{"kafka:9092"},
		Topic:                  "search-events",
		GroupID:                "search-service",
		MinBytes:               10e3, // 10KB
		MaxBytes:               10e6, // 10MB
		WatchPartitionChanges:  true,
		PartitionWatchInterval: 10 * time.Second,
	})
	defer reader.Close()

	for {
		m, err := reader.FetchMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
	}
}
Expected Behavior
The group rebalances and the consumer takes over both partitions.
Observed Behavior
The consumer remains connected only to partition 0; the new partition 1 is not read by any consumer in the group.
I'm not able to reproduce any issue with the steps provided. When I modify the partitions to 2 I see the reader pick up the new partition and begin consuming.
I encountered the same issue. In my case, I have multiple topics in one group: each topic creates its own ConsumerGroup instance, but they all use the same group name. When I altered the partition count of one topic, that topic's partitions were not rebalanced or synced. For example, topic1 initially had 2 partitions and was then changed to 4, but after the change the group still shows only 2 partitions (0, 1) for it.
I checked some of the library's logs and found that this may be because the following condition is not true. I think the member for the changed topic was not elected leader, so when only that topic's partitions change, the library does not trigger assignTopicPartitions. As a result, the topic's partitions stay the same as before. I don't know whether this is a bug or whether this setup is simply not supported by the library.
	if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
		v, err := cg.assignTopicPartitions(conn, response)
		if err != nil {
			return memberID, 0, nil, err
		}
		assignments = v

		cg.withLogger(func(l Logger) {
			for memberID, assignment := range assignments {
				for topic, partitions := range assignment {
					l.Printf("assigned member/topic/partitions %v/%v/%v", memberID, topic, partitions)
				}
			}
		})
	}
Same issue :(