Round robin balancer is not balancing topic-aware

Open CubicrootXYZ opened this issue 1 year ago • 0 comments

Describe the bug

The round robin balancer does not seem to balance on a per-topic basis. Instead, it rotates through partitions using a single sequence shared by all topics that messages are enqueued for, so the topic of a message has no effect on which partition is picked next.

E.g. the partition is increased by 1 for each new message, regardless of whether it is sent to the same topic as the previous one. For 4 messages written to topic1, topic2, topic1, topic2 (for example, with four partitions per topic), the partitions are chosen as follows:

  1. Topic 1, Partition 0
  2. Topic 2, Partition 1
  3. Topic 1, Partition 2
  4. Topic 2, Partition 3

As a result, half of the partitions on each topic do not receive any messages. Instead, I'd expect the following behavior:

  1. Topic 1, Partition 0
  2. Topic 2, Partition 0
  3. Topic 1, Partition 1
  4. Topic 2, Partition 1

Kafka Version

github.com/segmentio/kafka-go v0.4.47

Kafka version is unknown.

To Reproduce

A minimal working example:

	// No Topic is set on the writer, so each message carries its own topic.
	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{kafkaAddr},
		Balancer: &kafka.RoundRobin{},
	})

	err := writer.WriteMessages(
		context.Background(),
		kafka.Message{
			Topic: topic1,
			Value: []byte("Topic 1, Message 1"),
		},
		kafka.Message{
			Topic: topic2,
			Value: []byte("Topic 2, Message 2"),
		},
		kafka.Message{
			Topic: topic1,
			Value: []byte("Topic 1, Message 3"),
		},
		kafka.Message{
			Topic: topic2,
			Value: []byte("Topic 2, Message 4"),
		},
	)
	require.NoError(t, err, "could not send kafka messages")

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     []string{kafkaAddr},
		GroupTopics: []string{topic1, topic2},
		GroupID:     "test",
	})

	partitionCount := map[string]map[int]int{}
	for range 4 {
		msg, err := reader.FetchMessage(context.Background())
		require.NoError(t, err, "could not read message")

		fmt.Printf("%s / Partition %d\n", string(msg.Value), msg.Partition)

		if partitionCount[msg.Topic] == nil {
			partitionCount[msg.Topic] = map[int]int{}
		}

		partitionCount[msg.Topic][msg.Partition]++
	}

	assert.Equal(t, 1, partitionCount[topic1][0], "expect 1 message for each partition")
	assert.Equal(t, 1, partitionCount[topic1][1], "expect 1 message for each partition")
	assert.Equal(t, 1, partitionCount[topic2][0], "expect 1 message for each partition")
	assert.Equal(t, 1, partitionCount[topic2][1], "expect 1 message for each partition")

Expected Behavior

I'd expect each topic to have 1 message on partition 0 and 1 message on partition 1.

Observed Behavior

There are 2 messages on topic 1, partition 0, and 2 messages on topic 2, partition 1.

Additional Context

CubicrootXYZ, Jun 18 '24 12:06