
Producer cannot write to broker nr. 2

Open ErlendFax opened this issue 2 months ago • 8 comments

We are running redpanda self-hosted with three brokers.

Our app consumes from one topic, processes the messages, and produces to three topics. Consuming and processing take almost no time, but producing back to the topics works for a few messages and then fails. I may be a bit confused here, but to me it looks like this happens specifically when writing to broker nr. 2.

We have a few other apps connected to the cluster, and they all run fine. Some of them are written in Go.

I am not sure what is happening here, so I am reaching out to see if anyone has suggestions.

I tried adjusting the producer config, buffer sizes, etc., but had no luck. Is my config wrong? Any ideas?

  • Go: 1.19
  • franz-go: 1.16.1
Snippets
...
configConsumer := []kgo.Opt{
  kgo.SeedBrokers(config.Redpanda.BrokerUrl),
  kgo.ConsumeTopics(config.Redpanda.ConsumerTopic),
  kgo.ConsumerGroup(config.Redpanda.GroupId),
  kgo.ClientID("lte-m-csv-parser-consumer"),
  kgo.Balancers(kgo.RoundRobinBalancer()),
  kgo.WithLogger(loggerConsumer),
}
consumerClient, err := kgo.NewClient(configConsumer...)
if err != nil {
  log.Fatalf("Failed to connect to Redpanda: %v", err)
}
defer consumerClient.Close()

configProducer := []kgo.Opt{
  kgo.SeedBrokers(config.Redpanda.BrokerUrl),
  kgo.MaxBufferedBytes(10 * 1024), // 10 KB maximum buffer size
  kgo.ProducerBatchCompression(kgo.NoCompression()),
  kgo.WithLogger(loggerProducer),
  kgo.ClientID("lte-m-csv-parser-producer"),
}
producerClient, err := kgo.NewClient(configProducer...)
if err != nil {
  log.Fatalf("Failed to connect to Redpanda: %v", err)
}
defer producerClient.Close()

  ...
...
// both of the below snippets are started in separate goroutines in main.

for {
  fetches := service.consumerClient.PollRecords(service.ctx, 100)

  fetches.EachError(func(topic string, partition int32, err error) {
    log.Printf("Error fetching from topic %s partition %d: %v", topic, partition, err)
  })

  fetches.EachRecord(func(record *kgo.Record) {
    // Process record
    messages := service.parseMessage(record.Value)
    for _, message := range messages {
      // send message to channel
      service.channel <- message
      messageCount++
    }
  })
}
}
...
// read from channel and produce to redpanda
for message := range service.channel {
  service.producerClient.Produce(service.ctx, &message, callbackProducer)
}
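A side note on the produce loop above, separate from the EOF errors: before Go 1.22 (this app is on 1.19), the range variable `message` is a single variable reused on every iteration, so `&message` hands the producer the same pointer each time. Since `Produce` buffers records and sends them asynchronously, buffered records could end up sharing memory. The sketch below shows the usual per-iteration copy. `Message`, `asyncSink`, and `forward` are hypothetical stand-ins so the example runs without a broker; they are not franz-go types.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for the record type sent over service.channel.
type Message struct {
	Value string
}

// asyncSink is a hypothetical stand-in for an asynchronous producer: it keeps
// the pointer it is given and only reads it later, when Flush is called.
type asyncSink struct {
	mu      sync.Mutex
	pending []*Message
}

// Produce buffers the pointer without copying the message it points to.
func (s *asyncSink) Produce(m *Message) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pending = append(s.pending, m)
}

// Flush reads every buffered message and returns the values in order.
func (s *asyncSink) Flush() []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([]string, 0, len(s.pending))
	for _, m := range s.pending {
		out = append(out, m.Value)
	}
	s.pending = nil
	return out
}

// forward drains ch into sink, handing the sink a pointer to a fresh copy
// on every iteration so that no two buffered records share memory.
func forward(ch <-chan Message, sink *asyncSink) {
	for message := range ch {
		msg := message // per-iteration copy; needed before Go 1.22
		sink.Produce(&msg)
	}
}

func main() {
	ch := make(chan Message, 3)
	for _, v := range []string{"a", "b", "c"} {
		ch <- Message{Value: v}
	}
	close(ch)

	sink := &asyncSink{}
	forward(ch, sink)
	fmt.Println(sink.Flush()) // prints [a b c]
}
```

Applied to the loop above, that would mean copying `message` into a new variable inside the loop body and passing the address of the copy to `Produce`. This is unlikely to explain the broker disconnects on its own, but it removes one variable while debugging.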
Logs
INFO 2024-05-02T14:31:13.563229821Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] wrote Produce v7; broker: 2, bytes_written: 1039, write_wait: 97.652µs, time_to_write: 79.068µs, err: <nil>
INFO 2024-05-02T14:31:13.566255149Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] read Produce v7; broker: 2, bytes_read: 0, read_wait: 88.88µs, time_to_read: 1.92081ms, err: EOF
INFO 2024-05-02T14:31:13.566264884Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] read from broker errored, killing connection; addr: redpanda-2.redpanda.redpanda.svc.cluster.local.:9093, broker: 2, successful_reads: 1, err: EOF
INFO 2024-05-02T14:31:13.566271592Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] retry batches processed; wanted_metadata_update: false, triggering_metadata_update: false, should_backoff: false
INFO 2024-05-02T14:31:13.820934368Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] opening connection to broker; addr: redpanda-2.redpanda.redpanda.svc.cluster.local.:9093, broker: 2
INFO 2024-05-02T14:31:13.825469439Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] connection opened to broker; addr: redpanda-2.redpanda.redpanda.svc.cluster.local.:9093, broker: 2
INFO 2024-05-02T14:31:13.827150889Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] connection initialized successfully; addr: redpanda-2.redpanda.redpanda.svc.cluster.local.:9093, broker: 2
INFO 2024-05-02T14:31:13.827367931Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] wrote Produce v7; broker: 2, bytes_written: 1039, write_wait: 5.907426ms, time_to_write: 42.369µs, err: <nil>
INFO 2024-05-02T14:31:13.827706811Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] read Produce v7; broker: 2, bytes_read: 0, read_wait: 38.142µs, time_to_read: 537.388µs, err: EOF
INFO 2024-05-02T14:31:13.827965487Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[WARN] read from broker errored, killing connection after 0 successful responses (is SASL missing?); addr: redpanda-2.redpanda.redpanda.svc.cluster.local.:9093, broker: 2, err: EOF
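One way to check the impression that only broker 2 is affected would be to tally the debug lines per broker ID over a longer log capture. The following is a minimal sketch using only the standard library; it assumes the `broker: N` and trailing `err: ...` fields look like the lines above, and `tallyByBroker` and `brokerStats` are hypothetical names introduced for illustration. The sample input is three lines taken from the logs above with the timestamp prefix dropped.

```go
package main

import (
	"bufio"
	"fmt"
	"regexp"
	"strings"
)

var (
	// Matches the broker ID field, e.g. "broker: 2".
	brokerRe = regexp.MustCompile(`broker: (\d+)`)
	// Matches the trailing error field, e.g. "err: EOF" or "err: <nil>".
	errRe = regexp.MustCompile(`err: (.+)$`)
)

// brokerStats counts log lines and non-nil errors seen for one broker.
type brokerStats struct {
	Lines  int
	Errors int
}

// tallyByBroker scans logs line by line and groups counts by broker ID.
// Lines without a "broker: N" field are skipped.
func tallyByBroker(logs string) map[string]brokerStats {
	stats := make(map[string]brokerStats)
	sc := bufio.NewScanner(strings.NewReader(logs))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		m := brokerRe.FindStringSubmatch(line)
		if m == nil {
			continue
		}
		s := stats[m[1]]
		s.Lines++
		if e := errRe.FindStringSubmatch(line); e != nil && e[1] != "<nil>" {
			s.Errors++
		}
		stats[m[1]] = s
	}
	return stats
}

func main() {
	sample := `PRODUCER[DEBUG] wrote Produce v7; broker: 2, bytes_written: 1039, write_wait: 97.652µs, time_to_write: 79.068µs, err: <nil>
PRODUCER[DEBUG] read Produce v7; broker: 2, bytes_read: 0, read_wait: 88.88µs, time_to_read: 1.92081ms, err: EOF
PRODUCER[DEBUG] opening connection to broker; addr: redpanda-2.redpanda.redpanda.svc.cluster.local.:9093, broker: 2`

	for id, s := range tallyByBroker(sample) {
		fmt.Printf("broker %s: %d lines, %d errors\n", id, s.Lines, s.Errors)
	}
}
```

If brokers 0 and 1 show successful produce responses in the same capture while broker 2 shows only EOFs, that would support the broker-specific reading; if they never appear at all, the producer may simply not have written to them yet.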

ErlendFax, May 02 '24 16:05