confluent-kafka-go
confluent-kafka-go copied to clipboard
Inconsistent logs between the logs channel and logging without the logs channel
When I consume log events using the "go.logs.channel.enable": true
. There are logs which are of certain tags which do not show up in my logs.
Example
Client version: v1.8.2
func New(bootstrapServer string, groupID string) *Proxy {
logChan := make(chan kafka.LogEvent, 100000)
config := kafka.ConfigMap{
"bootstrap.servers": bootstrapServer,
"debug": "cgrp,consumer",
"group.id": groupID,
"enable.auto.commit": false,
"go.logs.channel.enable": true,
"go.logs.channel": logChan,
"max.poll.interval.ms": 6000,
"session.timeout.ms": 6000,
}
c, err := kafka.NewConsumer(&config)
if err != nil {
panic(err)
}
return &Proxy{
c: c,
}
}
func (p *Proxy) Consume(ctx context.Context, topics []string, messages chan []*kafka.Message) error {
if err := p.c.SubscribeTopics(topics, func(consumer *kafka.Consumer, event kafka.Event) error {
log.Println(p.c.String(), "[thrd:proxcon]:", "REBALANCING")
return nil
}); err != nil {
panic(err)
}
done := ctx.Done()
go func() {
for le := range p.c.Logs() {
log.Printf("%s\n", le)
}
}()
run := true
for run {
select {
case <-done:
run = false
default:
log.Println(p.c.String(), "[thrd:proxcon]:", "Invoking poll")
e := p.c.Poll(10)
log.Println(p.c.String(), "[thrd:proxcon]:", "Finished poll")
switch event := e.(type) {
case kafka.Error:
log.Printf("CONSUMER POLL ERROR:%v\n", event)
log.Printf("CONSUMER POLL ERROR CODE:%d\n", event.Code())
log.Printf("CONSUMER POLL ERROR STRING:%v\n", event.String())
case *kafka.Message:
log.Println(p.c.String(), "[thrd:proxcon]:", "Processing")
for {
<-done
run = false
break
}
}
}
}
return ctx.Err()
}
%7|1651832758.168|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 1
%7|1651832761.182|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 1
%4|1651832762.683|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (6000ms) exceeded by 79ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%7|1651832762.683|MEMBERID|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004": updating member id "rdkafka-8ea24d83-73ca-4956-8671-8ba4f1dbe4f8" -> ""
%7|1651832762.683|LEAVE|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004": leave (in state up)
%7|1651832762.683|LEAVE|rdkafka#consumer-1| [thrd:main]: 10.120.9.201:6668/9: Leaving group
%7|1651832762.683|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004" initiating rebalance (EAGER) in state up (join-state steady) with 50 assigned partition(s) (lost): max.poll.interval.ms exceeded
%7|1651832762.683|LOST|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004": current assignment of 50 partition(s) lost: max.poll.interval.ms exceeded: revoking assignment and rejoining
%7|1651832762.683|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004" changed join state steady -> wait-unassign-call (state up)
%7|1651832762.683|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004": delegating revoke of 50 partition(s) to application on queue rd_kafka_cgrp_new: max.poll.interval.ms exceeded
%7|1651832762.683|PAUSE|rdkafka#consumer-1| [thrd:main]: Pausing fetchers for 50 assigned partition(s): rebalance
%7|1651832762.683|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004": clearing group assignment
%7|1651832762.774|LEAVEGROUP|rdkafka#consumer-1| [thrd:main]: LeaveGroup response received in state up
^C2022/05/06 15:56:16 error consuming: context canceled
The above logs are missed when I use the logs channel. I do not get logs regarding member updates and maxpoll. Can we make logs of all Tag
type to be avaialable on the logs channel so that the caller can filter out which tags he wants to use and which he wants to omit
These are the logs when using the logs channel
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Processing
2022/05/06 16:15:32 [2022-05-06T16:15:32+05:30][rdkafka#consumer-1][HEARTBEAT][7][thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 5
2022/05/06 16:15:35 [2022-05-06T16:15:35+05:30][rdkafka#consumer-1][HEARTBEAT][7][thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 5
2022/05/06 16:15:38 [2022-05-06T16:15:38+05:30][rdkafka#consumer-1][HEARTBEAT][7][thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 5
2022/05/06 16:15:41 [2022-05-06T16:15:41+05:30][rdkafka#consumer-1][HEARTBEAT][7][thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 5
2022/05/06 16:15:44 [2022-05-06T16:15:44+05:30][rdkafka#consumer-1][HEARTBEAT][7][thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 5
As you can see I missed out on the logs where the consumer leaves the group and also the warning about max poll interval. This also keeps printing the heartbeat logs even after the consumer leaves the group
Thanks for reporting this, we are looking into it.
Any updates on this ?
Will be fixed along with #934, since the commit is made, and will be there in the next release. Thanks for reporting this!