confluent-kafka-go
Client connection hang when K8s pod of broker killed
Description
A broker pod was killed while producing data. I find that the client hangs when the killed broker's pod is the leader of the partition being written to, and the following error is logged:
%6|1652276997.073|FAIL|rdkafka#producer-1| [thrd:my-release-kafka-0.my-release-kafka-headless.default.svc.cluste]: my-release-kafka-0.my-release-kafka-headless.default.svc.cluster.local:9092/0: Disconnected (after 14008ms in state UP)
How to reproduce
- Use helm chart to deploy a 3 replica Kafka cluster:
kubectl get pod
NAME READY STATUS RESTARTS AGE
my-release-kafka-0 1/1 Running 0 30s
my-release-kafka-1 1/1 Running 0 17h
my-release-kafka-2 1/1 Running 0 17h
my-release-kafka-client 1/1 Running 0 25h
my-release-zookeeper-0 1/1 Running 0 18h
- Run the following code:
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	topic := "t"
	config := kafka.ConfigMap{
		"bootstrap.servers":       "my-release-kafka-0.my-release-kafka-headless.default.svc.cluster.local:9092,my-release-kafka-1.my-release-kafka-headless.default.svc.cluster.local:9092,my-release-kafka-2.my-release-kafka-headless.default.svc.cluster.local:9092",
		"socket.timeout.ms":       300000,
		"socket.max.fails":        3,
		"message.max.bytes":       10485760,
		"go.events.channel.size":  0,
		"go.produce.channel.size": 0,
		"linger.ms":               20,
		"api.version.request":     true,
		//"debug": "all",
	}
	p, err := kafka.NewProducer(&config)
	if err != nil {
		panic(err)
	}
	defer p.Close()
	deliveryChan := make(chan kafka.Event, 1)
	i := 0
	for {
		word := []byte(fmt.Sprintf("test-%d", i))
		i++
		err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
			Value:          word,
		}, deliveryChan)
		if err != nil {
			panic(err)
		}
		e := <-deliveryChan
		switch ev := e.(type) {
		case *kafka.Message:
			if ev.TopicPartition.Error != nil {
				fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
			} else {
				fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
			}
		}
		time.Sleep(time.Second * 3)
	}
}
- Find the broker that is the partition leader and delete its pod; kafka-1 is deleted in my case:
kubectl delete pod my-release-kafka-1
Checklist
Please provide the following information:
- [ ] confluent-kafka-go and librdkafka version: LibraryVersion(v1.8.2)
- [ ] Apache Kafka broker version: 3.1
- [ ] Client configuration: ConfigMap{...}
- [ ] Operating system: Ubuntu
- [ ] Provide client logs (with "debug": ".." as necessary)
- [ ] Provide broker log excerpts
- [x] Critical issue

log.txt contains the debug log file.
From librdkafka's perspective in the logs the messages are delivered just fine and there do not seem to be any outstanding messages. Could you try simplifying your code by reducing the config to just what is needed:
"bootstrap.servers": "my-release-kafka-0.my-release-kafka-headless.default.svc.cluster.local:9092,my-release-kafka-1.my-release-kafka-headless.default.svc.cluster.local:9092,my-release-kafka-2.my-release-kafka-headlessfault.svc.cluster.local:9092",
"message.max.bytes": 10485760,
"linger.ms": 20,
//"debug": "all",
This will help minimize the problem space.
Side note: you've implemented a synchronous producer (produce a single message, then wait for its delivery report). This is very inefficient, and we highly recommend an asynchronous approach, which is usually done by reading the delivery channel (either a custom channel or p.Events()) from a separate goroutine.
Side note 2: you should also read the p.Events() channel for client-level error events. Message-level events are still passed on your custom channel, or on the Events() channel if you don't pass a custom channel to Produce().
Hi @edenhill, it works after using this config. I also tried different configurations multiple times; the root cause is the following settings:
"go.events.channel.size": 0,
"go.produce.channel.size": 0,
I use the synchronous producer API because I want to get the message offset after producing, to use as a checkpoint.