confluent-kafka-go icon indicating copy to clipboard operation
confluent-kafka-go copied to clipboard

[Question] Reconnect to kafka broker.

Open ashishbhatt-pepperstone opened this issue 1 year ago • 0 comments

Description

I have created a setup using https://developer.confluent.io/get-started/go. Created broker in local docker container.

docker compose for kafka broker: version: '3' services: broker: image: confluentinc/cp-kafka:7.5.0 container_name: broker ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_PROCESS_ROLES: broker,controller KAFKA_NODE_ID: 1 KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093 KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092 KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LOG_DIRS: /tmp/kraft-combined-logs CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

In consumer.go I am using c.Events() run := true for run { select { case sig := <-sigchan: fmt.Printf("Caught signal %v: terminating\n", sig) run = false case ev := <-c.Events(): switch e := ev.(type) { case *kafka.Message: fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n", *e.TopicPartition.Topic, string(e.Key), string(e.Value)) case kafka.Error: fmt.Printf("Error connecting to kafka : %s", e.Error()) default: fmt.Printf("Unattended Message %T", e) } } }

I want to implement the retry mechanism via exponential backoff if there is an error connecting to broker. But I found in this issue, confluentic-kafka-go internally try to reconnect to kafka broker.

I want to know how many time it will try to reconnect ? Is there any limit for time or number of times ? Is there any configuration which we can set to limit the time or number of times to reconnect ?

Checklist

Please provide the following information:

  • [] confluent-kafka-go and librdkafka version (LibraryVersion()): v1.9.2
  • [] Apache Kafka broker version: confluentinc/cp-kafka:7.5.0
  • [ ] Client configuration: ConfigMap{...} conf["group.id"] = "kafka-go-getting-started" conf["auto.offset.reset"] = "earliest" conf["enable.auto.commit"] = true conf["go.events.channel.enable"] = true
  • [ ] Operating system: ubuntu
  • [ ] Provide client logs (with "debug": ".." as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

ashishbhatt-pepperstone avatar Jan 10 '24 08:01 ashishbhatt-pepperstone