confluent-kafka-go

Inconsistent logs between the logs channel and logging without the logs channel

Open shubhang93 opened this issue 2 years ago • 2 comments

When I consume log events with `"go.logs.channel.enable": true`, logs with certain tags do not show up. Example:

Client version: v1.8.2

```go
package proxy // package name not shown in the original

import (
	"context"
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// Proxy wraps a single consumer (only the field used below).
type Proxy struct {
	c *kafka.Consumer
}

func New(bootstrapServer string, groupID string) *Proxy {
	// Buffered channel that receives librdkafka log events,
	// because go.logs.channel.enable is true.
	logChan := make(chan kafka.LogEvent, 100000)
	config := kafka.ConfigMap{
		"bootstrap.servers":      bootstrapServer,
		"debug":                  "cgrp,consumer",
		"group.id":               groupID,
		"enable.auto.commit":     false,
		"go.logs.channel.enable": true,
		"go.logs.channel":        logChan,
		"max.poll.interval.ms":   6000,
		"session.timeout.ms":     6000,
	}
	c, err := kafka.NewConsumer(&config)
	if err != nil {
		panic(err)
	}
	return &Proxy{
		c: c,
	}
}

func (p *Proxy) Consume(ctx context.Context, topics []string, messages chan []*kafka.Message) error {
	// Log every invocation of the rebalance callback.
	if err := p.c.SubscribeTopics(topics, func(consumer *kafka.Consumer, event kafka.Event) error {
		log.Println(p.c.String(), "[thrd:proxcon]:", "REBALANCING")
		return nil
	}); err != nil {
		return err
	}

	done := ctx.Done()

	// Print every event delivered on the logs channel.
	go func() {
		for le := range p.c.Logs() {
			log.Printf("%s\n", le)
		}
	}()

	run := true
	for run {
		select {
		case <-done:
			run = false
		default:
			log.Println(p.c.String(), "[thrd:proxcon]:", "Invoking poll")
			e := p.c.Poll(10)
			log.Println(p.c.String(), "[thrd:proxcon]:", "Finished poll")
			switch event := e.(type) {
			case kafka.Error:
				log.Printf("CONSUMER POLL ERROR:%v\n", event)
				log.Printf("CONSUMER POLL ERROR CODE:%d\n", event.Code())
				log.Printf("CONSUMER POLL ERROR STRING:%v\n", event.String())
			case *kafka.Message:
				log.Println(p.c.String(), "[thrd:proxcon]:", "Processing")
				// Simulate long-running processing: block until the context
				// is cancelled without calling Poll again, so that
				// max.poll.interval.ms is exceeded.
				<-done
				run = false
			}
		}
	}

	return ctx.Err()
}
```
Logs without the logs channel:

```
%7|1651832758.168|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 1
%7|1651832761.182|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 1
%4|1651832762.683|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (6000ms) exceeded by 79ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%7|1651832762.683|MEMBERID|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004": updating member id "rdkafka-8ea24d83-73ca-4956-8671-8ba4f1dbe4f8" -> ""
%7|1651832762.683|LEAVE|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004": leave (in state up)
%7|1651832762.683|LEAVE|rdkafka#consumer-1| [thrd:main]: 10.120.9.201:6668/9: Leaving group
%7|1651832762.683|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004" initiating rebalance (EAGER) in state up (join-state steady) with 50 assigned partition(s) (lost): max.poll.interval.ms exceeded
%7|1651832762.683|LOST|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004": current assignment of 50 partition(s) lost: max.poll.interval.ms exceeded: revoking assignment and rejoining
%7|1651832762.683|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004" changed join state steady -> wait-unassign-call (state up)
%7|1651832762.683|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004": delegating revoke of 50 partition(s) to application on queue rd_kafka_cgrp_new: max.poll.interval.ms exceeded
%7|1651832762.683|PAUSE|rdkafka#consumer-1| [thrd:main]: Pausing fetchers for 50 assigned partition(s): rebalance
%7|1651832762.683|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "test_proxcon_0004": clearing group assignment
%7|1651832762.774|LEAVEGROUP|rdkafka#consumer-1| [thrd:main]: LeaveGroup response received in state up
^C2022/05/06 15:56:16 error consuming:  context canceled
```

The logs above are missing when I use the logs channel: I do not get the member update (MEMBERID) or MAXPOLL entries. Can logs of all tags be made available on the logs channel, so that the caller can decide which tags to keep and which to omit?
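The caller-side filtering requested here could look roughly like this, assuming every tag does reach the channel. `logEvent` is a local stand-in that mirrors the fields of `kafka.LogEvent` (`Name`, `Tag`, `Message`, `Level`, `Timestamp`) so the sketch runs without librdkafka; with the real client you would range over the consumer's `Logs()` channel instead. `filterByTag` is a hypothetical helper.

```go
package main

import "time"

// logEvent mirrors the fields of kafka.LogEvent so this sketch is self-contained.
type logEvent struct {
	Name      string
	Tag       string
	Message   string
	Level     int
	Timestamp time.Time
}

// filterByTag forwards events whose Tag is in keep, or whose Level is at or
// below maxLevel (lower syslog levels are more severe), and drops the rest.
// The returned channel is closed once in is closed.
func filterByTag(in <-chan logEvent, keep map[string]bool, maxLevel int) <-chan logEvent {
	out := make(chan logEvent)
	go func() {
		defer close(out)
		for ev := range in {
			if keep[ev.Tag] || ev.Level <= maxLevel {
				out <- ev
			}
		}
	}()
	return out
}
```

Passing through anything at level 4 or lower (the MAXPOLL line is `%4`) keeps warnings even when their tag is not on the allow-list, while noisy debug tags such as HEARTBEAT can still be dropped.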

These are the logs when using the logs channel:

```
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Invoking poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Finished poll
2022/05/06 16:15:30 rdkafka#consumer-1 [thrd:proxcon]: Processing
2022/05/06 16:15:32 [2022-05-06T16:15:32+05:30][rdkafka#consumer-1][HEARTBEAT][7][thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 5
2022/05/06 16:15:35 [2022-05-06T16:15:35+05:30][rdkafka#consumer-1][HEARTBEAT][7][thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 5
2022/05/06 16:15:38 [2022-05-06T16:15:38+05:30][rdkafka#consumer-1][HEARTBEAT][7][thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 5
2022/05/06 16:15:41 [2022-05-06T16:15:41+05:30][rdkafka#consumer-1][HEARTBEAT][7][thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 5
2022/05/06 16:15:44 [2022-05-06T16:15:44+05:30][rdkafka#consumer-1][HEARTBEAT][7][thrd:main]: GroupCoordinator/9: Heartbeat for group "test_proxcon_0004" generation id 5
```

As you can see, the logs where the consumer leaves the group and the max poll interval warning are both missing. The channel also keeps delivering heartbeat logs even after the consumer has left the group.
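Given the tags seen in each run (for example the third `|`-separated field of each stderr line, and the `[TAG]` field of each channel line), the tags that never reached the channel can be listed with a set difference. `missingTags` is a hypothetical helper; the inputs in the usage test are taken from the two outputs in this issue.

```go
package main

import "sort"

// missingTags returns, sorted and de-duplicated, the tags that appear in
// want but never appear in got.
func missingTags(want, got []string) []string {
	seen := make(map[string]bool, len(got))
	for _, t := range got {
		seen[t] = true
	}
	missing := make(map[string]bool)
	for _, t := range want {
		if !seen[t] {
			missing[t] = true
		}
	}
	out := make([]string, 0, len(missing))
	for t := range missing {
		out = append(out, t)
	}
	sort.Strings(out)
	return out
}
```

For the outputs above, every tag except HEARTBEAT would be reported, including MAXPOLL, MEMBERID, LEAVE and REBALANCE. The two outputs come from separate runs, so this is a rough comparison rather than an exact one.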

shubhang93, May 05 '22 10:05

Thanks for reporting this, we are looking into it.

jliunyu, May 09 '22 22:05

Any updates on this?

shubhang93, Jul 21 '22 04:07

This will be fixed along with #934: the commit has been made, and the fix will be in the next release. Thanks for reporting this!

milindl, Mar 13 '23 14:03