Unknown Member ID: the member id is not in the current generation

Open gustavooferreira opened this issue 3 years ago • 8 comments

Describe the bug: We've hit an issue in production where we get a lot of Unknown Member ID errors.

Kafka Version: I'm using Confluent's Docker image, confluentinc/cp-kafka:6.2.1

To Reproduce: I've managed to create a simplified program that reproduces this error.

The setup is a single topic called test-topic with just one partition, consumed by the following Go code:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	fmt.Printf("Starting program [%s]\n", time.Now().Format(time.RFC3339Nano))

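	// The client ID is passed as the first command-line argument.
	if len(os.Args) < 2 {
		log.Fatal("usage: go run main.go <client-id>")
	}
	clientID := os.Args[1]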

	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		ClientID:  clientID,
	}

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:           []string{"localhost:9092"},
		Topic:             "test-topic",
		GroupID:           "test-consumer-group",
		Dialer:            dialer,
		MinBytes:          1,    // 1B
		MaxBytes:          10e6, // 10MB
		MaxWait:           1 * time.Minute,
		StartOffset:       kafka.FirstOffset,
		Logger:            Logger{"--- INFO ---"},
		ErrorLogger:       Logger{"-- ERROR ---"},
		HeartbeatInterval: 30 * time.Second,
		SessionTimeout:    95 * time.Second,
	})

	for {
		fmt.Printf("----- Reading message [%s]\n", time.Now().Format(time.RFC3339Nano))
		m, err := r.FetchMessage(context.Background())
		fmt.Printf("----- Message read [%s]\n", time.Now().Format(time.RFC3339Nano))
		if err != nil {
			fmt.Printf("error: %s\n", err)
			break
		}
		fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))

		fmt.Printf("----- simulate task [%s]\n", time.Now().Format(time.RFC3339Nano))
		time.Sleep(10 * time.Second)

		fmt.Printf("----- committing offset [%s]\n", time.Now().Format(time.RFC3339Nano))
		err = r.CommitMessages(context.Background(), m)
		fmt.Printf("----- commit offset sent [%s]\n", time.Now().Format(time.RFC3339Nano))
		if err != nil {
			fmt.Printf("error: %s\n", err)
			break
		}
	}

	if err := r.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}

type Logger struct {
	Prefix string
}

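// Printf prefixes each log line with the logger's label and a timestamp.
func (l Logger) Printf(msg string, args ...interface{}) {
	// Format the message first so the prefix and timestamp are never parsed as format verbs.
	fmt.Printf("%s [%s] %s\n", l.Prefix, time.Now().Format(time.RFC3339Nano), fmt.Sprintf(msg, args...))
}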

Then, in one shell, run:

go run main.go clientA

Wait a few seconds, then run in a different shell:

go run main.go clientB

You will notice that both consumers then keep failing indefinitely with Unknown Member ID errors.

The logs for clientA:

Starting program [2021-11-08T12:05:37.177572Z]
----- Reading message [2021-11-08T12:05:37.177782Z]
--- INFO --- [2021-11-08T12:05:37.177792Z] entering loop for consumer group, test-consumer-group

--- INFO --- [2021-11-08T12:05:37.19175Z] joined group test-consumer-group as member clientA-3aac949e-836d-44fe-8275-7c9d429af88e in generation 1
--- INFO --- [2021-11-08T12:05:37.191766Z] selected as leader for group, test-consumer-group

--- INFO --- [2021-11-08T12:05:37.193769Z] using 'range' balancer to assign group, test-consumer-group
--- INFO --- [2021-11-08T12:05:37.193782Z] found member: clientA-3aac949e-836d-44fe-8275-7c9d429af88e/[]byte(nil)
--- INFO --- [2021-11-08T12:05:37.193786Z] found topic/partition: test-topic/0
--- INFO --- [2021-11-08T12:05:37.193805Z] assigned member/topic/partitions clientA-3aac949e-836d-44fe-8275-7c9d429af88e/test-topic/[0]
--- INFO --- [2021-11-08T12:05:37.193812Z] joinGroup succeeded for response, test-consumer-group.  generationID=1, memberID=clientA-3aac949e-836d-44fe-8275-7c9d429af88e
--- INFO --- [2021-11-08T12:05:37.193815Z] Joined group test-consumer-group as member clientA-3aac949e-836d-44fe-8275-7c9d429af88e in generation 1
--- INFO --- [2021-11-08T12:05:37.193821Z] Syncing 1 assignments for generation 1 as member clientA-3aac949e-836d-44fe-8275-7c9d429af88e
--- INFO --- [2021-11-08T12:05:37.196984Z] sync group finished for group, test-consumer-group
--- INFO --- [2021-11-08T12:05:37.198517Z] subscribed to topics and partitions: map[{topic:test-topic partition:0}:-2]
--- INFO --- [2021-11-08T12:05:37.198591Z] started commit for group test-consumer-group

--- INFO --- [2021-11-08T12:05:37.1986Z] initializing kafka reader for partition 0 of test-topic starting at offset -2
--- INFO --- [2021-11-08T12:05:37.198527Z] started heartbeat for group, test-consumer-group [30s]
--- INFO --- [2021-11-08T12:05:37.209351Z] the kafka reader for partition 0 of test-topic is seeking to offset 0
--- INFO --- [2021-11-08T12:06:07.201882Z] stopped heartbeat for group test-consumer-group

--- INFO --- [2021-11-08T12:06:07.201935Z] stopped commit for group test-consumer-group

--- INFO --- [2021-11-08T12:06:36.148555Z] no messages received from kafka within the allocated time for partition 0 of test-topic at offset 0
-- ERROR --- [2021-11-08T12:06:36.155666Z] Failed to join group test-consumer-group: [25] Unknown Member ID: the member id is not in the current generation
-- ERROR --- [2021-11-08T12:06:36.155738Z] [25] Unknown Member ID: the member id is not in the current generation
--- INFO --- [2021-11-08T12:07:11.132153Z] joined group test-consumer-group as member clientA-c4dc3973-b0b6-41c6-b466-f174d7aad1b7 in generation 3
--- INFO --- [2021-11-08T12:07:11.132178Z] selected as leader for group, test-consumer-group

--- INFO --- [2021-11-08T12:07:11.133948Z] using 'range' balancer to assign group, test-consumer-group
--- INFO --- [2021-11-08T12:07:11.13396Z] found member: clientA-c4dc3973-b0b6-41c6-b466-f174d7aad1b7/[]byte(nil)
--- INFO --- [2021-11-08T12:07:11.133964Z] found topic/partition: test-topic/0
--- INFO --- [2021-11-08T12:07:11.133982Z] assigned member/topic/partitions clientA-c4dc3973-b0b6-41c6-b466-f174d7aad1b7/test-topic/[0]
--- INFO --- [2021-11-08T12:07:11.133987Z] joinGroup succeeded for response, test-consumer-group.  generationID=3, memberID=clientA-c4dc3973-b0b6-41c6-b466-f174d7aad1b7
--- INFO --- [2021-11-08T12:07:11.133995Z] Joined group test-consumer-group as member clientA-c4dc3973-b0b6-41c6-b466-f174d7aad1b7 in generation 3
--- INFO --- [2021-11-08T12:07:11.134Z] Syncing 1 assignments for generation 3 as member clientA-c4dc3973-b0b6-41c6-b466-f174d7aad1b7
--- INFO --- [2021-11-08T12:07:11.136683Z] sync group finished for group, test-consumer-group
--- INFO --- [2021-11-08T12:07:11.1382Z] subscribed to topics and partitions: map[{topic:test-topic partition:0}:-2]
--- INFO --- [2021-11-08T12:07:11.138214Z] started heartbeat for group, test-consumer-group [30s]
--- INFO --- [2021-11-08T12:07:11.138223Z] initializing kafka reader for partition 0 of test-topic starting at offset -2
--- INFO --- [2021-11-08T12:07:11.138251Z] started commit for group test-consumer-group

--- INFO --- [2021-11-08T12:07:11.145962Z] the kafka reader for partition 0 of test-topic is seeking to offset 0

And the logs for clientB:

Starting program [2021-11-08T12:05:47.924302Z]
----- Reading message [2021-11-08T12:05:47.924443Z]
--- INFO --- [2021-11-08T12:05:47.924452Z] entering loop for consumer group, test-consumer-group

--- INFO --- [2021-11-08T12:06:17.902866Z] joined group test-consumer-group as member clientB-eae8158a-c994-4037-8546-f8ca03b99733 in generation 2
--- INFO --- [2021-11-08T12:06:17.902893Z] selected as leader for group, test-consumer-group

--- INFO --- [2021-11-08T12:06:17.905022Z] using 'range' balancer to assign group, test-consumer-group
--- INFO --- [2021-11-08T12:06:17.905037Z] found member: clientB-eae8158a-c994-4037-8546-f8ca03b99733/[]byte(nil)
--- INFO --- [2021-11-08T12:06:17.905041Z] found topic/partition: test-topic/0
--- INFO --- [2021-11-08T12:06:17.905057Z] assigned member/topic/partitions clientB-eae8158a-c994-4037-8546-f8ca03b99733/test-topic/[0]
--- INFO --- [2021-11-08T12:06:17.905064Z] joinGroup succeeded for response, test-consumer-group.  generationID=2, memberID=clientB-eae8158a-c994-4037-8546-f8ca03b99733
--- INFO --- [2021-11-08T12:06:17.905073Z] Joined group test-consumer-group as member clientB-eae8158a-c994-4037-8546-f8ca03b99733 in generation 2
--- INFO --- [2021-11-08T12:06:17.905079Z] Syncing 1 assignments for generation 2 as member clientB-eae8158a-c994-4037-8546-f8ca03b99733
--- INFO --- [2021-11-08T12:06:17.908343Z] sync group finished for group, test-consumer-group
--- INFO --- [2021-11-08T12:06:17.910582Z] subscribed to topics and partitions: map[{topic:test-topic partition:0}:-2]
--- INFO --- [2021-11-08T12:06:17.91066Z] started commit for group test-consumer-group

--- INFO --- [2021-11-08T12:06:17.910699Z] initializing kafka reader for partition 0 of test-topic starting at offset -2
--- INFO --- [2021-11-08T12:06:17.910596Z] started heartbeat for group, test-consumer-group [30s]
--- INFO --- [2021-11-08T12:06:17.917959Z] the kafka reader for partition 0 of test-topic is seeking to offset 0
--- INFO --- [2021-11-08T12:06:47.914004Z] stopped heartbeat for group test-consumer-group

--- INFO --- [2021-11-08T12:06:47.914042Z] stopped commit for group test-consumer-group

--- INFO --- [2021-11-08T12:07:16.855419Z] no messages received from kafka within the allocated time for partition 0 of test-topic at offset 0
-- ERROR --- [2021-11-08T12:07:16.862541Z] Failed to join group test-consumer-group: [25] Unknown Member ID: the member id is not in the current generation
-- ERROR --- [2021-11-08T12:07:16.862607Z] [25] Unknown Member ID: the member id is not in the current generation
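
To make the timing in the two logs easier to compare, the gaps between a few key events can be computed directly from the timestamps above. This is only a small helper sketch: the names mustParse and gap are made up for illustration, and the three timestamps are copied from the clientA and clientB logs.

```go
package main

import (
	"fmt"
	"time"
)

// mustParse parses a timestamp copied from the logs above.
func mustParse(s string) time.Time {
	t, err := time.Parse(time.RFC3339Nano, s)
	if err != nil {
		panic(err)
	}
	return t
}

// gap returns the time elapsed between two log timestamps.
func gap(from, to string) time.Duration {
	return mustParse(to).Sub(mustParse(from))
}

func main() {
	const (
		aHeartbeatStopped = "2021-11-08T12:06:07.201882Z" // clientA: "stopped heartbeat"
		bJoinedGen2       = "2021-11-08T12:06:17.902866Z" // clientB: "joined group ... in generation 2"
		aRejoinFailed     = "2021-11-08T12:06:36.155666Z" // clientA: "Failed to join group"
	)

	fmt.Println("clientA heartbeat stopped -> clientB in generation 2:", gap(aHeartbeatStopped, bJoinedGen2))
	fmt.Println("clientB in generation 2 -> clientA rejoin attempt:  ", gap(bJoinedGen2, aRejoinFailed))
	fmt.Println("clientA heartbeat stopped -> clientA rejoin attempt:", gap(aHeartbeatStopped, aRejoinFailed))
}
```

In this run, clientA only attempts to rejoin roughly 29 seconds after its heartbeat stopped, by which point clientB has already formed generation 2 on its own.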

Expected behavior: I expected the consumers to handle this properly by just sending a JoinGroup request with an empty Member ID when the heartbeat gets an error saying that the group is rebalancing.
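
As a rough illustration of that expectation, the rejoin decision could look something like the sketch below. This is not kafka-go's implementation: rejoinMemberID and the constant names are hypothetical, and the numeric values are the Kafka protocol error codes (25 is the code shown in the logs above, 27 is the rebalance-in-progress code).

```go
package main

import "fmt"

// Kafka protocol error codes, redefined here so the sketch is self-contained.
const (
	codeNone                = 0
	codeUnknownMemberID     = 25 // "[25] Unknown Member ID" in the logs above
	codeRebalanceInProgress = 27
)

// rejoinMemberID is a hypothetical helper: it returns the member ID to send
// in the next JoinGroup request, given the current ID and the error code
// returned by the last heartbeat or JoinGroup response.
func rejoinMemberID(current string, code int) string {
	switch code {
	case codeUnknownMemberID, codeRebalanceInProgress:
		// Drop the old identity and ask the coordinator for a fresh member ID.
		return ""
	default:
		// No membership problem was reported: keep the current ID.
		return current
	}
}

func main() {
	id := "clientA-3aac949e-836d-44fe-8275-7c9d429af88e"
	fmt.Printf("after rebalance-in-progress: %q\n", rejoinMemberID(id, codeRebalanceInProgress))
	fmt.Printf("after unknown member ID:     %q\n", rejoinMemberID(id, codeUnknownMemberID))
	fmt.Printf("after no error:              %q\n", rejoinMemberID(id, codeNone))
}
```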

Additional context: Here is the tcpdump capture, so you don't need to run the program yourself: consumer_unknown_member_id.pcapng.gz

gustavooferreira, Nov 08 '21 12:11

Actually, if we had more consumers in the consumer group and the leader was intact, I guess it would be fine to send the same memberID.

The problem seems to be that Kafka doesn't include clientA in the JoinGroup response (tcpdump packet 109).

In packet 103, the heartbeat response to clientA comes back as "Group rebalance in progress", even though clientA is supposedly still in the consumer group.

gustavooferreira, Nov 08 '21 12:11

I think one of the problems here is that when our heartbeat fails, the FetchMessage call is still blocked, waiting for Kafka to respond until the configured wait time elapses (MaxWait, 1 minute in the example above). I wonder if we should make FetchMessage return immediately with an error, since we are no longer part of that consumer group and need to rejoin it.
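
To illustrate the idea, here is a self-contained sketch that does not use kafka-go at all: a blocking fetch is tied to a context that gets cancelled as soon as the (simulated) heartbeat fails, so the fetch returns right away instead of waiting out the full wait time. All names here (fetch, simulate, errLeftGroup, errMaxWait, and the demo durations) are made up for the example, and the durations are shortened so it runs quickly.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical sentinel errors for this sketch.
var (
	errLeftGroup = errors.New("no longer a member of the consumer group")
	errMaxWait   = errors.New("no messages received within max wait")
)

// Shortened stand-ins for the heartbeat failure time and the fetch wait time.
var (
	demoHeartbeatFailure = 50 * time.Millisecond
	demoMaxWait          = 2 * time.Second
)

// fetch stands in for a blocking fetch call. It waits for a message for up
// to maxWait, but gives up immediately if groupCtx is cancelled.
func fetch(groupCtx context.Context, msgs <-chan string, maxWait time.Duration) (string, error) {
	timer := time.NewTimer(maxWait)
	defer timer.Stop()

	select {
	case m := <-msgs:
		return m, nil
	case <-groupCtx.Done():
		return "", errLeftGroup
	case <-timer.C:
		return "", errMaxWait
	}
}

// simulate cancels the group context after heartbeatFailsAfter, as a failing
// heartbeat loop might, and reports how long fetch was blocked.
func simulate(heartbeatFailsAfter, maxWait time.Duration) (time.Duration, error) {
	groupCtx, leaveGroup := context.WithCancel(context.Background())
	defer leaveGroup()

	heartbeat := time.AfterFunc(heartbeatFailsAfter, leaveGroup)
	defer heartbeat.Stop()

	start := time.Now()
	_, err := fetch(groupCtx, make(chan string), maxWait) // no messages ever arrive
	return time.Since(start), err
}

func main() {
	elapsed, err := simulate(demoHeartbeatFailure, demoMaxWait)
	fmt.Printf("fetch returned after %s: %v\n", elapsed.Round(10*time.Millisecond), err)
}
```

With this shape, the caller sees the membership error shortly after the heartbeat fails and can start rejoining the group without first waiting for the fetch to time out.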

gustavooferreira, Nov 09 '21 14:11

Unblocking FetchMessage when there is no point waiting seems meaningful. Would you have cycles available to contribute this change?

This also seems related to general blocking management in kafka.Reader with issues like https://github.com/segmentio/kafka-go/issues/428

achille-roussel, Nov 12 '21 19:11