
Rebalancing the consumer via WatchPartitionChanges doesn't seem to work

Open simone opened this issue 2 years ago • 4 comments

Describe the bug

Rebalancing the consumer via WatchPartitionChanges doesn't seem to work as expected. After increasing the number of partitions, the consumers in the group do not rebalance to assign the new partition to one of them. The only workaround is to restart the consumers.

Kafka Version

  • What version(s) of Kafka are you testing against? 3.2.1 (Commit:b172a0a94f4ebb9f) and 3.1.0 (Commit:37edeed0777bacb3)
  • What version of kafka-go are you using? github.com/segmentio/kafka-go v0.4.40 and github.com/segmentio/kafka-go v0.4.38

To Reproduce

Resources to reproduce the behavior:

  • create a topic "search-events" with one partition
  • publish a message to it
  • start the kafka-go consumer shown in the main function below
  • increase the number of partitions of the topic "search-events" to 2
---
# docker-compose.yaml
version: '3.7'

services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    restart: always
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    image: 'bitnami/kafka:latest'
    restart: always
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
...
package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Consumer-group reader that should rejoin the group when the
	// partition count of the topic changes.
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:                []string{"kafka:9092"},
		Topic:                  "search-events",
		GroupID:                "search-service",
		MinBytes:               10e3, // 10KB
		MaxBytes:               10e6, // 10MB
		WatchPartitionChanges:  true,
		PartitionWatchInterval: 10 * time.Second,
	})
	defer reader.Close()
	for {
		// FetchMessage blocks until a message arrives or an error occurs.
		m, err := reader.FetchMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
	}
}

Expected Behavior

The consumer group rebalances and the consumer picks up both partitions.

Observed Behavior

The consumer remains connected only to partition 0; the new partition 1 is not read by any consumer in the group.

simone • May 22 '23 17:05

I'm not able to reproduce any issue with the steps provided. When I modify the partitions to 2 I see the reader pick up the new partition and begin consuming.

rhansen2 • Jul 08 '23 19:07

I encountered the same issue. In my case, I have multiple topics in one group; each topic creates its own consumer group instance, but they all use the same group name. When I changed the partition count of one topic, the partitions of that topic were not rebalanced or synced. For example, topic1 initially had 2 partitions and was then changed to 4, but after the change the group still only had 2 partitions (0, 1) for it.

I checked some of the library's logs and found that this may be because the following condition is not true. I think the consumer for the changed topic was not selected as leader, so when only that topic's partitions change, the library does not trigger assignTopicPartitions, and the topic's partitions stay the same as before. I don't know whether this is a bug or whether this setup is simply not supported by the library.

if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
	v, err := cg.assignTopicPartitions(conn, response)
	if err != nil {
		return memberID, 0, nil, err
	}
	assignments = v

	cg.withLogger(func(l Logger) {
		for memberID, assignment := range assignments {
			for topic, partitions := range assignment {
				l.Printf("assigned member/topic/partitions %v/%v/%v", memberID, topic, partitions)
			}
		}
	})
}

luffyao • Jul 28 '23 08:07

Same issue :(

Abdulsametileri • Feb 02 '24 15:02