franz-go
Producer cannot write to broker 2
We are running a self-hosted Redpanda cluster with three brokers.
Our app consumes from one topic, processes the messages, and produces to three topics. Consuming and processing take almost no time. Producing back to the topics works for a few messages and then fails. I may have gotten a bit confused here, but it looks to me like this happens specifically when writing to broker 2.
We have a few other apps connected to the cluster, some of them written in Go, and they all run fine.
I am not sure what is happening here and am reaching out to see if anyone has suggestions.
I tried adjusting the producer config, buffer sizes, etc., but had no luck. Is my config wrong? Any ideas?
- Go: 1.19
- franz-go: 1.16.1
Snippets
...
configConsumer := []kgo.Opt{
	kgo.SeedBrokers(config.Redpanda.BrokerUrl),
	kgo.ConsumeTopics(config.Redpanda.ConsumerTopic),
	kgo.ConsumerGroup(config.Redpanda.GroupId),
	kgo.ClientID("lte-m-csv-parser-consumer"),
	kgo.Balancers(kgo.RoundRobinBalancer()),
	kgo.WithLogger(loggerConsumer),
}
// NewClient only fails on invalid options; broker connections are opened lazily.
consumerClient, err := kgo.NewClient(configConsumer...)
if err != nil {
	log.Fatalf("Failed to create Redpanda consumer client: %v", err)
}
defer consumerClient.Close()

configProducer := []kgo.Opt{
	kgo.SeedBrokers(config.Redpanda.BrokerUrl),
	kgo.MaxBufferedBytes(10 * 1024), // 10 KB maximum buffer size
	kgo.ProducerBatchCompression(kgo.NoCompression()),
	kgo.WithLogger(loggerProducer),
	kgo.ClientID("lte-m-csv-parser-producer"),
}
producerClient, err := kgo.NewClient(configProducer...)
if err != nil {
	log.Fatalf("Failed to create Redpanda producer client: %v", err)
}
defer producerClient.Close()
...
...
// Both of the snippets below are started in separate goroutines in main.
for {
	fetches := service.consumerClient.PollRecords(service.ctx, 100)
	// Stop once the client is closed or the context is cancelled;
	// otherwise PollRecords returns immediately and this loop spins.
	if fetches.IsClientClosed() || service.ctx.Err() != nil {
		return
	}
	fetches.EachError(func(topic string, partition int32, err error) {
		log.Printf("Error fetching from topic %s partition %d: %v", topic, partition, err)
	})
	fetches.EachRecord(func(record *kgo.Record) {
		// Parse the record; one record may yield several messages.
		messages := service.parseMessage(record.Value)
		for _, message := range messages {
			// Hand each message to the producer goroutine.
			service.channel <- message
			messageCount++
		}
	})
}
}
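The consumer loop hands each parsed message to the producer goroutine through a channel. Below is a minimal stdlib-only sketch of that hand-off. `Message`, `parseMessage`, and `runPipeline` are hypothetical stand-ins, not the app's real types. The sketch assumes an unbuffered channel, so each send blocks until the receiving goroutine takes the message. It also closes the channel once input is exhausted so the receiver's `range` loop can exit.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Message is a hypothetical stand-in for the record the app produces.
type Message struct {
	Topic string
	Value []byte
}

// parseMessage is a hypothetical parser: each comma-separated field of
// an input value becomes one outgoing message.
func parseMessage(value []byte) []Message {
	var out []Message
	for _, field := range strings.Split(string(value), ",") {
		out = append(out, Message{Topic: "out", Value: []byte(field)})
	}
	return out
}

// runPipeline sends every parsed message over an unbuffered channel to a
// second goroutine and returns the messages that goroutine received.
func runPipeline(inputs [][]byte) []Message {
	ch := make(chan Message) // unbuffered: each send waits for the receiver
	var received []Message
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for m := range ch { // exits once ch is closed and drained
			received = append(received, m)
		}
	}()
	for _, in := range inputs {
		for _, m := range parseMessage(in) {
			ch <- m
		}
	}
	close(ch) // lets the receiving range loop terminate
	wg.Wait() // received is safe to read after the goroutine finishes
	return received
}

func main() {
	got := runPipeline([][]byte{[]byte("a,b"), []byte("c")})
	fmt.Println(len(got)) // 3
}
```

With a single sender and a single receiver, messages arrive in the order they were sent.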
...
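// Read from the channel and produce to Redpanda.
for message := range service.channel {
	// Before Go 1.22, message is one variable reused on every iteration,
	// while Produce keeps the *kgo.Record until callbackProducer runs.
	// Copy it so each Produce call gets its own record.
	record := message
	service.producerClient.Produce(service.ctx, &record, callbackProducer)
}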
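In Go versions before 1.22 (the issue lists Go 1.19), a `for ... range` loop declares its iteration variable once and reuses it. Taking that variable's address therefore yields the same pointer on every pass. `Produce` is asynchronous and holds on to the `*kgo.Record` it receives until the callback runs, and it sets fields such as the offset on that record. A later iteration can therefore overwrite a record the client has not finished with. It is not established here whether this explains the EOFs from broker 2.

The stdlib-only sketch below shows the per-iteration copy that keeps the pointers distinct. It uses a hypothetical `Record` type and `collectPointers` function, standing in for `kgo.Record` and the produce loop.

```go
package main

import "fmt"

// Record is a hypothetical stand-in for kgo.Record; only Value matters here.
type Record struct {
	Value string
}

// collectPointers mimics an asynchronous call that keeps the *Record it is
// given: it stores one pointer per channel element. Each element is copied
// into a fresh variable before its address is taken, so every stored
// pointer refers to its own Record under any Go loop semantics.
func collectPointers(ch <-chan Record) []*Record {
	var kept []*Record
	for message := range ch {
		msg := message // fresh variable on every iteration
		kept = append(kept, &msg)
	}
	return kept
}

func main() {
	ch := make(chan Record, 3)
	for _, v := range []string{"a", "b", "c"} {
		ch <- Record{Value: v}
	}
	close(ch)
	for _, r := range collectPointers(ch) {
		fmt.Println(r.Value) // a, then b, then c
	}
}
```

Under pre-1.22 loop semantics, dropping the `msg := message` line and appending `&message` instead would make all three pointers equal. Each would read the last value received, `c`.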
Logs
INFO 2024-05-02T14:31:13.563229821Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] wrote Produce v7; broker: 2, bytes_written: 1039, write_wait: 97.652µs, time_to_write: 79.068µs, err: <nil>
INFO 2024-05-02T14:31:13.566255149Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] read Produce v7; broker: 2, bytes_read: 0, read_wait: 88.88µs, time_to_read: 1.92081ms, err: EOF
INFO 2024-05-02T14:31:13.566264884Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] read from broker errored, killing connection; addr: redpanda-2.redpanda.redpanda.svc.cluster.local.:9093, broker: 2, successful_reads: 1, err: EOF
INFO 2024-05-02T14:31:13.566271592Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] retry batches processed; wanted_metadata_update: false, triggering_metadata_update: false, should_backoff: false
INFO 2024-05-02T14:31:13.820934368Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] opening connection to broker; addr: redpanda-2.redpanda.redpanda.svc.cluster.local.:9093, broker: 2
INFO 2024-05-02T14:31:13.825469439Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] connection opened to broker; addr: redpanda-2.redpanda.redpanda.svc.cluster.local.:9093, broker: 2
INFO 2024-05-02T14:31:13.827150889Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] connection initialized successfully; addr: redpanda-2.redpanda.redpanda.svc.cluster.local.:9093, broker: 2
INFO 2024-05-02T14:31:13.827367931Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] wrote Produce v7; broker: 2, bytes_written: 1039, write_wait: 5.907426ms, time_to_write: 42.369µs, err: <nil>
INFO 2024-05-02T14:31:13.827706811Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] read Produce v7; broker: 2, bytes_read: 0, read_wait: 38.142µs, time_to_read: 537.388µs, err: EOF
INFO 2024-05-02T14:31:13.827965487Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[WARN] read from broker errored, killing connection after 0 successful responses (is SASL missing?); addr: redpanda-2.redpanda.redpanda.svc.cluster.local.:9093, broker: 2, err: EOF
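The debug lines can be tallied per broker to check the impression that the failures are specific to broker 2. Below is a hypothetical helper, `countEOFByBroker`, that reads logs on standard input. It assumes the `broker: N` and `err: EOF` fields appear as in the excerpt above. One failed round trip produces more than one EOF line (the `read Produce` line and the `killing connection` line), so the counts are log lines, not failed requests.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"regexp"
	"strings"
)

// brokerRe captures the numeric broker ID from kgo debug fields like "broker: 2".
var brokerRe = regexp.MustCompile(`broker: (\d+)`)

// countEOFByBroker returns, per broker ID, how many log lines report err: EOF.
func countEOFByBroker(logs string) map[string]int {
	counts := make(map[string]int)
	sc := bufio.NewScanner(strings.NewReader(logs))
	for sc.Scan() {
		line := sc.Text()
		if !strings.Contains(line, "err: EOF") {
			continue
		}
		if m := brokerRe.FindStringSubmatch(line); m != nil {
			counts[m[1]]++
		}
	}
	return counts
}

func main() {
	data, err := io.ReadAll(os.Stdin)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	for broker, n := range countEOFByBroker(string(data)) {
		fmt.Printf("broker %s: %d EOF lines\n", broker, n)
	}
}
```

If EOF lines appear only for broker 2 across a longer capture, that supports the broker-specific reading. EOF lines spread across brokers would point back toward the client side.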