confluent-kafka-go

Producer messages stuck flushing even though there weren't any writes.

Open • ylhan opened this issue 9 months ago • 4 comments

Description

Scenario:

  1. I have a producer running for 4 days
  2. No writes at all
  3. Calling flush on this producer (timeout=10 seconds) results in 2 unflushed messages

I'm at wits' end here. I didn't write anything using this producer yet it complains that two messages are not flushed.

I dug into the library code a bit and perhaps this could be an issue with the Len() method? Why does this method add up the lengths of 3 different queues? Why do I have unflushed messages when I did not call write on the producer even once?
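To make the question about Len() concrete, here is a minimal toy model of a length computed as the sum of three queues. It is not the library's code: the type `toyProducer`, its fields, and the event strings are all hypothetical, and it assumes that one of the three queues holds events the application has not read yet. Under that assumption, a producer that never wrote a message could still report a non-zero length.

```go
package main

import "fmt"

// toyProducer is a hypothetical stand-in, NOT the library's Producer type.
// It only models the idea that a length is reported as the sum of three queues.
type toyProducer struct {
	produceCh chan string // messages handed over by the application, not yet sent
	events    chan string // events not yet read by the application (assumption)
	outq      int         // messages and requests queued inside the client
}

// Len adds up all three queues, mirroring the behaviour asked about above.
func (p *toyProducer) Len() int {
	return len(p.produceCh) + len(p.events) + p.outq
}

func main() {
	p := &toyProducer{
		produceCh: make(chan string, 8),
		events:    make(chan string, 8),
	}

	// No message is ever produced, but two made-up events arrive
	// and nobody reads them.
	p.events <- "hypothetical event 1"
	p.events <- "hypothetical event 2"
	fmt.Println("Len with unread events:", p.Len())

	// Reading the events brings the count back to zero.
	<-p.events
	<-p.events
	fmt.Println("Len after draining:", p.Len())
}
```

If the real Len() behaves like this model, the "2 unflushed messages" might be two unread events rather than two messages, but that is a guess until the library code confirms it.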

How to reproduce

Instantiate a kafka producer with the configuration map below, do not write anything, and flush the producer.

Checklist

Please provide the following information:

  • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): v2.3.1-0.20240315214844-f1230c0e9dd4
  • [x] Apache Kafka broker version: 3.2.0
  • [x] Client configuration: ConfigMap{...}:
&kafka.ConfigMap{
	"client.id":                             "blah",
	"bootstrap.servers":                     ...,
	"sasl.mechanism":                        "SCRAM-SHA-512",
	"security.protocol":                     "SASL_SSL",
	"sasl.username":                         ...,
	"sasl.password":                         ...,
	"acks":                                  "all",
	"enable.idempotence":                    "true",
	"max.in.flight.requests.per.connection": 5,
	"linger.ms":                             100,
	"retries":                               5,
	"batch.size":                            2000,
	"request.timeout.ms":                    2000,
}
  • [x] Operating system: docker container - base image: golang:1.21-bullseye
  • [x] Provide client logs (with "debug": ".." as necessary): I wrapped confluent-kafka-go's Flush like so:
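// Close flushes the wrapped producer and then closes it.
// p.writer is the underlying confluent-kafka-go producer.
func (p *Producer) Close(context.Context) error {
	// Always release the underlying producer, even if the flush falls short.
	defer p.writer.Close()
	// Flush blocks for up to flushTimeoutMs and returns how many items are still outstanding.
	unflushed := p.writer.Flush(p.flushTimeoutMs)
	if unflushed > 0 {
		p.log.Error("Failed to flush messages", zap.Int("unflushed", unflushed))
		return errors.Errorf("failed to flush %d messages", unflushed)
	}
	return nil
}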

When the client shuts down, I get the following error:

failed to flush 2 messages
  • [x] Provide broker log excerpts: attached. There are multiple services connected to the broker, but my service is called market-connector. Nothing very telling in the logs from my review.
  • [x] Critical issue: No
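One way to check what the two outstanding items are would be to read whatever is buffered on the producer's event channel just before flushing and log it. The sketch below shows only the non-blocking drain step, using a plain channel of strings as a stand-in for real events. `drainPending` is a hypothetical helper, not a library call, and whether unread events are counted in the flush result is an assumption.

```go
package main

import "fmt"

// drainPending reads everything currently buffered on ch without blocking
// and returns what it found. Hypothetical helper, not part of any library.
func drainPending[T any](ch <-chan T) []T {
	var out []T
	for {
		select {
		case v, ok := <-ch:
			if !ok {
				return out // channel closed, nothing more to read
			}
			out = append(out, v)
		default:
			return out // nothing buffered right now
		}
	}
}

func main() {
	// Stand-in for the producer's event channel; strings replace real events.
	events := make(chan string, 4)
	events <- "event A"
	events <- "event B"

	for _, ev := range drainPending[string](events) {
		fmt.Println("pending before flush:", ev)
	}
	fmt.Println("left on channel:", len(events))
}
```

In the Close wrapper above, the same helper could be pointed at the channel returned by the producer's Events() method before calling Flush, so that anything pending shows up in the logs instead of only as a count.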

extract-2024-05-20T21_40_38.964Z.csv

ylhan, May 20 '24 21:05