confluent-kafka-go
[Question] Reconnect to kafka broker.
Description
I created a setup by following https://developer.confluent.io/get-started/go, with the broker running in a local Docker container.
Docker Compose file for the Kafka broker:
    version: '3'
    services:
      broker:
        image: confluentinc/cp-kafka:7.5.0
        container_name: broker
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
          KAFKA_PROCESS_ROLES: broker,controller
          KAFKA_NODE_ID: 1
          KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
          KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
          KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
          CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
In consumer.go I am reading events from c.Events():
    run := true
    for run {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        case ev := <-c.Events():
            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",
                    *e.TopicPartition.Topic, string(e.Key), string(e.Value))
            case kafka.Error:
                fmt.Printf("Error connecting to kafka : %s", e.Error())
            default:
                fmt.Printf("Unattended Message %T", e)
            }
        }
    }
I want to implement a retry mechanism with exponential backoff for the case where connecting to the broker fails. However, I found in this issue that confluent-kafka-go already tries to reconnect to the Kafka broker internally.
I would like to know how many times it will try to reconnect. Is there a limit on the total time or the number of attempts? Is there a configuration setting that limits the reconnect time or the number of reconnect attempts?
Checklist
Please provide the following information:
- [ ] confluent-kafka-go and librdkafka version (`LibraryVersion()`): v1.9.2
- [ ] Apache Kafka broker version: confluentinc/cp-kafka:7.5.0
- [ ] Client configuration: `ConfigMap{...}`

      conf["group.id"] = "kafka-go-getting-started"
      conf["auto.offset.reset"] = "earliest"
      conf["enable.auto.commit"] = true
      conf["go.events.channel.enable"] = true

- [ ] Operating system: Ubuntu
- [ ] Provide client logs (with `"debug": ".."` as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue