kafka-go
Unknown Member ID: the member id is not in the current generation
Describe the bug
We've hit an issue in production where we get a lot of Unknown Member ID errors.
Kafka Version
I'm using Confluent's Docker image: confluentinc/cp-kafka:6.2.1
To Reproduce
I've managed to create a simplified version that reproduces this error.
The setup is a single topic called test-topic with just one partition, and the following Go code:
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	fmt.Printf("Starting program [%s]\n", time.Now().Format(time.RFC3339Nano))
	clientID := os.Args[1]
	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		ClientID:  clientID,
	}
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:           []string{"localhost:9092"},
		Topic:             "test-topic",
		GroupID:           "test-consumer-group",
		Dialer:            dialer,
		MinBytes:          1,    // 1B
		MaxBytes:          10e6, // 10MB
		MaxWait:           1 * time.Minute,
		StartOffset:       kafka.FirstOffset,
		Logger:            Logger{"--- INFO ---"},
		ErrorLogger:       Logger{"-- ERROR ---"},
		HeartbeatInterval: 30 * time.Second,
		SessionTimeout:    95 * time.Second,
	})
	for {
		fmt.Printf("----- Reading message [%s]\n", time.Now().Format(time.RFC3339Nano))
		m, err := r.FetchMessage(context.Background())
		fmt.Printf("----- Message read [%s]\n", time.Now().Format(time.RFC3339Nano))
		if err != nil {
			fmt.Printf("error: %s\n", err)
			break
		}
		fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
		fmt.Printf("----- simulate task [%s]\n", time.Now().Format(time.RFC3339Nano))
		time.Sleep(10 * time.Second)
		fmt.Printf("----- committing offset [%s]\n", time.Now().Format(time.RFC3339Nano))
		err = r.CommitMessages(context.Background(), m)
		fmt.Printf("----- commit offset sent [%s]\n", time.Now().Format(time.RFC3339Nano))
		if err != nil {
			fmt.Printf("error: %s\n", err)
			break
		}
	}
	if err := r.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}

type Logger struct {
	Prefix string
}

func (l Logger) Printf(msg string, args ...interface{}) {
	msg = fmt.Sprintf("%s [%s] %s\n", l.Prefix, time.Now().Format(time.RFC3339Nano), msg)
	fmt.Printf(msg, args...)
}
Then, in one shell, run:
go run main.go clientA
Wait a few seconds, then run in a different shell:
go run main.go clientB
You will notice that both clients then keep failing, over and over, with Unknown Member ID errors.
The logs for clientA:
Starting program [2021-11-08T12:05:37.177572Z]
----- Reading message [2021-11-08T12:05:37.177782Z]
--- INFO --- [2021-11-08T12:05:37.177792Z] entering loop for consumer group, test-consumer-group
--- INFO --- [2021-11-08T12:05:37.19175Z] joined group test-consumer-group as member clientA-3aac949e-836d-44fe-8275-7c9d429af88e in generation 1
--- INFO --- [2021-11-08T12:05:37.191766Z] selected as leader for group, test-consumer-group
--- INFO --- [2021-11-08T12:05:37.193769Z] using 'range' balancer to assign group, test-consumer-group
--- INFO --- [2021-11-08T12:05:37.193782Z] found member: clientA-3aac949e-836d-44fe-8275-7c9d429af88e/[]byte(nil)
--- INFO --- [2021-11-08T12:05:37.193786Z] found topic/partition: test-topic/0
--- INFO --- [2021-11-08T12:05:37.193805Z] assigned member/topic/partitions clientA-3aac949e-836d-44fe-8275-7c9d429af88e/test-topic/[0]
--- INFO --- [2021-11-08T12:05:37.193812Z] joinGroup succeeded for response, test-consumer-group. generationID=1, memberID=clientA-3aac949e-836d-44fe-8275-7c9d429af88e
--- INFO --- [2021-11-08T12:05:37.193815Z] Joined group test-consumer-group as member clientA-3aac949e-836d-44fe-8275-7c9d429af88e in generation 1
--- INFO --- [2021-11-08T12:05:37.193821Z] Syncing 1 assignments for generation 1 as member clientA-3aac949e-836d-44fe-8275-7c9d429af88e
--- INFO --- [2021-11-08T12:05:37.196984Z] sync group finished for group, test-consumer-group
--- INFO --- [2021-11-08T12:05:37.198517Z] subscribed to topics and partitions: map[{topic:test-topic partition:0}:-2]
--- INFO --- [2021-11-08T12:05:37.198591Z] started commit for group test-consumer-group
--- INFO --- [2021-11-08T12:05:37.1986Z] initializing kafka reader for partition 0 of test-topic starting at offset -2
--- INFO --- [2021-11-08T12:05:37.198527Z] started heartbeat for group, test-consumer-group [30s]
--- INFO --- [2021-11-08T12:05:37.209351Z] the kafka reader for partition 0 of test-topic is seeking to offset 0
--- INFO --- [2021-11-08T12:06:07.201882Z] stopped heartbeat for group test-consumer-group
--- INFO --- [2021-11-08T12:06:07.201935Z] stopped commit for group test-consumer-group
--- INFO --- [2021-11-08T12:06:36.148555Z] no messages received from kafka within the allocated time for partition 0 of test-topic at offset 0
-- ERROR --- [2021-11-08T12:06:36.155666Z] Failed to join group test-consumer-group: [25] Unknown Member ID: the member id is not in the current generation
-- ERROR --- [2021-11-08T12:06:36.155738Z] [25] Unknown Member ID: the member id is not in the current generation
--- INFO --- [2021-11-08T12:07:11.132153Z] joined group test-consumer-group as member clientA-c4dc3973-b0b6-41c6-b466-f174d7aad1b7 in generation 3
--- INFO --- [2021-11-08T12:07:11.132178Z] selected as leader for group, test-consumer-group
--- INFO --- [2021-11-08T12:07:11.133948Z] using 'range' balancer to assign group, test-consumer-group
--- INFO --- [2021-11-08T12:07:11.13396Z] found member: clientA-c4dc3973-b0b6-41c6-b466-f174d7aad1b7/[]byte(nil)
--- INFO --- [2021-11-08T12:07:11.133964Z] found topic/partition: test-topic/0
--- INFO --- [2021-11-08T12:07:11.133982Z] assigned member/topic/partitions clientA-c4dc3973-b0b6-41c6-b466-f174d7aad1b7/test-topic/[0]
--- INFO --- [2021-11-08T12:07:11.133987Z] joinGroup succeeded for response, test-consumer-group. generationID=3, memberID=clientA-c4dc3973-b0b6-41c6-b466-f174d7aad1b7
--- INFO --- [2021-11-08T12:07:11.133995Z] Joined group test-consumer-group as member clientA-c4dc3973-b0b6-41c6-b466-f174d7aad1b7 in generation 3
--- INFO --- [2021-11-08T12:07:11.134Z] Syncing 1 assignments for generation 3 as member clientA-c4dc3973-b0b6-41c6-b466-f174d7aad1b7
--- INFO --- [2021-11-08T12:07:11.136683Z] sync group finished for group, test-consumer-group
--- INFO --- [2021-11-08T12:07:11.1382Z] subscribed to topics and partitions: map[{topic:test-topic partition:0}:-2]
--- INFO --- [2021-11-08T12:07:11.138214Z] started heartbeat for group, test-consumer-group [30s]
--- INFO --- [2021-11-08T12:07:11.138223Z] initializing kafka reader for partition 0 of test-topic starting at offset -2
--- INFO --- [2021-11-08T12:07:11.138251Z] started commit for group test-consumer-group
--- INFO --- [2021-11-08T12:07:11.145962Z] the kafka reader for partition 0 of test-topic is seeking to offset 0
And the logs for clientB:
Starting program [2021-11-08T12:05:47.924302Z]
----- Reading message [2021-11-08T12:05:47.924443Z]
--- INFO --- [2021-11-08T12:05:47.924452Z] entering loop for consumer group, test-consumer-group
--- INFO --- [2021-11-08T12:06:17.902866Z] joined group test-consumer-group as member clientB-eae8158a-c994-4037-8546-f8ca03b99733 in generation 2
--- INFO --- [2021-11-08T12:06:17.902893Z] selected as leader for group, test-consumer-group
--- INFO --- [2021-11-08T12:06:17.905022Z] using 'range' balancer to assign group, test-consumer-group
--- INFO --- [2021-11-08T12:06:17.905037Z] found member: clientB-eae8158a-c994-4037-8546-f8ca03b99733/[]byte(nil)
--- INFO --- [2021-11-08T12:06:17.905041Z] found topic/partition: test-topic/0
--- INFO --- [2021-11-08T12:06:17.905057Z] assigned member/topic/partitions clientB-eae8158a-c994-4037-8546-f8ca03b99733/test-topic/[0]
--- INFO --- [2021-11-08T12:06:17.905064Z] joinGroup succeeded for response, test-consumer-group. generationID=2, memberID=clientB-eae8158a-c994-4037-8546-f8ca03b99733
--- INFO --- [2021-11-08T12:06:17.905073Z] Joined group test-consumer-group as member clientB-eae8158a-c994-4037-8546-f8ca03b99733 in generation 2
--- INFO --- [2021-11-08T12:06:17.905079Z] Syncing 1 assignments for generation 2 as member clientB-eae8158a-c994-4037-8546-f8ca03b99733
--- INFO --- [2021-11-08T12:06:17.908343Z] sync group finished for group, test-consumer-group
--- INFO --- [2021-11-08T12:06:17.910582Z] subscribed to topics and partitions: map[{topic:test-topic partition:0}:-2]
--- INFO --- [2021-11-08T12:06:17.91066Z] started commit for group test-consumer-group
--- INFO --- [2021-11-08T12:06:17.910699Z] initializing kafka reader for partition 0 of test-topic starting at offset -2
--- INFO --- [2021-11-08T12:06:17.910596Z] started heartbeat for group, test-consumer-group [30s]
--- INFO --- [2021-11-08T12:06:17.917959Z] the kafka reader for partition 0 of test-topic is seeking to offset 0
--- INFO --- [2021-11-08T12:06:47.914004Z] stopped heartbeat for group test-consumer-group
--- INFO --- [2021-11-08T12:06:47.914042Z] stopped commit for group test-consumer-group
--- INFO --- [2021-11-08T12:07:16.855419Z] no messages received from kafka within the allocated time for partition 0 of test-topic at offset 0
-- ERROR --- [2021-11-08T12:07:16.862541Z] Failed to join group test-consumer-group: [25] Unknown Member ID: the member id is not in the current generation
-- ERROR --- [2021-11-08T12:07:16.862607Z] [25] Unknown Member ID: the member id is not in the current generation
Expected behavior
I expected the consumers to handle this gracefully by sending a JoinGroup request with an empty member ID when the heartbeat returns an error saying that the group is rebalancing.
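To make the expectation concrete, here is a minimal, self-contained sketch of the fallback I have in mind: try to rejoin with the old member ID, and if the coordinator answers with Unknown Member ID, retry right away with an empty one. The `coordinator` type, `joinGroup`, `rejoin` and `errUnknownMemberID` are all hypothetical stand-ins for illustration. This is not kafka-go's internal code or API.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnknownMemberID stands in for Kafka error code 25 (hypothetical name).
var errUnknownMemberID = errors.New("[25] Unknown Member ID")

// coordinator is a toy stand-in for the broker-side group coordinator.
type coordinator struct {
	members map[string]bool
	nextID  int
}

// joinGroup assigns a fresh ID when memberID is empty and rejects
// non-empty IDs that the coordinator does not know about.
func (c *coordinator) joinGroup(memberID string) (string, error) {
	if memberID == "" {
		c.nextID++
		id := fmt.Sprintf("member-%d", c.nextID)
		c.members[id] = true
		return id, nil
	}
	if !c.members[memberID] {
		return "", errUnknownMemberID
	}
	return memberID, nil
}

// rejoin first tries the previous member ID and falls back to an
// empty ID as soon as the coordinator reports it as unknown.
func rejoin(c *coordinator, memberID string) (string, error) {
	id, err := c.joinGroup(memberID)
	if errors.Is(err, errUnknownMemberID) {
		return c.joinGroup("")
	}
	return id, err
}

func main() {
	c := &coordinator{members: map[string]bool{}}
	oldID, _ := rejoin(c, "")   // first join, empty member ID
	delete(c.members, oldID)    // simulate being dropped during a rebalance
	newID, err := rejoin(c, oldID)
	fmt.Println(oldID, "->", newID, err)
}
```

In the clientA log above, the reader does eventually come back under a new member ID (generation 3), so the difference this sketch highlights is mainly that the retry happens immediately instead of after another long wait.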
Additional context
Please find the tcpdump attached so you don't need to run the program yourself: consumer_unknown_member_id.pcapng.gz
Actually, if we had more consumers in the consumer group and the leader was intact, I guess it would be fine to send the same member ID.
The problem seems to be that Kafka doesn't include clientA in the JoinGroup response (tcpdump packet 109).
On packet 103, the heartbeat response to clientA comes back as Group rebalance in progress. But clientA is supposedly still in the consumer group.
I think one of the problems here is that when our heartbeat fails, the FetchMessage calls keep hanging until Kafka responds, which can take as long as the configured maximum wait (MaxWait, one minute in the code above). I wonder if we should make the FetchMessage call return immediately with an error, since at that point we are no longer part of that consumer group generation and need to rejoin the group.
Unblocking FetchMessage when there is no point waiting seems meaningful. Would you have cycles available to contribute this change?
This also seems related to general blocking management in kafka.Reader with issues like https://github.com/segmentio/kafka-go/issues/428