kafka-go
Significantly increased memory usage when upgrading from v0.3.5 to v0.4.12
Describe the bug
We have an application using v0.4.12 and kafka.NewWriter. It's a small pinger application that instantiates a new kafka.NewWriter roughly once per second, sends a message, and receives it. We call this a "probe". We don't reinstantiate a kafka.Reader on every probe.
We started seeing significantly higher memory usage when we upgraded from v0.3.5 to v0.4.12. This led to our application getting OOM-restarted by K8s, and we had to significantly increase the memory limit for this tiny application.
We are running two instances of the kafka-pinger application. This graph shows the sum of memory usage and the sum of memory limits for both pingers around the time we upgraded the application:
This graph shows the same data, but for when we increased the memory limit to 1 GB for each pinger:
As you can see, the pinger memory usage went from ~13.5 MB to ~330 MB.
Kafka Version: kafka_2.11-1.1.1 with 22 Kafka brokers.
To Reproduce: See description above.
Expected behavior: Significantly smaller memory usage.
Additional context
pprof
heap dump:
(pprof) top
Showing nodes accounting for 87.97MB, 100% of 87.97MB total
Showing top 10 nodes out of 48
flat flat% sum% cum cum%
46.69MB 53.08% 53.08% 46.69MB 53.08% github.com/segmentio/kafka-go.makePartitions
34.95MB 39.73% 92.80% 34.95MB 39.73% reflect.unsafe_NewArray
1.76MB 2.00% 94.81% 1.76MB 2.00% compress/flate.NewWriter
1.06MB 1.21% 96.02% 1.06MB 1.21% github.com/segmentio/kafka-go/protocol.newPage
1MB 1.14% 97.16% 47.70MB 54.22% github.com/segmentio/kafka-go.makeLayout
1MB 1.14% 98.29% 1.50MB 1.71% github.com/segmentio/kafka-go.(*Transport).grabPool
1MB 1.14% 99.43% 1MB 1.14% github.com/segmentio/kafka-go/protocol.bytesToString
0.50MB 0.57% 100% 0.50MB 0.57% github.com/segmentio/kafka-go.(*connPool).newConnGroup (inline)
0 0% 100% 1.76MB 2.00% bufio.(*Writer).Flush
0 0% 100% 1.76MB 2.00% compress/gzip.(*Writer).Write
(pprof) tree
Showing nodes accounting for 87.97MB, 100% of 87.97MB total
----------------------------------------------------------+-------------
flat flat% sum% cum cum% calls calls% + context
----------------------------------------------------------+-------------
46.69MB 100% | github.com/segmentio/kafka-go.makeLayout
46.69MB 53.08% 53.08% 46.69MB 53.08% | github.com/segmentio/kafka-go.makePartitions
----------------------------------------------------------+-------------
34.95MB 100% | reflect.MakeSlice
34.95MB 39.73% 92.80% 34.95MB 39.73% | reflect.unsafe_NewArray
----------------------------------------------------------+-------------
1.76MB 100% | compress/gzip.(*Writer).Write
1.76MB 2.00% 94.81% 1.76MB 2.00% | compress/flate.NewWriter
----------------------------------------------------------+-------------
1.06MB 100% | github.com/segmentio/kafka-go/protocol.(*pageBuffer).newPage
1.06MB 1.21% 96.02% 1.06MB 1.21% | github.com/segmentio/kafka-go/protocol.newPage
----------------------------------------------------------+-------------
47.70MB 100% | github.com/segmentio/kafka-go.(*connPool).update
1MB 1.14% 97.16% 47.70MB 54.22% | github.com/segmentio/kafka-go.makeLayout
46.69MB 97.90% | github.com/segmentio/kafka-go.makePartitions
----------------------------------------------------------+-------------
1.50MB 100% | github.com/segmentio/kafka-go.(*Transport).RoundTrip
1MB 1.14% 98.29% 1.50MB 1.71% | github.com/segmentio/kafka-go.(*Transport).grabPool
0.50MB 33.33% | github.com/segmentio/kafka-go.(*connPool).newConnGroup (inline)
----------------------------------------------------------+-------------
1MB 100% | github.com/segmentio/kafka-go/protocol.(*decoder).readString (inline)
1MB 1.14% 99.43% 1MB 1.14% | github.com/segmentio/kafka-go/protocol.bytesToString
----------------------------------------------------------+-------------
0.50MB 100% | github.com/segmentio/kafka-go.(*Transport).grabPool (inline)
0.50MB 0.57% 100% 0.50MB 0.57% | github.com/segmentio/kafka-go.(*connPool).newConnGroup
----------------------------------------------------------+-------------
1.76MB 100% | github.com/prometheus/common/expfmt.MetricFamilyToText.func1
0 0% 100% 1.76MB 2.00% | bufio.(*Writer).Flush
1.76MB 100% | compress/gzip.(*Writer).Write
----------------------------------------------------------+-------------
1.76MB 100% | bufio.(*Writer).Flush
0 0% 100% 1.76MB 2.00% | compress/gzip.(*Writer).Write
1.76MB 100% | compress/flate.NewWriter
----------------------------------------------------------+-------------
1.76MB 100% | net/http.HandlerFunc.ServeHTTP
0 0% 100% 1.76MB 2.00% | github.com/prometheus/client_golang/prometheus/promhttp.HandlerFor.func1
1.76MB 100% | github.com/prometheus/common/expfmt.encoderCloser.Encode
----------------------------------------------------------+-------------
1.76MB 100% | net/http.HandlerFunc.ServeHTTP
0 0% 100% 1.76MB 2.00% | github.com/prometheus/client_golang/prometheus/promhttp.InstrumentHandlerCounter.func1
1.76MB 100% | net/http.HandlerFunc.ServeHTTP
----------------------------------------------------------+-------------
1.76MB 100% | net/http.HandlerFunc.ServeHTTP
0 0% 100% 1.76MB 2.00% | github.com/prometheus/client_golang/prometheus/promhttp.InstrumentHandlerInFlight.func1
1.76MB 100% | net/http.HandlerFunc.ServeHTTP
----------------------------------------------------------+-------------
1.76MB 100% | github.com/prometheus/common/expfmt.NewEncoder.func7
0 0% 100% 1.76MB 2.00% | github.com/prometheus/common/expfmt.MetricFamilyToText
1.76MB 100% | github.com/prometheus/common/expfmt.MetricFamilyToText.func1
----------------------------------------------------------+-------------
1.76MB 100% | github.com/prometheus/common/expfmt.MetricFamilyToText
0 0% 100% 1.76MB 2.00% | github.com/prometheus/common/expfmt.MetricFamilyToText.func1
1.76MB 100% | bufio.(*Writer).Flush
----------------------------------------------------------+-------------
1.76MB 100% | github.com/prometheus/common/expfmt.encoderCloser.Encode
0 0% 100% 1.76MB 2.00% | github.com/prometheus/common/expfmt.NewEncoder.func7
1.76MB 100% | github.com/prometheus/common/expfmt.MetricFamilyToText
----------------------------------------------------------+-------------
1.76MB 100% | github.com/prometheus/client_golang/prometheus/promhttp.HandlerFor.func1
0 0% 100% 1.76MB 2.00% | github.com/prometheus/common/expfmt.encoderCloser.Encode
1.76MB 100% | github.com/prometheus/common/expfmt.NewEncoder.func7
----------------------------------------------------------+-------------
1.50MB 100% | github.com/segmentio/kafka-go.(*Writer).partitions
0 0% 100% 1.50MB 1.71% | github.com/segmentio/kafka-go.(*Transport).RoundTrip
1.50MB 100% | github.com/segmentio/kafka-go.(*Transport).grabPool
----------------------------------------------------------+-------------
1.50MB 100% | main.(*sender).Send
0 0% 100% 1.50MB 1.71% | github.com/segmentio/kafka-go.(*Writer).WriteMessages
1.50MB 100% | github.com/segmentio/kafka-go.(*Writer).partitions
----------------------------------------------------------+-------------
1.50MB 100% | github.com/segmentio/kafka-go.(*Writer).WriteMessages
0 0% 100% 1.50MB 1.71% | github.com/segmentio/kafka-go.(*Writer).partitions
1.50MB 100% | github.com/segmentio/kafka-go.(*Transport).RoundTrip
----------------------------------------------------------+-------------
36.48MB 100% | github.com/segmentio/kafka-go.(*conn).run
0 0% 100% 36.48MB 41.47% | github.com/segmentio/kafka-go.(*conn).roundTrip
36.48MB 100% | github.com/segmentio/kafka-go/protocol.(*Conn).RoundTrip
----------------------------------------------------------+-------------
0 0% 100% 36.48MB 41.47% | github.com/segmentio/kafka-go.(*conn).run
36.48MB 100% | github.com/segmentio/kafka-go.(*conn).roundTrip
----------------------------------------------------------+-------------
0.53MB 100% | github.com/segmentio/kafka-go.(*connGroup).grabConnOrConnect.func1
0 0% 100% 0.53MB 0.6% | github.com/segmentio/kafka-go.(*connGroup).connect
0.53MB 100% | github.com/segmentio/kafka-go/protocol.(*Conn).RoundTrip
----------------------------------------------------------+-------------
0 0% 100% 0.53MB 0.6% | github.com/segmentio/kafka-go.(*connGroup).grabConnOrConnect.func1
0.53MB 100% | github.com/segmentio/kafka-go.(*connGroup).connect
----------------------------------------------------------+-------------
0 0% 100% 47.70MB 54.22% | github.com/segmentio/kafka-go.(*connPool).discover
47.70MB 100% | github.com/segmentio/kafka-go.(*connPool).update
----------------------------------------------------------+-------------
47.70MB 100% | github.com/segmentio/kafka-go.(*connPool).discover
0 0% 100% 47.70MB 54.22% | github.com/segmentio/kafka-go.(*connPool).update
47.70MB 100% | github.com/segmentio/kafka-go.makeLayout
----------------------------------------------------------+-------------
36.48MB 98.56% | github.com/segmentio/kafka-go.(*conn).roundTrip
0.53MB 1.44% | github.com/segmentio/kafka-go.(*connGroup).connect
0 0% 100% 37.01MB 42.07% | github.com/segmentio/kafka-go/protocol.(*Conn).RoundTrip
37.01MB 100% | github.com/segmentio/kafka-go/protocol.RoundTrip
----------------------------------------------------------+-------------
35.95MB 100% | github.com/segmentio/kafka-go/protocol.arrayDecodeFuncOf.func2
0 0% 100% 35.95MB 40.86% | github.com/segmentio/kafka-go/protocol.(*decoder).decodeArray
35.95MB 100% | github.com/segmentio/kafka-go/protocol.structDecodeFuncOf.func2
34.95MB 97.22% | github.com/segmentio/kafka-go/protocol.makeArray
----------------------------------------------------------+-------------
1MB 100% | github.com/segmentio/kafka-go/protocol.structDecodeFuncOf.func2
0 0% 100% 1MB 1.14% | github.com/segmentio/kafka-go/protocol.(*decoder).decodeString
1MB 100% | github.com/segmentio/kafka-go/protocol.(*decoder).readString
----------------------------------------------------------+-------------
1MB 100% | github.com/segmentio/kafka-go/protocol.(*decoder).decodeString
0 0% 100% 1MB 1.14% | github.com/segmentio/kafka-go/protocol.(*decoder).readString
1MB 100% | github.com/segmentio/kafka-go/protocol.bytesToString (inline)
----------------------------------------------------------+-------------
1.06MB 100% | github.com/segmentio/kafka-go/protocol.(*encoder).writeInt32
0 0% 100% 1.06MB 1.21% | github.com/segmentio/kafka-go/protocol.(*encoder).Write
1.06MB 100% | github.com/segmentio/kafka-go/protocol.(*pageBuffer).Write
----------------------------------------------------------+-------------
1.06MB 100% | github.com/segmentio/kafka-go/protocol.WriteRequest
0 0% 100% 1.06MB 1.21% | github.com/segmentio/kafka-go/protocol.(*encoder).writeInt32
1.06MB 100% | github.com/segmentio/kafka-go/protocol.(*encoder).Write
----------------------------------------------------------+-------------
1.06MB 100% | github.com/segmentio/kafka-go/protocol.(*encoder).Write
0 0% 100% 1.06MB 1.21% | github.com/segmentio/kafka-go/protocol.(*pageBuffer).Write
1.06MB 100% | github.com/segmentio/kafka-go/protocol.(*pageBuffer).newPage (inline)
----------------------------------------------------------+-------------
1.06MB 100% | github.com/segmentio/kafka-go/protocol.(*pageBuffer).Write (inline)
0 0% 100% 1.06MB 1.21% | github.com/segmentio/kafka-go/protocol.(*pageBuffer).newPage
1.06MB 100% | github.com/segmentio/kafka-go/protocol.newPage
----------------------------------------------------------+-------------
35.95MB 100% | github.com/segmentio/kafka-go/protocol.RoundTrip
0 0% 100% 35.95MB 40.86% | github.com/segmentio/kafka-go/protocol.ReadResponse
35.95MB 100% | github.com/segmentio/kafka-go/protocol.structDecodeFuncOf.func2
----------------------------------------------------------+-------------
37.01MB 100% | github.com/segmentio/kafka-go/protocol.(*Conn).RoundTrip
0 0% 100% 37.01MB 42.07% | github.com/segmentio/kafka-go/protocol.RoundTrip
35.95MB 97.13% | github.com/segmentio/kafka-go/protocol.ReadResponse
1.06MB 2.87% | github.com/segmentio/kafka-go/protocol.WriteRequest
----------------------------------------------------------+-------------
1.06MB 100% | github.com/segmentio/kafka-go/protocol.RoundTrip
0 0% 100% 1.06MB 1.21% | github.com/segmentio/kafka-go/protocol.WriteRequest
1.06MB 100% | github.com/segmentio/kafka-go/protocol.(*encoder).writeInt32
----------------------------------------------------------+-------------
35.95MB 100% | github.com/segmentio/kafka-go/protocol.structDecodeFuncOf.func2
0 0% 100% 35.95MB 40.86% | github.com/segmentio/kafka-go/protocol.arrayDecodeFuncOf.func2
35.95MB 100% | github.com/segmentio/kafka-go/protocol.(*decoder).decodeArray
----------------------------------------------------------+-------------
34.95MB 100% | github.com/segmentio/kafka-go/protocol.(*decoder).decodeArray
0 0% 100% 34.95MB 39.73% | github.com/segmentio/kafka-go/protocol.makeArray
34.95MB 100% | reflect.MakeSlice
----------------------------------------------------------+-------------
35.95MB 100% | github.com/segmentio/kafka-go/protocol.(*decoder).decodeArray
35.95MB 100% | github.com/segmentio/kafka-go/protocol.ReadResponse
0 0% 100% 35.95MB 40.86% | github.com/segmentio/kafka-go/protocol.structDecodeFuncOf.func2
35.95MB 100% | github.com/segmentio/kafka-go/protocol.arrayDecodeFuncOf.func2
1MB 2.78% | github.com/segmentio/kafka-go/protocol.(*decoder).decodeString
----------------------------------------------------------+-------------
1.50MB 100% | main.runProbe
0 0% 100% 1.50MB 1.71% | main.(*sender).Send
1.50MB 100% | github.com/segmentio/kafka-go.(*Writer).WriteMessages
----------------------------------------------------------+-------------
0 0% 100% 1.50MB 1.71% | main.main.func3
1.50MB 100% | main.runProbe
----------------------------------------------------------+-------------
1.50MB 100% | main.main.func3
0 0% 100% 1.50MB 1.71% | main.runProbe
1.50MB 100% | main.(*sender).Send
----------------------------------------------------------+-------------
1.76MB 100% | net/http.serverHandler.ServeHTTP
0 0% 100% 1.76MB 2.00% | net/http.(*ServeMux).ServeHTTP
1.76MB 100% | net/http.HandlerFunc.ServeHTTP
----------------------------------------------------------+-------------
0 0% 100% 1.76MB 2.00% | net/http.(*conn).serve
1.76MB 100% | net/http.serverHandler.ServeHTTP
----------------------------------------------------------+-------------
1.76MB 100% | github.com/prometheus/client_golang/prometheus/promhttp.InstrumentHandlerCounter.func1
1.76MB 100% | github.com/prometheus/client_golang/prometheus/promhttp.InstrumentHandlerInFlight.func1
1.76MB 100% | net/http.(*ServeMux).ServeHTTP
0 0% 100% 1.76MB 2.00% | net/http.HandlerFunc.ServeHTTP
1.76MB 100% | github.com/prometheus/client_golang/prometheus/promhttp.HandlerFor.func1
1.76MB 100% | github.com/prometheus/client_golang/prometheus/promhttp.InstrumentHandlerCounter.func1
1.76MB 100% | github.com/prometheus/client_golang/prometheus/promhttp.InstrumentHandlerInFlight.func1
----------------------------------------------------------+-------------
1.76MB 100% | net/http.(*conn).serve
0 0% 100% 1.76MB 2.00% | net/http.serverHandler.ServeHTTP
1.76MB 100% | net/http.(*ServeMux).ServeHTTP
----------------------------------------------------------+-------------
34.95MB 100% | github.com/segmentio/kafka-go/protocol.makeArray
0 0% 100% 34.95MB 39.73% | reflect.MakeSlice
34.95MB 100% | reflect.unsafe_NewArray
----------------------------------------------------------+-------------
Hello @JensRantil, thanks for the detailed bug report.
After thoroughly reviewing the code, it does appear that we could have a leak here. I don't know that SetFinalizer prevents the collection of cyclic references (if you have documentation to share on the topic I'd be excited to read it), but we do employ reference counting in the connection management code, which could be a source of such a bug: https://github.com/segmentio/kafka-go/blob/master/transport.go#L274-L331
More investigation is needed to identify and address the root cause, but at first sight this does appear to be an issue in kafka-go.
Just out of curiosity, you mentioned that you recreate writers frequently, do these writers get closed properly after being used? And if the program isn't sensitive, would it be possible for you to share it?
I don't know that SetFinalizer prevents the collection of cyclic references (if you have documentation to share on the topic I'd be excited to read it)
Hello! Friday night here but I can at least send you a link to https://golang.org/pkg/runtime/#SetFinalizer which states
If a cyclic structure includes a block with a finalizer, that cycle is not guaranteed to be garbage collected and the finalizer is not guaranteed to run, because there is no ordering that respects the dependencies.
I initially learned about this here: https://groups.google.com/g/golang-nuts/c/tzvgeBEW1WY/m/sQw7m5mAtfMJ?pli=1
I'll check a bit later whether we close the writers, and will see if I can share some code.
Nice, thanks for sharing! I must have missed this in the docs, or forgot about it 👍
@achille-roussel Hello again! I've now shared the current state of the source code here. I have not done any reduction of the code to try to come up with a minimal reproduction of the bug. In short, runProbe(...) runs periodically and calls Sender.Close(), which then closes the writer.
Side-note: It's a very small useful utility application and we've discussed internally to make this a proper open source application.
Okay, we decided to significantly increase the memory limit and that did the trick. ✨ So the issue is not a memory leak; instead, steady-state memory usage has simply increased significantly, from ~13.5 MB to ~330 MB. That is a lot of memory for a very small application sending and receiving one message per second, so I still consider this a bug.
I've updated the issue title and description with new graphs showing memory usage.
FYI, I could not find any buffer sizes in https://pkg.go.dev/github.com/segmentio/kafka-go#Writer or https://pkg.go.dev/github.com/segmentio/kafka-go#Reader that looked crazy. Is there any way to reduce the size of those buffers?
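For reference, these are the size-related knobs exposed on the two types (field names are from the pkg.go.dev pages linked above; the values shown are the documented defaults, so this is a sketch of where to look rather than a recommended tuning):

```go
// Writer-side batch sizing:
w := &kafka.Writer{
	Addr:         kafka.TCP("localhost:9092"),
	Topic:        "ping",
	BatchSize:    100,         // max messages buffered per batch (default 100)
	BatchBytes:   1048576,     // max batch size in bytes (default 1 MiB)
	BatchTimeout: time.Second, // flush interval for partial batches
}

// Reader-side fetch and queue sizing:
r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:       []string{"localhost:9092"},
	Topic:         "ping",
	MinBytes:      1,       // fetch as soon as any data is available
	MaxBytes:      1 << 20, // cap on fetch response size (default 1 MiB)
	QueueCapacity: 100,     // internal message queue length (default 100)
})
```

None of these obviously explain hundreds of megabytes, though; the heap profile points at metadata decoding (makePartitions / makeArray) rather than message buffers.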