confluent-kafka-go
Memory leak while producing message?
Description
I'm building an API server that sends a message to a Kafka queue whenever it receives an HTTP request. The servers run on Kubernetes. Whenever a server has been running for more than a few hours (9+ hours), it gets OOMKilled.
This picture shows the heap dump taken right before the OOMKill occurred. It looks like _Cfunc_GoString grows very large and is never freed.
This picture shows the diff_base between two heap dumps, one taken right after the server started and one right before the OOMKill occurred.
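For context, heap snapshots like these can be captured from the running pods with net/http/pprof; a minimal sketch, assuming the server does not already expose it (the side port is illustrative):
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers on the default mux
)

func main() {
	// Serve the profiling endpoints on a side port next to the API server.
	log.Fatal(http.ListenAndServe("localhost:6060", nil))
}
The two snapshots (right after start, right before the OOMKill) can then be fetched from /debug/pprof/heap and compared with go tool pprof -diff_base=base.heap current.heap.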
How to reproduce
func (p *DClient) InitProducer() error {
	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": p.BoostrapServers,
		"retries":           2147483647,
		"acks":              "all",
		"batch.size":        65536,
		"linger.ms":         1000,
		"compression.type":  "lz4",
	})
	if err != nil {
		logger.Error("Failed to get producer", zap.Error(err))
		return err
	}
	p.Producer = producer
	return nil
}
func (p *DClient) ProduceMessage(msg []byte, flushDelay int) error {
	deliveryChan := make(chan kafka.Event)
	go func() {
		for e := range deliveryChan {
			switch ev := e.(type) {
			case *kafka.Message:
				m := ev
				if m.TopicPartition.Error != nil {
					logger.Warn("Failed to deliver message", zap.Error(m.TopicPartition.Error))
				} else {
					logger.Debug(fmt.Sprintf("Delivered message to topic %s [%d] at offset %v\n",
						*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset))
				}
			default:
				// Ignore default event
			}
			close(deliveryChan)
		}
	}()

	err := p.Producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &p.Topic, Partition: kafka.PartitionAny},
		Value:          msg,
	}, deliveryChan)
	if err != nil {
		close(deliveryChan)
		logger.Error("Failed to produce message", zap.Error(err))
		return err
	}

	for p.Producer.Flush(flushDelay) > 0 {
		// Wait for message deliveries before shutting down
	}
	return nil
}
// From another file, the code below runs whenever an HTTP request is received
go func() {
	err := p.ProduceMessage(msg, 100)
	if err != nil {
		logger.Warn("Failed to put Kafka message", zap.Error(err))
	}
}()
I thought adding a delivery channel would solve it, but it didn't. Did I use it wrong?
Checklist
Please provide the following information:
- [x] confluent-kafka-go and librdkafka version: LibraryVersion(v1.8.2)
- [ ] Apache Kafka broker version: can't see it from my side.
- [x] Client configuration: &kafka.ConfigMap{"bootstrap.servers": p.BoostrapServers, "retries": 2147483647, "acks": "all", "batch.size": 65536, "linger.ms": 1000, "compression.type": "lz4"}
- [x] Operating system: CentOS
- [ ] Provide client logs (with "debug": ".." as necessary)
- [ ] Provide broker log excerpts
- [x] Critical issue: yes
@devFallingstar Have you maybe checked if the same problem occurs with the latest version?
@pluschnikow No, but I just found a way to solve the problem.
Since ProduceMessage() is called from multiple goroutines, Flush() doesn't work the way I expected. After I removed the Flush() calls (I figured that would be okay, because the producer is never closed while the server is running), memory usage looks fine. Will this problem be fixed in the latest version?
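For reference, a minimal sketch of the change described above: the per-message Flush() loop is removed and the producer is only flushed and closed on shutdown. The Shutdown method and the buffered delivery channel are assumptions for illustration, not code from the issue.
func (p *DClient) ProduceMessage(msg []byte) error {
	deliveryChan := make(chan kafka.Event, 1)
	err := p.Producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &p.Topic, Partition: kafka.PartitionAny},
		Value:          msg,
	}, deliveryChan)
	if err != nil {
		logger.Error("Failed to produce message", zap.Error(err))
		return err
	}

	// Handle the single delivery report asynchronously; no Flush() per message.
	go func() {
		e := <-deliveryChan
		if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
			logger.Warn("Failed to deliver message", zap.Error(m.TopicPartition.Error))
		}
	}()
	return nil
}

// Assumed shutdown hook: called once when the server stops, not per message.
func (p *DClient) Shutdown() {
	p.Producer.Flush(15 * 1000)
	p.Producer.Close()
}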
Seeing the same issue: GoString keeps accumulating and never gets cleaned up. Any pointers on what the problem could be? (Removing Flush does not seem to solve it.) The highlighted entry keeps growing:
Showing nodes accounting for 373.51MB, 98.71% of 378.37MB total
Dropped 30 nodes (cum <= 1.89MB)
Showing top 10 nodes out of 12
      flat  flat%   sum%        cum   cum%
  203.08MB 53.67% 53.67%   203.08MB 53.67%  github.com/confluentinc/confluent-kafka-go/kafka._Cfunc_GoString (inline)
  160.29MB 42.36% 96.03%   160.29MB 42.36%  github.com/confluentinc/confluent-kafka-go/kafka.NewProducer
    9.64MB  2.55% 98.58%   147.03MB 38.86%  logproc/kfreader.PublishToMsgbus.func2
    0.50MB  0.13% 98.71%   203.58MB 53.80%  github.com/confluentinc/confluent-kafka-go/kafka.(*handle).eventPoll
         0     0% 98.71%   203.58MB 53.80%  github.com/confluentinc/confluent-kafka-go/kafka.NewProducer.func5
         0     0% 98.71%   203.58MB 53.80%  github.com/confluentinc/confluent-kafka-go/kafka.poller
         0     0% 98.71%   160.29MB 42.36%  logproc/kafka.CreateProducer
         0     0% 98.71%    22.90MB  6.05%  logproc/kafka.ListTopics
         0     0% 98.71%   149.76MB 39.58%  logproc/kfreader.PublishToMsgbus
         0     0% 98.71%     2.74MB  0.72%  logproc/kfreader.PublishToMsgbus.func1
Upgrading to version 2.2 resolved this.
Hi, even if you're using a custom delivery report channel, you should still poll the Events channel for other events: https://github.com/confluentinc/confluent-kafka-go/blob/d194ad14d58e17147c529ac4715850ca0a62134c/examples/producer_example/producer_example.go#L51
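For illustration, a minimal sketch of such an Events drain loop, started once alongside the producer (the specific error handling here is an assumption, not taken from the linked example):
go func() {
	// Drain generic producer events (errors, stats, etc.) so they don't
	// accumulate; delivery reports still go to the per-message channels.
	for e := range p.Producer.Events() {
		switch ev := e.(type) {
		case kafka.Error:
			logger.Warn("Producer error event", zap.Error(ev))
		default:
			// Ignore other event types
		}
	}
}()
The loop exits on its own when the producer is closed, since Close() also closes the Events channel.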