confluent-kafka-dotnet

Memory leak when reconnecting to a broker

Open · DobrovAlexey opened this issue 3 years ago · 1 comment

Hi, we have run into an unmanaged memory leak in Confluent.Kafka.

Confluent.Kafka NuGet version: 1.8.1
Apache Kafka version: 2.13-2.8.0

Client configuration:

"ConsumerConfig": {
    "BootstrapServers": "",
    "GroupId": "MessageDelayer-Group",
    "EnableAutoCommit": false,
    "AutoOffsetReset": 1,
    "EnablePartitionEof": true,
    "PartitionAssignmentStrategy": 2,
    "StatisticsIntervalMs": 5000,
    "SessionTimeoutMs": 6000
  }
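The numeric values above (`"AutoOffsetReset": 1`, `"PartitionAssignmentStrategy": 2`) look like serialized enum ordinals. Below is a sketch of the same settings written with Confluent.Kafka's typed properties. It assumes the enum declaration order in Confluent.Kafka 1.8.x (`AutoOffsetReset`: Latest, Earliest, Error; `PartitionAssignmentStrategy`: Range, RoundRobin, CooperativeSticky), under which 1 means `Earliest` and 2 means `CooperativeSticky`. The class name `ReportedConfig` is hypothetical.

```csharp
using Confluent.Kafka;

// Sketch: the reporter's JSON consumer config expressed with typed enums.
// Assumes Confluent.Kafka 1.8.x enum ordinals:
//   AutoOffsetReset:             Latest = 0, Earliest = 1, Error = 2
//   PartitionAssignmentStrategy: Range = 0, RoundRobin = 1, CooperativeSticky = 2
public static class ReportedConfig
{
    public static ConsumerConfig Build(string bootstrapServers) => new ConsumerConfig
    {
        BootstrapServers = bootstrapServers,             // left empty in the report
        GroupId = "MessageDelayer-Group",
        EnableAutoCommit = false,
        AutoOffsetReset = AutoOffsetReset.Earliest,      // JSON value 1
        EnablePartitionEof = true,
        PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky, // JSON value 2
        StatisticsIntervalMs = 5000,                     // emit a statistics JSON blob every 5 s
        SessionTimeoutMs = 6000,
    };
}
```

Writing the enums by name makes it obvious that this group uses the cooperative-sticky assignor and starts from the earliest offset, which the bare numbers hide.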

Steps to reproduce:

  1. The handler application reads messages and processes them.
  2. We stop the broker (simulating a failure).
  3. The app tries to reconnect, but after about half an hour it stops trying and memory begins to grow (by about 1 GB over 12 hours).
  4. We start the broker again, but the application no longer tries to connect and hangs.

There are potentially two problems here:

  1. The main and most important one is the memory leak.
  2. The connection is not restored (this may be solvable with the right client configuration settings).

Application log:

2021-12-09T17:58:30.5454191+03:00 [INF] (MessageDelayer/V0001EX00002871/7) Delivered message to topic hub.22ad8994-b176-474f-9cc7-e7ac7555f600, partition [0], offset 346659.
2021-12-09T17:58:30.5489565+03:00 [INF] (MessageDelayer/V0001EX00002871/7) rdkafka-ed85977e-a0b6-491c-a7ce-121124ca82e3: [event_delay.1h [0] @482624] Successfully committed
2021-12-09T17:58:30.5866259+03:00 [INF] (MessageDelayer/V0001EX00002871/7) rdkafka-ed85977e-a0b6-491c-a7ce-121124ca82e3: [event_delay.1h [0] @482625] Received message
2021-12-09T17:58:30.7003588+03:00 [INF] (MessageDelayer/V0001EX00002871/7) Delivered message to topic hub.22ad8994-b176-474f-9cc7-e7ac7555f600, partition [0], offset 346660.
2021-12-09T17:58:30.7033923+03:00 [INF] (MessageDelayer/V0001EX00002871/7) rdkafka-ed85977e-a0b6-491c-a7ce-121124ca82e3: [event_delay.1h [0] @482625] Successfully committed
2021-12-09T17:58:30.7382826+03:00 [INF] (MessageDelayer/V0001EX00002871/7) rdkafka-ed85977e-a0b6-491c-a7ce-121124ca82e3: [event_delay.1h [0] @482626] Received message
%6|1639061910.905|FAIL|rdkafka#producer-2| [thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Disconnected (after 5683066ms in state UP)
%3|1639061910.905|ERROR|rdkafka#producer-2| [thrd:app]: rdkafka#producer-2: 10.10.10.111:9092/1: Disconnected (after 5683066ms in state UP)
2021-12-09T17:58:30.9140565+03:00 [INF] (MessageDelayer/V0001EX00002871/24) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Info', message: '[thrd:GroupCoordinator]: GroupCoordinator: 10.10.10.111:9092: Disconnected (after 257701ms in state UP)'.
2021-12-09T17:58:30.9141665+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Info', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Disconnected (after 251640ms in state UP)'.
%3|1639061913.102|FAIL|rdkafka#producer-2| [thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2194ms in state CONNECT)
%3|1639061913.102|ERROR|rdkafka#producer-2| [thrd:app]: rdkafka#producer-2: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2194ms in state CONNECT)
2021-12-09T17:58:33.1181180+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2200ms in state CONNECT)'.
2021-12-09T17:58:35.6163051+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2188ms in state CONNECT, 1 identical error(s) suppressed)'.
2021-12-09T17:59:09.8213358+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2195ms in state CONNECT, 4 identical error(s) suppressed)'.
2021-12-09T17:59:46.2803743+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2191ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:00:22.7642990+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2187ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:00:57.0782124+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2187ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:01:33.0375968+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2194ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:02:09.1867518+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2189ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:02:42.2621110+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2190ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:03:11.9185224+03:00 [INF] (MessageDelayer/V0001EX00002871/16) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Warning', message: '[thrd:main]: Consumer group session timed out (in join-state steady) after 12614 ms without a successful response from the group coordinator (broker 2, last error was Success): revoking assignment and rejoining group'.
2021-12-09T18:03:14.1057305+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2194ms in state CONNECT, 2 identical error(s) suppressed)'.
2021-12-09T18:03:30.7467865+03:00 [INF] (MessageDelayer/V0001EX00002871/16) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Warning', message: '[thrd:main]: Application maximum poll interval (300000ms) exceeded by 6ms (adjust max.poll.interval.ms for long-running message processing): leaving group'.
2021-12-09T18:03:31.1213413+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) Exception in ProduceAsync on Publish action: Local: Message timed out
2021-12-09T18:03:31.2238898+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) Consume Processing Exception: Confluent.Kafka, Local: Message timed out.Exception: Confluent.Kafka.ProduceException`2[System.String,EventHubConsumer.Common.Models.EventMessage]: Local: Message timed out
   at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)
   at KafkaCommon.Implementations.KafkaProducer`1.PublishAsync(Message`2 message, String topic)
   at MessageDelayer.Implementations.MessageHandlingService.MessageHandlerAsync(Message`2 message) in C:\Projects\EventHubConsumer\MessageDelayer\Implementations\MessageHandlingService.cs:line 63
   at KafkaCommon.Implementations.KafkaConsumer.RunLoopConsume[TValue](String topic, CancellationToken cancellationToken, Func`2 consumedMessageHandle, Int32 commitPeriod)
2021-12-09T18:03:31.2695064+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: GroupCoordinator: 10.10.10.111:9092: Disconnected (after 257701ms in state UP). No action required.
2021-12-09T18:03:31.2724311+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Disconnected (after 251640ms in state UP). No action required.
2021-12-09T18:03:31.2744537+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2200ms in state CONNECT). No action required.
2021-12-09T18:03:31.2762501+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2188ms in state CONNECT, 1 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2777508+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2195ms in state CONNECT, 4 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2792417+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2191ms in state CONNECT, 3 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2807164+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2187ms in state CONNECT, 3 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2822026+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2187ms in state CONNECT, 3 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2837419+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2194ms in state CONNECT, 3 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2853034+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2189ms in state CONNECT, 3 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2911678+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2190ms in state CONNECT, 3 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2934483+03:00 [INF] (MessageDelayer/V0001EX00002871/30) : Revoking assignment: [event_delay.10m[0], event_delay.10s[0], event_delay.1d[0], event_delay.1h[0], event_delay.1m[0], event_delay.1s[0]]
2021-12-09T18:03:49.4364271+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2190ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:04:26.0811760+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2191ms in state CONNECT, 3 identical error(s) suppressed)'.
// ... 32 more lines omitted, identical except for the "after ...ms" value ...
2021-12-09T18:24:25.1902300+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2190ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:25:01.7801891+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2192ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:28:48.8630677+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2191ms in state CONNECT, 1 identical error(s) suppressed)'.
2021-12-09T18:29:28.1940841+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2192ms in state CONNECT, 5 identical error(s) suppressed)'.
2021-12-09T18:30:03.9443300+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2185ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:30:40.3925052+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2217ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:47:51.2166734+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2227ms in state CONNECT, 1 identical error(s) suppressed)'.
2021-12-09T18:48:32.1851521+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2240ms in state CONNECT, 5 identical error(s) suppressed)'.
2021-12-09T18:49:05.1773458+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2229ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:49:37.4058398+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2263ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:50:11.7793305+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2227ms in state CONNECT, 3 identical error(s) suppressed)'.
%3|1639065029.967|FAIL|rdkafka#producer-2| [thrd:mq3.dev.mp.mtsit.com:9092/bootstrap]: mq3.dev.mp.mtsit.com:9092/bootstrap: Connect to ipv4#10.10.10.112:9092 failed: Unknown error (after 2282ms in state CONNECT)
%3|1639065029.967|ERROR|rdkafka#producer-2| [thrd:app]: rdkafka#producer-2: mq3.dev.mp.mtsit.com:9092/bootstrap: Connect to ipv4#10.10.10.112:9092 failed: Unknown error (after 2282ms in state CONNECT)
2021-12-13T18:24:59.1784023+03:00 [INF] (MessageDelayer/V0001EX00002871/76) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Info', message: '[thrd:10.10.10.110:9092/0]: 10.10.10.110:9092/0: Disconnected (after 344062906ms in state UP)'.
2021-12-13T18:25:01.5707142+03:00 [INF] (MessageDelayer/V0001EX00002871/72) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:mq1.dev.mp.mtsit.com:9092/bootstrap]: mq1.dev.mp.mtsit.com:9092/bootstrap: Connect to ipv4#10.10.10.110:9092 failed: Unknown error (after 2267ms in state CONNECT)'.
%3|1639409108.324|ERROR|rdkafka#producer-2| [thrd:mq2.dev.mp.mtsit.com:9092/bootstrap]: 6/6 brokers are down
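In the log above, the broker disconnects at 17:58:30; at 18:03:30 the consumer reports that the maximum poll interval (300000 ms) was exceeded and leaves the group, and one second later `ProduceAsync` (called from inside `RunLoopConsume`, per the stack trace) fails with "Local: Message timed out". That timing is consistent with the consume loop awaiting `ProduceAsync` for librdkafka's default `message.timeout.ms`, which is also 300000 ms. The sketch below bounds the produce timeout well below the poll interval and surfaces fatal and all-brokers-down errors through `SetErrorHandler`. The class name `DelayerKafkaSettings` and the 60 s value are illustrative assumptions, and whether this also fixes the reconnect hang is untested.

```csharp
using System;
using Confluent.Kafka;

// Sketch only: the timeout values are illustrative, not tuned recommendations.
public static class DelayerKafkaSettings
{
    // Max time between Consume() calls before librdkafka leaves the group
    // (librdkafka default: 300000 ms).
    public const int MaxPollIntervalMs = 300_000;

    // Bound each ProduceAsync well below MaxPollIntervalMs so that awaiting it
    // inside the consume loop cannot by itself exceed the poll interval
    // (librdkafka's default message.timeout.ms is also 300000 ms).
    public const int MessageTimeoutMs = 60_000;

    public static ConsumerConfig Consumer(string bootstrapServers) => new ConsumerConfig
    {
        BootstrapServers = bootstrapServers,
        GroupId = "MessageDelayer-Group",
        EnableAutoCommit = false,
        MaxPollIntervalMs = MaxPollIntervalMs,
    };

    public static ProducerConfig Producer(string bootstrapServers) => new ProducerConfig
    {
        BootstrapServers = bootstrapServers,
        MessageTimeoutMs = MessageTimeoutMs,
    };

    // Distinguish errors the application must act on from ones librdkafka retries itself.
    public static IConsumer<string, string> BuildConsumer(string bootstrapServers) =>
        new ConsumerBuilder<string, string>(Consumer(bootstrapServers))
            .SetErrorHandler((_, e) =>
            {
                if (e.IsFatal)
                    Console.Error.WriteLine($"Fatal error, recreate the consumer: {e.Reason}");
                else if (e.Code == ErrorCode.Local_AllBrokersDown)
                    Console.Error.WriteLine($"All brokers down (non-fatal, retried by librdkafka): {e.Reason}");
            })
            .Build();
}
```

Keeping the produce timeout shorter than the poll interval means a stuck broker shows up as a `ProduceException` the handler can deal with, rather than as the consumer silently leaving its group.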

In Rider's dotMemory you can see that the occupied space is unmanaged memory: [screenshot]

In VMMap we can see the total heap growing: [screenshot]

During normal, stable operation the application uses about 100 MB of memory; with the leak it grows to several gigabytes.
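To track the managed/unmanaged split from inside the service (for example, logging it next to the consume loop while the broker is down) without attaching dotMemory or VMMap, a rough in-process sample can help. This is a sketch: `MemorySample` is a hypothetical helper, and "unmanaged estimate" here is simply process private bytes minus the managed heap, which also counts runtime overhead, JIT code and thread stacks, so only its trend is meaningful.

```csharp
using System;
using System.Diagnostics;

// Sketch: approximate how much of the process's private memory is outside the GC heap.
public readonly struct MemorySample
{
    public MemorySample(long privateBytes, long managedBytes)
    {
        PrivateBytes = privateBytes;
        ManagedBytes = managedBytes;
    }

    public long PrivateBytes { get; }   // process private bytes
    public long ManagedBytes { get; }   // bytes currently allocated on the GC heap

    // Rough estimate only; clamped at zero.
    public long UnmanagedEstimate => Math.Max(0, PrivateBytes - ManagedBytes);

    public static MemorySample Take()
    {
        using var process = Process.GetCurrentProcess();
        // false: do not force a garbage collection just to take the measurement.
        return new MemorySample(process.PrivateMemorySize64, GC.GetTotalMemory(false));
    }

    public override string ToString() =>
        $"private={PrivateBytes / (1024 * 1024)} MB, managed={ManagedBytes / (1024 * 1024)} MB, " +
        $"unmanaged~{UnmanagedEstimate / (1024 * 1024)} MB";
}
```

Logging `MemorySample.Take()` every few minutes should show whether growth is in the managed heap or, as the screenshots suggest, outside it.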

Thanks

DobrovAlexey · Dec 16 '21 13:12

I recently ran into a similar problem and tracked down the cause with WinDbg: it showed a large amount of Kafka statistics data in the unmanaged heap. Try removing the StatisticsIntervalMs setting. [screenshots]
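Two ways to act on this suggestion are sketched below, assuming a consumer built with `ConsumerBuilder`. Option 1 simply leaves `StatisticsIntervalMs` unset (librdkafka's `statistics.interval.ms` defaults to 0, i.e. no statistics). Option 2 keeps statistics but registers a handler so each JSON blob is processed as it is delivered; per the Confluent.Kafka documentation, that handler runs as a side effect of `Consume()` on the same thread. The class `StatsOptions` and the `onStats` callback are hypothetical, and neither option is confirmed to fix the leak.

```csharp
using System;
using Confluent.Kafka;

// Sketch: two ways to handle the StatisticsIntervalMs suspicion.
public static class StatsOptions
{
    // Option 1: no StatisticsIntervalMs, so no statistics are emitted at all.
    public static ConsumerConfig WithoutStatistics(string bootstrapServers, string groupId) =>
        new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = groupId,
            EnableAutoCommit = false,
        };

    // Option 2: keep statistics, but hand every JSON blob to a callback immediately.
    // The handler is invoked from Consume(), so a loop that stops calling Consume()
    // also stops receiving statistics.
    public static IConsumer<string, string> WithStatisticsHandler(
        string bootstrapServers, string groupId, Action<string> onStats)
    {
        var config = WithoutStatistics(bootstrapServers, groupId);
        config.StatisticsIntervalMs = 5000;
        return new ConsumerBuilder<string, string>(config)
            .SetStatisticsHandler((_, json) => onStats(json))
            .Build();
    }
}
```

Option 1 is the cheapest way to test the hypothesis: if memory stays flat with statistics disabled and grows with them enabled, that narrows the leak down considerably.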

GhostCakeMaker · May 07 '22 03:05

Thanks @GhostCakeMaker. I haven't looked into this yet, but it seems there may be a memory issue on the .NET side. We'll take a look.

mhowlett · Oct 07 '22 17:10

@DobrovAlexey @GhostCakeMaker I tried a simple consumer application with StatisticsIntervalMs set, but didn't observe any memory leak, even after leaving it running for about a day.

Can you share the consumer code with which you observed the leak? That will help us debug the issue.

anchitj · Nov 11 '22 09:11

Produce enough data for the consumer to process. The consumer starts at about 100 MB of memory. Keep it running and its memory will gradually increase, although the growth can be fairly slow.

Client configuration:

    "Consumer": {
      "BootstrapServers": "localhost:9092",
      "GroupId": "123",
      "EnableAutoCommit": false,
      "StatisticsIntervalMs": 5000,
      "SessionTimeoutMs": 6000,
      "AutoOffsetReset": "Earliest",
      "EnablePartitionEof": true
    }

Consumer code (note that parts of it are pseudocode):

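        private void ConsumerService(object token)
        {
            var t = (CancellationToken)token;
            try
            {
                while (!t.IsCancellationRequested)
                {
                    try
                    {
                        ConsumeResult<string, string> consumeResult = kafka.Consume(t);

                        // Check for null before touching any member (the original
                        // read IsPartitionEOF first, which would throw on null).
                        if (consumeResult == null)
                        {
                            Log.Info("ConsumeResult is null");
                            continue;
                        }

                        // EnablePartitionEof = true: EOF events carry no message.
                        if (consumeResult.IsPartitionEOF)
                        {
                            continue;
                        }

                        var value = JsonConvert.DeserializeObject<Message>(consumeResult.Message.Value);
                        try
                        {
                            kafka.Commit(consumeResult);
                            try
                            {
                                // Fire-and-forget processing; the body is elided in the original report.
                                // This try/catch only covers scheduling: exceptions thrown inside
                                // the task surface on the returned Task, not here.
                                Task.Run(() => { /* processing of value (elided) */ });
                            }
                            catch (Exception ex)
                            {
                                Log.Error(ex);
                            }
                        }
                        catch (KafkaException ex)
                        {
                            Log.Error(ex);
                        }
                    }
                    catch (ConsumeException ex)
                    {
                        Log.Error(ex);
                    }
                }
            }
            catch (OperationCanceledException ex)
            {
                // Consume(t) throws this when the token is cancelled.
                Log.Error(ex);
            }
            finally
            {
                // Leave the consumer group cleanly on every exit path, not only on cancellation.
                // kafka.Dispose() is still needed afterwards to release the native handle.
                kafka.Close();
            }
        }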

GhostCakeMaker · Nov 11 '22 10:11