Producer.Close() does not free underlying memory.
Description
Long story short: I was trying to overcome issues the Kafka client encounters by closing old producers and creating new ones. When I ran the tests for that code, I found that calling producer.Close() does not free the previously allocated memory. I created a small benchmark which creates a producer, produces a single message, and closes the producer; the ~22MB of allocated memory is never freed. What am I missing?
How to reproduce
Run the following benchmark and notice that the reported memory use is 24087376 B/op.
package main

import (
	"crypto/rand"
	"runtime"
	"testing"
	"time"

	"github.com/Sentinel-One/dv-gateway/handler"
	"github.com/Sentinel-One/dv-gateway/util"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func BenchmarkProducerOpenCloseMemUse(b *testing.B) {
	log := util.NewLogger()
	b.ReportAllocs()
	b.ResetTimer()

	// Build a 300-byte random payload.
	buffer := make([]byte, 300)
	if _, err := rand.Read(buffer); err != nil {
		log.Error("error:", err)
		return
	}

	configMap := handler.CreateKafkaConfig()
	configMap["bootstrap.servers"] = "creds.Broker"
	configMap["sasl.username"] = "creds.UserName"
	configMap["sasl.password"] = "creds.Password"
	configMap["linger.ms"] = 35
	configMap["acks"] = 1
	kafkaConfig := handler.ConvertKafkaMapToKafkaConfig(configMap)

	topic := "test-topic" // placeholder; the original snippet referenced an undefined Topic
	message := &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          buffer,
		Timestamp:      time.Now(),
	}

	producer, _ := handler.CreateKafkaProducer(kafkaConfig)
	deliveryChan := make(chan kafka.Event)
	producer.Produce(message, deliveryChan)

	// Block until the delivery report for the single message arrives.
	e := <-deliveryChan
	_ = e.(*kafka.Message)
	close(deliveryChan)

	log.Info("producer", "name", producer.String())
	producer.Close()
	runtime.GC()
	b.StopTimer()
}
Checklist
Please provide the following information:
- [ ] confluent-kafka-go (v1.4.2) and librdkafka version (where do I get the librdkafka version from?)
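For what it's worth, the librdkafka version the Go client is linked against can be queried at runtime via kafka.LibraryVersion(); a minimal sketch:

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// LibraryVersion reports the librdkafka version the client was built against.
	ver, verStr := kafka.LibraryVersion()
	fmt.Printf("librdkafka %s (0x%x)\n", verStr, ver)
}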
Have you tried running a memory profiler on this testcase, e.g. https://www.freecodecamp.org/news/how-i-investigated-memory-leaks-in-go-using-pprof-on-a-large-codebase-4bec4325e192/ ?
@edenhill No, I have not; I can try that. I ran the memory profiler from GoLand:
BenchmarkGatewayMemUse-16 1 2602185561 ns/op 24087376 B/op 324 allocs/op
PASS
I wonder how pprof would help me. The code does nothing except create a producer and a message, send a single message, and close the producer. I assume there are underlying allocations which are not freed; even if I can see what they are, how will I free them?
It is too soon to say how to fix the memory issue before knowing what the memory belongs to, so start off with pprof or similar.
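For example, one could dump a heap profile right after producer.Close() and inspect it with go tool pprof; a minimal sketch (note that Go's heap profiler only tracks Go allocations, so memory allocated inside librdkafka via C will not appear in it):

package main

import (
	"os"
	"runtime"
	"runtime/pprof"
)

// dumpHeapProfile writes a heap profile to path; inspect it afterwards with
// `go tool pprof heap.out`.
func dumpHeapProfile(path string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()
	runtime.GC() // flush pending frees so the profile reflects live memory
	return pprof.WriteHeapProfile(f)
}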
OK, thanks, I'll try to move it forward. But since it's not currently blocking me (I can live for the moment with the leak of ~22MB per producer), I'm not sure I'll be pushing this ticket, so feel free to close it for inactivity in case I don't show up within a couple of days; at most I'll reopen it with the needed info.
Hi @denis-rusakov, if you are still hitting the memory leak issue, it's probably a Go runtime issue; please try running your code with GODEBUG=madvdontneed=1.
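For example, assuming the benchmark above:

GODEBUG=madvdontneed=1 go test -bench=BenchmarkProducerOpenCloseMemUse

With madvdontneed=1 the Go runtime releases freed pages to the OS with MADV_DONTNEED instead of MADV_FREE (on Linux), so the process RSS drops immediately rather than lazily; this distinguishes a real leak from memory the runtime has freed but not yet returned.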
I used this example https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/producer_example/producer_example.go in my application. I tried to use 9 goroutines to produce data, where each goroutine has its own producer object, and I make sure to close the original producer before creating a new one, but it still leaks memory: memory increases by about 500MB every time I close one producer and create a new one.
Regardless of the memory issue, we highly recommend reusing the same Producer instance across multiple goroutines. There's typically no reason to instantiate multiple Producers unless you need different configurations.
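A minimal sketch of that pattern (broker address, topic, and message contents are placeholders):

package main

import (
	"fmt"
	"sync"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// One producer shared by all goroutines; Produce is safe for concurrent use.
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Drain delivery reports from the shared Events channel.
	go func() {
		for e := range p.Events() {
			if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
				fmt.Println("delivery failed:", m.TopicPartition.Error)
			}
		}
	}()

	topic := "test-topic"
	var wg sync.WaitGroup
	for i := 0; i < 9; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			p.Produce(&kafka.Message{
				TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
				Value:          []byte(fmt.Sprintf("message from goroutine %d", n)),
			}, nil) // nil delivery channel: reports go to p.Events()
		}(i)
	}
	wg.Wait()
	p.Flush(15 * 1000) // wait up to 15s for outstanding deliveries
}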
Thanks for your reply. But I used a transactional producer; can the same transactional producer work normally across multiple goroutines? According to my test, the answer is no: all goroutines get stuck at BeginTransaction().
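A transactional producer supports only one open transaction at a time, so concurrent BeginTransaction() calls on the same instance have to be serialized; a minimal sketch of one way to do that (the helper name and usage are illustrative, and it assumes p.InitTransactions() has already been called once at startup):

package main

import (
	"context"
	"sync"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// txMu makes goroutines take turns: only one transaction may be open
// on a given producer at any moment.
var txMu sync.Mutex

func produceInTransaction(ctx context.Context, p *kafka.Producer, msgs []*kafka.Message) error {
	txMu.Lock()
	defer txMu.Unlock()

	if err := p.BeginTransaction(); err != nil {
		return err
	}
	for _, m := range msgs {
		if err := p.Produce(m, nil); err != nil {
			_ = p.AbortTransaction(ctx)
			return err
		}
	}
	return p.CommitTransaction(ctx)
}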