kafka-go

Significantly increased memory usage after upgrading from v0.3.5 to v0.4.12

Open issue, opened by JensRantil 3 years ago (6 comments)

Describe the bug

We have an application using v0.4.12 and kafka.NewWriter. It's a small pinger application that, around once per second, creates a new writer with kafka.NewWriter, sends a message and receives it. We call this a "probe". We don't reinstantiate the kafka.Reader on every probe.

We started seeing significantly higher memory usage when we upgraded from v0.3.5 to v0.4.12. This led to our application being OOM-killed and restarted by K8s, and we had to significantly increase the memory limit for this tiny application.

We are running two instances of the kafka-pinger application. This graph shows the summed memory usage and summed memory limits of both pingers around the time of the upgrade:

[Graph: summed memory usage and memory limits of both pingers around the upgrade]

This graph shows the same metrics after we increased the memory limit to 1 GB for each pinger:

[Graph: the same metrics after raising each pinger's memory limit to 1 GB]

As you can see, the pinger memory usage went from ~13.5 MB to ~330 MB.

Kafka version

kafka_2.11-1.1.1 with 22 Kafka brokers.

To Reproduce

See the description above.

Expected behavior

Significantly lower memory usage.

Additional context

pprof heap profile:

(pprof) top
Showing nodes accounting for 87.97MB, 100% of 87.97MB total
Showing top 10 nodes out of 48
      flat  flat%   sum%        cum   cum%
   46.69MB 53.08% 53.08%    46.69MB 53.08%  github.com/segmentio/kafka-go.makePartitions
   34.95MB 39.73% 92.80%    34.95MB 39.73%  reflect.unsafe_NewArray
    1.76MB  2.00% 94.81%     1.76MB  2.00%  compress/flate.NewWriter
    1.06MB  1.21% 96.02%     1.06MB  1.21%  github.com/segmentio/kafka-go/protocol.newPage
       1MB  1.14% 97.16%    47.70MB 54.22%  github.com/segmentio/kafka-go.makeLayout
       1MB  1.14% 98.29%     1.50MB  1.71%  github.com/segmentio/kafka-go.(*Transport).grabPool
       1MB  1.14% 99.43%        1MB  1.14%  github.com/segmentio/kafka-go/protocol.bytesToString
    0.50MB  0.57%   100%     0.50MB  0.57%  github.com/segmentio/kafka-go.(*connPool).newConnGroup (inline)
         0     0%   100%     1.76MB  2.00%  bufio.(*Writer).Flush
         0     0%   100%     1.76MB  2.00%  compress/gzip.(*Writer).Write
(pprof) tree
Showing nodes accounting for 87.97MB, 100% of 87.97MB total
----------------------------------------------------------+-------------
      flat  flat%   sum%        cum   cum%   calls calls% + context
----------------------------------------------------------+-------------
                                           46.69MB   100% |   github.com/segmentio/kafka-go.makeLayout
   46.69MB 53.08% 53.08%    46.69MB 53.08%                | github.com/segmentio/kafka-go.makePartitions
----------------------------------------------------------+-------------
                                           34.95MB   100% |   reflect.MakeSlice
   34.95MB 39.73% 92.80%    34.95MB 39.73%                | reflect.unsafe_NewArray
----------------------------------------------------------+-------------
                                            1.76MB   100% |   compress/gzip.(*Writer).Write
    1.76MB  2.00% 94.81%     1.76MB  2.00%                | compress/flate.NewWriter
----------------------------------------------------------+-------------
                                            1.06MB   100% |   github.com/segmentio/kafka-go/protocol.(*pageBuffer).newPage
    1.06MB  1.21% 96.02%     1.06MB  1.21%                | github.com/segmentio/kafka-go/protocol.newPage
----------------------------------------------------------+-------------
                                           47.70MB   100% |   github.com/segmentio/kafka-go.(*connPool).update
       1MB  1.14% 97.16%    47.70MB 54.22%                | github.com/segmentio/kafka-go.makeLayout
                                           46.69MB 97.90% |   github.com/segmentio/kafka-go.makePartitions
----------------------------------------------------------+-------------
                                            1.50MB   100% |   github.com/segmentio/kafka-go.(*Transport).RoundTrip
       1MB  1.14% 98.29%     1.50MB  1.71%                | github.com/segmentio/kafka-go.(*Transport).grabPool
                                            0.50MB 33.33% |   github.com/segmentio/kafka-go.(*connPool).newConnGroup (inline)
----------------------------------------------------------+-------------
                                               1MB   100% |   github.com/segmentio/kafka-go/protocol.(*decoder).readString (inline)
       1MB  1.14% 99.43%        1MB  1.14%                | github.com/segmentio/kafka-go/protocol.bytesToString
----------------------------------------------------------+-------------
                                            0.50MB   100% |   github.com/segmentio/kafka-go.(*Transport).grabPool (inline)
    0.50MB  0.57%   100%     0.50MB  0.57%                | github.com/segmentio/kafka-go.(*connPool).newConnGroup
----------------------------------------------------------+-------------
                                            1.76MB   100% |   github.com/prometheus/common/expfmt.MetricFamilyToText.func1
         0     0%   100%     1.76MB  2.00%                | bufio.(*Writer).Flush
                                            1.76MB   100% |   compress/gzip.(*Writer).Write
----------------------------------------------------------+-------------
                                            1.76MB   100% |   bufio.(*Writer).Flush
         0     0%   100%     1.76MB  2.00%                | compress/gzip.(*Writer).Write
                                            1.76MB   100% |   compress/flate.NewWriter
----------------------------------------------------------+-------------
                                            1.76MB   100% |   net/http.HandlerFunc.ServeHTTP
         0     0%   100%     1.76MB  2.00%                | github.com/prometheus/client_golang/prometheus/promhttp.HandlerFor.func1
                                            1.76MB   100% |   github.com/prometheus/common/expfmt.encoderCloser.Encode
----------------------------------------------------------+-------------
                                            1.76MB   100% |   net/http.HandlerFunc.ServeHTTP
         0     0%   100%     1.76MB  2.00%                | github.com/prometheus/client_golang/prometheus/promhttp.InstrumentHandlerCounter.func1
                                            1.76MB   100% |   net/http.HandlerFunc.ServeHTTP
----------------------------------------------------------+-------------
                                            1.76MB   100% |   net/http.HandlerFunc.ServeHTTP
         0     0%   100%     1.76MB  2.00%                | github.com/prometheus/client_golang/prometheus/promhttp.InstrumentHandlerInFlight.func1
                                            1.76MB   100% |   net/http.HandlerFunc.ServeHTTP
----------------------------------------------------------+-------------
                                            1.76MB   100% |   github.com/prometheus/common/expfmt.NewEncoder.func7
         0     0%   100%     1.76MB  2.00%                | github.com/prometheus/common/expfmt.MetricFamilyToText
                                            1.76MB   100% |   github.com/prometheus/common/expfmt.MetricFamilyToText.func1
----------------------------------------------------------+-------------
                                            1.76MB   100% |   github.com/prometheus/common/expfmt.MetricFamilyToText
         0     0%   100%     1.76MB  2.00%                | github.com/prometheus/common/expfmt.MetricFamilyToText.func1
                                            1.76MB   100% |   bufio.(*Writer).Flush
----------------------------------------------------------+-------------
                                            1.76MB   100% |   github.com/prometheus/common/expfmt.encoderCloser.Encode
         0     0%   100%     1.76MB  2.00%                | github.com/prometheus/common/expfmt.NewEncoder.func7
                                            1.76MB   100% |   github.com/prometheus/common/expfmt.MetricFamilyToText
----------------------------------------------------------+-------------
                                            1.76MB   100% |   github.com/prometheus/client_golang/prometheus/promhttp.HandlerFor.func1
         0     0%   100%     1.76MB  2.00%                | github.com/prometheus/common/expfmt.encoderCloser.Encode
                                            1.76MB   100% |   github.com/prometheus/common/expfmt.NewEncoder.func7
----------------------------------------------------------+-------------
                                            1.50MB   100% |   github.com/segmentio/kafka-go.(*Writer).partitions
         0     0%   100%     1.50MB  1.71%                | github.com/segmentio/kafka-go.(*Transport).RoundTrip
                                            1.50MB   100% |   github.com/segmentio/kafka-go.(*Transport).grabPool
----------------------------------------------------------+-------------
                                            1.50MB   100% |   main.(*sender).Send
         0     0%   100%     1.50MB  1.71%                | github.com/segmentio/kafka-go.(*Writer).WriteMessages
                                            1.50MB   100% |   github.com/segmentio/kafka-go.(*Writer).partitions
----------------------------------------------------------+-------------
                                            1.50MB   100% |   github.com/segmentio/kafka-go.(*Writer).WriteMessages
         0     0%   100%     1.50MB  1.71%                | github.com/segmentio/kafka-go.(*Writer).partitions
                                            1.50MB   100% |   github.com/segmentio/kafka-go.(*Transport).RoundTrip
----------------------------------------------------------+-------------
                                           36.48MB   100% |   github.com/segmentio/kafka-go.(*conn).run
         0     0%   100%    36.48MB 41.47%                | github.com/segmentio/kafka-go.(*conn).roundTrip
                                           36.48MB   100% |   github.com/segmentio/kafka-go/protocol.(*Conn).RoundTrip
----------------------------------------------------------+-------------
         0     0%   100%    36.48MB 41.47%                | github.com/segmentio/kafka-go.(*conn).run
                                           36.48MB   100% |   github.com/segmentio/kafka-go.(*conn).roundTrip
----------------------------------------------------------+-------------
                                            0.53MB   100% |   github.com/segmentio/kafka-go.(*connGroup).grabConnOrConnect.func1
         0     0%   100%     0.53MB   0.6%                | github.com/segmentio/kafka-go.(*connGroup).connect
                                            0.53MB   100% |   github.com/segmentio/kafka-go/protocol.(*Conn).RoundTrip
----------------------------------------------------------+-------------
         0     0%   100%     0.53MB   0.6%                | github.com/segmentio/kafka-go.(*connGroup).grabConnOrConnect.func1
                                            0.53MB   100% |   github.com/segmentio/kafka-go.(*connGroup).connect
----------------------------------------------------------+-------------
         0     0%   100%    47.70MB 54.22%                | github.com/segmentio/kafka-go.(*connPool).discover
                                           47.70MB   100% |   github.com/segmentio/kafka-go.(*connPool).update
----------------------------------------------------------+-------------
                                           47.70MB   100% |   github.com/segmentio/kafka-go.(*connPool).discover
         0     0%   100%    47.70MB 54.22%                | github.com/segmentio/kafka-go.(*connPool).update
                                           47.70MB   100% |   github.com/segmentio/kafka-go.makeLayout
----------------------------------------------------------+-------------
                                           36.48MB 98.56% |   github.com/segmentio/kafka-go.(*conn).roundTrip
                                            0.53MB  1.44% |   github.com/segmentio/kafka-go.(*connGroup).connect
         0     0%   100%    37.01MB 42.07%                | github.com/segmentio/kafka-go/protocol.(*Conn).RoundTrip
                                           37.01MB   100% |   github.com/segmentio/kafka-go/protocol.RoundTrip
----------------------------------------------------------+-------------
                                           35.95MB   100% |   github.com/segmentio/kafka-go/protocol.arrayDecodeFuncOf.func2
         0     0%   100%    35.95MB 40.86%                | github.com/segmentio/kafka-go/protocol.(*decoder).decodeArray
                                           35.95MB   100% |   github.com/segmentio/kafka-go/protocol.structDecodeFuncOf.func2
                                           34.95MB 97.22% |   github.com/segmentio/kafka-go/protocol.makeArray
----------------------------------------------------------+-------------
                                               1MB   100% |   github.com/segmentio/kafka-go/protocol.structDecodeFuncOf.func2
         0     0%   100%        1MB  1.14%                | github.com/segmentio/kafka-go/protocol.(*decoder).decodeString
                                               1MB   100% |   github.com/segmentio/kafka-go/protocol.(*decoder).readString
----------------------------------------------------------+-------------
                                               1MB   100% |   github.com/segmentio/kafka-go/protocol.(*decoder).decodeString
         0     0%   100%        1MB  1.14%                | github.com/segmentio/kafka-go/protocol.(*decoder).readString
                                               1MB   100% |   github.com/segmentio/kafka-go/protocol.bytesToString (inline)
----------------------------------------------------------+-------------
                                            1.06MB   100% |   github.com/segmentio/kafka-go/protocol.(*encoder).writeInt32
         0     0%   100%     1.06MB  1.21%                | github.com/segmentio/kafka-go/protocol.(*encoder).Write
                                            1.06MB   100% |   github.com/segmentio/kafka-go/protocol.(*pageBuffer).Write
----------------------------------------------------------+-------------
                                            1.06MB   100% |   github.com/segmentio/kafka-go/protocol.WriteRequest
         0     0%   100%     1.06MB  1.21%                | github.com/segmentio/kafka-go/protocol.(*encoder).writeInt32
                                            1.06MB   100% |   github.com/segmentio/kafka-go/protocol.(*encoder).Write
----------------------------------------------------------+-------------
                                            1.06MB   100% |   github.com/segmentio/kafka-go/protocol.(*encoder).Write
         0     0%   100%     1.06MB  1.21%                | github.com/segmentio/kafka-go/protocol.(*pageBuffer).Write
                                            1.06MB   100% |   github.com/segmentio/kafka-go/protocol.(*pageBuffer).newPage (inline)
----------------------------------------------------------+-------------
                                            1.06MB   100% |   github.com/segmentio/kafka-go/protocol.(*pageBuffer).Write (inline)
         0     0%   100%     1.06MB  1.21%                | github.com/segmentio/kafka-go/protocol.(*pageBuffer).newPage
                                            1.06MB   100% |   github.com/segmentio/kafka-go/protocol.newPage
----------------------------------------------------------+-------------
                                           35.95MB   100% |   github.com/segmentio/kafka-go/protocol.RoundTrip
         0     0%   100%    35.95MB 40.86%                | github.com/segmentio/kafka-go/protocol.ReadResponse
                                           35.95MB   100% |   github.com/segmentio/kafka-go/protocol.structDecodeFuncOf.func2
----------------------------------------------------------+-------------
                                           37.01MB   100% |   github.com/segmentio/kafka-go/protocol.(*Conn).RoundTrip
         0     0%   100%    37.01MB 42.07%                | github.com/segmentio/kafka-go/protocol.RoundTrip
                                           35.95MB 97.13% |   github.com/segmentio/kafka-go/protocol.ReadResponse
                                            1.06MB  2.87% |   github.com/segmentio/kafka-go/protocol.WriteRequest
----------------------------------------------------------+-------------
                                            1.06MB   100% |   github.com/segmentio/kafka-go/protocol.RoundTrip
         0     0%   100%     1.06MB  1.21%                | github.com/segmentio/kafka-go/protocol.WriteRequest
                                            1.06MB   100% |   github.com/segmentio/kafka-go/protocol.(*encoder).writeInt32
----------------------------------------------------------+-------------
                                           35.95MB   100% |   github.com/segmentio/kafka-go/protocol.structDecodeFuncOf.func2
         0     0%   100%    35.95MB 40.86%                | github.com/segmentio/kafka-go/protocol.arrayDecodeFuncOf.func2
                                           35.95MB   100% |   github.com/segmentio/kafka-go/protocol.(*decoder).decodeArray
----------------------------------------------------------+-------------
                                           34.95MB   100% |   github.com/segmentio/kafka-go/protocol.(*decoder).decodeArray
         0     0%   100%    34.95MB 39.73%                | github.com/segmentio/kafka-go/protocol.makeArray
                                           34.95MB   100% |   reflect.MakeSlice
----------------------------------------------------------+-------------
                                           35.95MB   100% |   github.com/segmentio/kafka-go/protocol.(*decoder).decodeArray
                                           35.95MB   100% |   github.com/segmentio/kafka-go/protocol.ReadResponse
         0     0%   100%    35.95MB 40.86%                | github.com/segmentio/kafka-go/protocol.structDecodeFuncOf.func2
                                           35.95MB   100% |   github.com/segmentio/kafka-go/protocol.arrayDecodeFuncOf.func2
                                               1MB  2.78% |   github.com/segmentio/kafka-go/protocol.(*decoder).decodeString
----------------------------------------------------------+-------------
                                            1.50MB   100% |   main.runProbe
         0     0%   100%     1.50MB  1.71%                | main.(*sender).Send
                                            1.50MB   100% |   github.com/segmentio/kafka-go.(*Writer).WriteMessages
----------------------------------------------------------+-------------
         0     0%   100%     1.50MB  1.71%                | main.main.func3
                                            1.50MB   100% |   main.runProbe
----------------------------------------------------------+-------------
                                            1.50MB   100% |   main.main.func3
         0     0%   100%     1.50MB  1.71%                | main.runProbe
                                            1.50MB   100% |   main.(*sender).Send
----------------------------------------------------------+-------------
                                            1.76MB   100% |   net/http.serverHandler.ServeHTTP
         0     0%   100%     1.76MB  2.00%                | net/http.(*ServeMux).ServeHTTP
                                            1.76MB   100% |   net/http.HandlerFunc.ServeHTTP
----------------------------------------------------------+-------------
         0     0%   100%     1.76MB  2.00%                | net/http.(*conn).serve
                                            1.76MB   100% |   net/http.serverHandler.ServeHTTP
----------------------------------------------------------+-------------
                                            1.76MB   100% |   github.com/prometheus/client_golang/prometheus/promhttp.InstrumentHandlerCounter.func1
                                            1.76MB   100% |   github.com/prometheus/client_golang/prometheus/promhttp.InstrumentHandlerInFlight.func1
                                            1.76MB   100% |   net/http.(*ServeMux).ServeHTTP
         0     0%   100%     1.76MB  2.00%                | net/http.HandlerFunc.ServeHTTP
                                            1.76MB   100% |   github.com/prometheus/client_golang/prometheus/promhttp.HandlerFor.func1
                                            1.76MB   100% |   github.com/prometheus/client_golang/prometheus/promhttp.InstrumentHandlerCounter.func1
                                            1.76MB   100% |   github.com/prometheus/client_golang/prometheus/promhttp.InstrumentHandlerInFlight.func1
----------------------------------------------------------+-------------
                                            1.76MB   100% |   net/http.(*conn).serve
         0     0%   100%     1.76MB  2.00%                | net/http.serverHandler.ServeHTTP
                                            1.76MB   100% |   net/http.(*ServeMux).ServeHTTP
----------------------------------------------------------+-------------
                                           34.95MB   100% |   github.com/segmentio/kafka-go/protocol.makeArray
         0     0%   100%    34.95MB 39.73%                | reflect.MakeSlice
                                           34.95MB   100% |   reflect.unsafe_NewArray
----------------------------------------------------------+-------------
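For anyone trying to reproduce this, a heap profile like the one above can be captured from inside a Go process using only the standard library. This is an illustration, not the reporter's tooling: captureHeapProfile is a hypothetical helper around runtime/pprof.WriteHeapProfile, which writes the gzip-compressed protobuf format that `go tool pprof` reads.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"runtime/pprof"
)

// captureHeapProfile (hypothetical helper) returns the current heap profile
// in the gzip-compressed protobuf format understood by `go tool pprof`.
func captureHeapProfile() ([]byte, error) {
	// Force a GC first so the profile reflects up-to-date allocation data.
	runtime.GC()
	var buf bytes.Buffer
	if err := pprof.WriteHeapProfile(&buf); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	data, err := captureHeapProfile()
	if err != nil {
		panic(err)
	}
	fmt.Printf("heap profile: %d bytes\n", len(data))
}
```

Writing the bytes to a file and running `go tool pprof` on it gives the same `top` and `tree` views. If the application serves http.DefaultServeMux, a blank import of net/http/pprof instead exposes the profile at /debug/pprof/heap on the running process.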

Posted by JensRantil on Apr 15 '21

Hello @JensRantil, thanks for the detailed bug report.

After thoroughly reviewing the code, it does appear that we could have a leak. I don't know that SetFinalizer prevents the collection of cycling reference (if you have documentation to share on the topic I'd be excited to read it), but we do employ reference counting in the connection management code, which is a likely source of such a bug: https://github.com/segmentio/kafka-go/blob/master/transport.go#L274-L331
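To make the reference-counting concern concrete, here is a generic sketch of the pattern. It is not kafka-go's implementation; refCounted, ref and unref are hypothetical names. The resource is released exactly once, when the last reference is dropped, which also means that a single missing unref on some code path keeps it, and everything it points to, alive indefinitely.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// refCounted is a hypothetical shared resource (for example a pooled
// connection) that must be released exactly once, after its last user is done.
type refCounted struct {
	refs    int32  // number of live references, updated atomically
	release func() // called once when refs drops to zero
}

// newRefCounted returns a resource whose creator holds the first reference.
func newRefCounted(release func()) *refCounted {
	return &refCounted{refs: 1, release: release}
}

// ref registers another user; every ref must be paired with exactly one unref.
func (r *refCounted) ref() { atomic.AddInt32(&r.refs, 1) }

// unref drops a reference and releases the resource when none remain.
// If some code path forgets to call unref, refs never reaches zero and the
// resource is never released: the reference-counting version of a leak.
func (r *refCounted) unref() {
	switch n := atomic.AddInt32(&r.refs, -1); {
	case n == 0:
		r.release()
	case n < 0:
		panic("refCounted: unref called more times than ref")
	}
}

func main() {
	released := 0
	r := newRefCounted(func() { released++ })

	r.ref()   // a second user, e.g. an in-flight request, borrows the resource
	r.unref() // ...and returns it; the creator still holds a reference
	fmt.Println("released after first unref:", released)

	r.unref() // the creator closes it, dropping the last reference
	fmt.Println("released after second unref:", released)
}
```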

More investigation is needed to identify and address the root cause, but at first sight this does appear to be an issue in kafka-go.

Just out of curiosity: you mentioned that you recreate writers frequently. Do these writers get closed properly after being used? And if the program isn't sensitive, would it be possible for you to share it?

Posted by achille-roussel on Apr 16 '21

I don't know that SetFinalizer prevents the collection of cycling reference (if you have documentation to share on the topic I'd be excited to read it)

Hello! It's Friday night here, but I can at least send you a link to https://golang.org/pkg/runtime/#SetFinalizer, which states:

If a cyclic structure includes a block with a finalizer, that cycle is not guaranteed to be garbage collected and the finalizer is not guaranteed to run, because there is no ordering that respects the dependencies.

I initially learned about this here: https://groups.google.com/g/golang-nuts/c/tzvgeBEW1WY/m/sQw7m5mAtfMJ?pli=1
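The documented caveat can be observed in a small self-contained program (node, makeCycle and finalizerRan are hypothetical names used only for illustration). Two objects reference each other and one of them has a finalizer; after all references are dropped and several GC cycles are forced, the finalizer does not run with the current gc runtime, so the 1 MiB buffer stays allocated. The documentation only promises "not guaranteed", so treat the exact behavior as an implementation detail.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
	"time"
)

// node is a hypothetical type used only to illustrate the SetFinalizer caveat.
type node struct {
	next *node
	buf  []byte // payload that stays allocated if the cycle is never collected
}

var finalized int32 // incremented by the finalizer if it ever runs

// makeCycle builds a -> b -> a, attaches a finalizer to a, and then drops
// every reference to both nodes when it returns.
func makeCycle() {
	a := &node{buf: make([]byte, 1<<20)}
	b := &node{next: a}
	a.next = b
	runtime.SetFinalizer(a, func(*node) { atomic.AddInt32(&finalized, 1) })
}

// finalizerRan builds an unreachable cycle, forces several GC cycles and
// reports whether the finalizer on the cycle ever ran.
func finalizerRan() bool {
	makeCycle()
	for i := 0; i < 5; i++ {
		runtime.GC()
		// Finalizers run on a separate goroutine; give it a chance to run.
		time.Sleep(10 * time.Millisecond)
	}
	return atomic.LoadInt32(&finalized) > 0
}

func main() {
	fmt.Println("finalizer on cyclic structure ran:", finalizerRan())
}
```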

I'll check a bit later whether we close the writers, and see if I can share some code.

Posted by JensRantil on Apr 16 '21

Nice, thanks for sharing! I must have missed this in the docs, or forgotten about it 👍

Posted by achille-roussel on Apr 16 '21

@achille-roussel Hello again! I've now shared the current state of the source code here. I have not tried to reduce the code to a minimal reproduction of the bug. In short, runProbe(...) runs periodically and calls Sender.Close(), which in turn closes the writer.

Side-note: It's a small but useful utility application, and we've discussed internally making it a proper open source project.

Posted by JensRantil on Apr 18 '21

Okay, we decided to significantly increase the memory limit, and that did the trick. ✨ The issue is not a memory leak; memory usage has simply increased significantly, from ~13.5 MB to ~330 MB. That seems like a lot of memory for a very small application that sends and receives one message per second, so I still consider this a bug.
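One way to back up a "higher but stable" conclusion without waiting for OOM kills is to sample the Go runtime's heap counters once per probe and check whether they level off. A minimal sketch, assuming a hypothetical sampleHeap helper and a stand-in probe function that deliberately retains memory so the numbers visibly climb (a real probe would go in its place):

```go
package main

import (
	"fmt"
	"runtime"
)

// sampleHeap (hypothetical helper) calls work n times and records the
// runtime's HeapInuse counter (bytes in in-use heap spans) after each call.
// A series that levels off suggests a larger but stable footprint; one that
// keeps climbing across many probes suggests a leak.
func sampleHeap(n int, work func()) []uint64 {
	samples := make([]uint64, 0, n)
	var m runtime.MemStats
	for i := 0; i < n; i++ {
		work()
		// ReadMemStats briefly stops the world, so sample once per probe
		// rather than in a tight loop.
		runtime.ReadMemStats(&m)
		samples = append(samples, m.HeapInuse)
	}
	return samples
}

// sink keeps the stand-in probe's allocations reachable.
var sink [][]byte

func main() {
	// Hypothetical stand-in for one probe: allocate and retain 64 KiB.
	probe := func() { sink = append(sink, make([]byte, 64<<10)) }
	for i, s := range sampleHeap(5, probe) {
		fmt.Printf("probe %d: HeapInuse=%d KiB\n", i, s/1024)
	}
}
```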

I've updated the issue title and description with new graphs showing memory usage.

Posted by JensRantil on Apr 27 '21

FYI, I could not find any buffer sizes in https://pkg.go.dev/github.com/segmentio/kafka-go#Writer or https://pkg.go.dev/github.com/segmentio/kafka-go#Reader that looked unreasonably large. Is there any way to reduce them?

Posted by JensRantil on Apr 27 '21