kafka-go
received empty assignments for group
Describe the bug

Error message:

```
kafka-go/consumergroup.go:954","msg":"joined group test_trans_df_consumer_online_3 as member app@kafkago-test-7678858985-rzvzm (github.com/segmentio/kafka-go)-f2954b2a-c82e-440d-b962-86cb44758bed in generation 65,leaderID rdkafka-aa22b496-eb53-412b-a8fd-65254534712e
%3|1684305504.674|PROTOUFLOW|rdkafka#consumer-2| [thrd:main]: GroupCoordinator/106: Protocol read buffer underflow for Unknown--1? v0 at 53/53 (rd_kafka_buf_read_topic_partitions:215): expected 4 bytes > 0 remaining bytes (incorrect broker.version.fallback?)
```
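For context, the "buffer underflow" in the librdkafka log generally means the reader expected more bytes than the writer actually encoded. The stdlib-only sketch below (hypothetical field layout and names — this is not librdkafka or kafka-go code) shows how a writer/reader disagreement over a metadata schema version produces exactly an "expected 4 bytes > 0 remaining bytes" style failure:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// writeMetadata encodes a payload whose version field claims v1 but whose
// body omits the v1-only trailing int32 (hypothetical layout for illustration).
func writeMetadata() []byte {
	buf := new(bytes.Buffer)
	binary.Write(buf, binary.BigEndian, int16(1)) // version = 1
	// A v1 body should be followed by an int32 partition count; it is missing here.
	return buf.Bytes()
}

// readMetadata parses the version and, for v1+, expects the trailing int32.
func readMetadata(b []byte) error {
	r := bytes.NewReader(b)
	var version int16
	if err := binary.Read(r, binary.BigEndian, &version); err != nil {
		return err
	}
	if version >= 1 {
		var partitions int32
		if err := binary.Read(r, binary.BigEndian, &partitions); err != nil {
			// Same shape as the broker-side log: 4 bytes expected, 0 remaining.
			return fmt.Errorf("buffer underflow: expected 4 bytes > %d remaining bytes: %w", r.Len(), err)
		}
	}
	return nil
}

func main() {
	fmt.Println(readMetadata(writeMetadata()))
	// prints: buffer underflow: expected 4 bytes > 0 remaining bytes: EOF
}
```

This is why mixing clients that encode group protocol metadata under different schema versions inside one consumer group can make the group leader fail to parse another member's metadata.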
A clear and concise description of what the bug is.

Start a confluent-kafka-go consumer program; once it is up and consuming data, start a kafka-go consumer program. The two programs use the same consumer group but subscribe to different topics.
Kafka Version
- What version(s) of Kafka are you testing against? Kafka 2.5.1
- What version of kafka-go are you using? github.com/confluentinc/confluent-kafka-go v1.9.2 and github.com/segmentio/kafka-go v0.4.40
To Reproduce
Resources to reproduce the behavior:
```yaml
---
# docker-compose.yaml
#
# Adding a docker-compose file will help the maintainers set up the environment
# to reproduce the issue.
#
# If one of the docker-compose files available in the repository may be used,
# mentioning it is also a useful alternative.
...
```
```go
// github.com/confluentinc/confluent-kafka-go/kafka consumer code
configMap := kafka.ConfigMap{
	"heartbeat.interval.ms":      3000,
	"session.timeout.ms":         30000,
	"bootstrap.servers":          "localhost:9092",
	"queued.max.messages.kbytes": 10000,
	"group.id":                   "test_consumer_1",
	"auto.offset.reset":          "earliest",
	"enable.auto.commit":         "true",
	"auto.commit.interval.ms":    "100000",
}
consumer, err := kafka.NewConsumer(&configMap)
if err != nil {
	panic(err)
}
err = consumer.SubscribeTopics([]string{"test_topic_1"}, nil)
if err != nil {
	panic(err)
}
for i := 0; i < 10000; i++ {
	event := consumer.Poll(300) // was kafkaConsumer, which is not defined in this snippet
	_ = event
	time.Sleep(500 * time.Millisecond)
}
```
```go
// github.com/segmentio/kafka-go consumer code
consumerGroup, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
	ID:                    "test_consumer_1",
	Brokers:               []string{"localhost:9092"},
	Topics:                []string{"test_topic_2"},
	WatchPartitionChanges: true,
	Logger:                log.StandardLogger(),
})
if err != nil {
	return nil, fmt.Errorf("assign offsets new consumer, %w", err)
}
gen, err := consumerGroup.Next(context.Background())
if err != nil {
	return nil, err
}
_ = gen
time.Sleep(10 * time.Second)
consumerGroup.Close()
```
Expected Behavior
A clear and concise description of what you expected to happen.
Observed Behavior
A clear and concise description of the behavior you observed.
Often times, pasting the logging output from a kafka.Reader or kafka.Writer will
provide useful details to help maintainers investigate the issue and provide a
fix. If possible, providing stack traces or CPU/memory profiles may also contain
valuable information to understand the conditions that triggered the issue.
Additional Context
Add any other context about the problem here.
// consumergroup.go — joinGroupRequestV1, with Version: 1 set on the request:

```go
func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) {
	request := joinGroupRequestV1{
		Version:          1,
		GroupID:          cg.config.ID,
		MemberID:         memberID,
		SessionTimeout:   int32(cg.config.SessionTimeout / time.Millisecond),
		RebalanceTimeout: int32(cg.config.RebalanceTimeout / time.Millisecond),
		ProtocolType:     defaultProtocolType,
	}

	for _, balancer := range cg.config.GroupBalancers {
		userData, err := balancer.UserData()
		if err != nil {
			return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
		}
		request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
			ProtocolName: balancer.ProtocolName(),
			ProtocolMetadata: groupMetadata{
				// Version: 1,
				Topics:   cg.config.Topics,
				UserData: userData,
			}.bytes(),
		})
	}

	return request, nil
}
```
// syncGroupRequestV0, with Version: 3 set on the request:

```go
func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID int32, memberAssignments GroupMemberAssignments) syncGroupRequestV0 {
	request := syncGroupRequestV0{
		Version:      3,
		GroupID:      cg.config.ID,
		GenerationID: generationID,
		MemberID:     memberID,
	}

	if memberAssignments != nil {
		request.GroupAssignments = make([]syncGroupRequestGroupAssignmentV0, 0, 1)

		for memberID, topics := range memberAssignments {
			topics32 := make(map[string][]int32)
			for topic, partitions := range topics {
				partitions32 := make([]int32, len(partitions))
				for i := range partitions {
					partitions32[i] = int32(partitions[i])
				}
				topics32[topic] = partitions32
			}
			request.GroupAssignments = append(request.GroupAssignments, syncGroupRequestGroupAssignmentV0{
				MemberID: memberID,
				MemberAssignments: groupAssignment{
					// Version: 3,
					Topics: topics32,
				}.bytes(),
			})
		}

		cg.withLogger(func(logger Logger) {
			logger.Printf("Syncing %d assignments for generation %d as member %s", len(request.GroupAssignments), generationID, memberID)
		})
	}

	return request
}
```
My problem was solved when I set the version in the request.
If I start the kafka-go program first and then start the confluent-kafka-go program, the error message is as follows:

```
consumergroup.go:801","msg":"Failed to join group test_consumer_1: unable to read metadata for member, rdkafka-c28116d3-685a-45a3-bc14-c22827e53245: %!w(
```
@wangrenyi should we mark this as resolved then? The fix was to add the version to the request?
@petedannemann adding the version raises additional issues, and I have found no other way to use confluent-kafka-go and kafka-go together in the same consumer group. I just figured it had something to do with the group coordinator. Please let me know if there is any other solution. Thank you very much.
OK, so it sounds like kafka-go and confluent-kafka-go are using different API versions, and because they share the same consumer group this creates issues. I don't think this is necessarily a kafka-go bug; it seems like expected Kafka behavior, since the consumers in a consumer group should be using the same API version. You should either use a different consumer group for each consumer (they are consuming from different topics anyway) or find a way to make both consumers use the same API version by specifying it manually.
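A sketch of the first suggestion, reusing the brokers and topics from the reproduction above. The group names and the package aliases are illustrative assumptions (both libraries export a package named `kafka`, so one of them has to be aliased in a real program):

```go
// Give each client library its own consumer group, so they never
// participate in the same rebalance. Group names are illustrative.

// confluent-kafka-go consumer (confluent = github.com/confluentinc/confluent-kafka-go/kafka):
configMap := confluent.ConfigMap{
	"bootstrap.servers": "localhost:9092",
	"group.id":          "test_consumer_confluent", // group A
	"auto.offset.reset": "earliest",
}

// kafka-go consumer (kafkago = github.com/segmentio/kafka-go):
consumerGroup, err := kafkago.NewConsumerGroup(kafkago.ConsumerGroupConfig{
	ID:      "test_consumer_segmentio", // group B, distinct from group A
	Brokers: []string{"localhost:9092"},
	Topics:  []string{"test_topic_2"},
})
```

Since the two programs consume different topics, splitting the groups costs nothing in terms of partition assignment and avoids mixed-client metadata in a single group.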