
Client connection hangs when K8s pod of broker is killed

Open · jaime0815 opened this issue 2 years ago · 3 comments

Description

The broker pod is killed while producing data. I find that the client hangs when the killed broker is the leader of the partition being accessed, and some error logs are output:

%6|1652276997.073|FAIL|rdkafka#producer-1| [thrd:my-release-kafka-0.my-release-kafka-headless.default.svc.cluste]: my-release-kafka-0.my-release-kafka-headless.default.svc.cluster.local:9092/0: Disconnected (after 14008ms in state UP)

How to reproduce

  1. Use a helm chart to deploy a 3-replica Kafka cluster (a sketch assuming the Bitnami chart follows the pod listing below):
kubectl get pod
NAME                        READY   STATUS    RESTARTS   AGE
my-release-kafka-0          1/1     Running   0          30s
my-release-kafka-1          1/1     Running   0          17h
my-release-kafka-2          1/1     Running   0          17h
my-release-kafka-client     1/1     Running   0          25h
my-release-zookeeper-0      1/1     Running   0          18h
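The pod names above match Bitnami's Kafka chart; assuming that chart, the deployment would look roughly like:

helm repo add bitnami https://charts.bitnami.com/bitnami
helm install my-release bitnami/kafka --set replicaCount=3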
  2. Run the following code:
package main

import (
   "fmt"
   "math/rand"
   "time"

   "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
   rand.Seed(time.Now().UnixNano())
   topic := "t"

   config := kafka.ConfigMap{
      "bootstrap.servers":  "my-release-kafka-0.my-release-kafka-headless.default.svc.cluster.local:9092,my-release-kafka-1.my-release-kafka-headless.default.svc.cluster.local:9092,my-release-kafka-2.my-release-kafka-headless.default.svc.cluster.local:9092",
      "socket.timeout.ms":  300000,
      "socket.max.fails":   3,
      "message.max.bytes":  10485760,
      // Size 0 makes the client's internal Go channels unbuffered.
      "go.events.channel.size":  0,
      "go.produce.channel.size": 0,
      "linger.ms":           20,
      "api.version.request": true,
      //"debug": "all",
   }

   p, err := kafka.NewProducer(&config)
   if err != nil {
      panic(err)
   }
   // Close only after a successful NewProducer; deferring before the error
   // check would dereference a nil producer on failure.
   defer p.Close()

   deliveryChan := make(chan kafka.Event, 1)
   i := 0
   for {
      word := []byte(fmt.Sprintf("test-%d", i))
      err := p.Produce(&kafka.Message{
         TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
         Value:          word,
      }, deliveryChan)

      if err != nil {
         panic(err)
      }

      // Wait synchronously for the delivery report of this message.
      e := <-deliveryChan
      switch ev := e.(type) {
      case *kafka.Message:
         if ev.TopicPartition.Error != nil {
            fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
         } else {
            fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
         }
      }

      i++
      time.Sleep(time.Second * 3)
   }
}
  3. Find the broker that is the leader of the partition and delete its pod; in my case kafka-1 is deleted:
kubectl delete pod my-release-kafka-1

Checklist

Please provide the following information:

  • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): v1.8.2
  • [x] Apache Kafka broker version: 3.1
  • [x] Client configuration: ConfigMap{...}
  • [x] Operating system: Ubuntu
  • [ ] Provide client logs (with "debug": ".." as necessary)
  • [ ] Provide broker log excerpts
  • [x] Critical issue

jaime0815 · May 12 '22 08:05

The debug log file is attached: log.txt

jaime0815 · May 12 '22 08:05

From librdkafka's perspective, the logs show the messages are delivered just fine and there do not seem to be any outstanding messages. Could you try simplifying your code by reducing the config to just what is needed:

     "bootstrap.servers":       "my-release-kafka-0.my-release-kafka-headless.default.svc.cluster.local:9092,my-release-kafka-1.my-release-kafka-headless.default.svc.cluster.local:9092,my-release-kafka-2.my-release-kafka-headlessfault.svc.cluster.local:9092",
      "message.max.bytes": 10485760,
      "linger.ms": 20,
      //"debug":                "all",

This will help minimize the problem space.

Side note: you've implemented a synchronous producer (produce a single message, then wait for its delivery). This is very inefficient, and we highly recommend an asynchronous approach, which is usually done by reading the delivery channel (either a custom channel or p.Events()) from a separate goroutine.

Side note 2: you should also read the p.Events() channel for client-level error events. Message-level events are still passed on your custom channel, or on the Events() channel if you don't specify a custom channel to Produce(). A sketch combining both side notes follows.
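
For illustration, a minimal sketch of the asynchronous pattern: produce without blocking, and drain p.Events() in a separate goroutine for both delivery reports and client-level errors (the broker address and topic name are placeholders):

package main

import (
   "fmt"

   "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
   p, err := kafka.NewProducer(&kafka.ConfigMap{
      "bootstrap.servers": "localhost:9092", // placeholder; use your broker list
   })
   if err != nil {
      panic(err)
   }
   defer p.Close()

   // A separate goroutine drains the Events() channel: delivery reports
   // and client-level errors (e.g. broker disconnects) both arrive here.
   go func() {
      for e := range p.Events() {
         switch ev := e.(type) {
         case *kafka.Message:
            if ev.TopicPartition.Error != nil {
               fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
            } else {
               fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
            }
         case kafka.Error:
            fmt.Printf("Client-level error: %v\n", ev)
         }
      }
   }()

   topic := "t" // placeholder topic
   for i := 0; i < 100; i++ {
      // Passing nil as the delivery channel sends the report to p.Events().
      if err := p.Produce(&kafka.Message{
         TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
         Value:          []byte(fmt.Sprintf("test-%d", i)),
      }, nil); err != nil {
         fmt.Printf("Produce failed: %v\n", err)
      }
   }

   // Wait for outstanding delivery reports before closing.
   p.Flush(15 * 1000)
}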

edenhill · May 13 '22 07:05

Hi @edenhill, it works after using this config. I also tried different configs multiple times; the root cause is the following settings:

		"go.events.channel.size": 0,
		"go.produce.channel.size": 0,

I am using a sync producer API because I want to get the message offset after producing, to use as a checkpoint.
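
For reference: setting go.events.channel.size and go.produce.channel.size to 0 makes the client's internal Go channels unbuffered, which likely explains the hang when nothing drains p.Events(). The delivered offset is still available without blocking per message, since each delivery report carries the offset assigned by the broker. A minimal sketch of offset checkpointing with a buffered custom delivery channel (broker address, topic, and the checkpoint print are placeholders):

package main

import (
   "fmt"

   "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
   p, err := kafka.NewProducer(&kafka.ConfigMap{
      "bootstrap.servers": "localhost:9092", // placeholder; use your broker list
   })
   if err != nil {
      panic(err)
   }
   defer p.Close()

   topic := "t"                                  // placeholder topic
   deliveryChan := make(chan kafka.Event, 10000) // buffered custom channel

   // Record offsets from delivery reports as they arrive; each report
   // carries the partition and the offset assigned by the broker.
   go func() {
      for e := range deliveryChan {
         if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error == nil {
            // Persist this as your checkpoint instead of printing.
            fmt.Printf("checkpoint: partition %d offset %d\n",
               m.TopicPartition.Partition, m.TopicPartition.Offset)
         }
      }
   }()

   for i := 0; i < 100; i++ {
      if err := p.Produce(&kafka.Message{
         TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
         Value:          []byte(fmt.Sprintf("test-%d", i)),
      }, deliveryChan); err != nil {
         fmt.Printf("Produce failed: %v\n", err)
      }
   }

   // Wait for outstanding delivery reports before closing.
   p.Flush(15 * 1000)
}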

jaime0815 · May 13 '22 08:05