confluent-kafka-dotnet
Memory leak when reconnecting to a broker
Hi, I've run into an unmanaged memory leak in Confluent.Kafka.
Confluent.Kafka NuGet version: 1.8.1
Apache Kafka version: 2.8.0 (Scala 2.13 build, i.e. kafka_2.13-2.8.0)
Client configuration:
```json
"ConsumerConfig": {
  "BootstrapServers": "",
  "GroupId": "MessageDelayer-Group",
  "EnableAutoCommit": false,
  "AutoOffsetReset": 1,
  "EnablePartitionEof": true,
  "PartitionAssignmentStrategy": 2,
  "StatisticsIntervalMs": 5000,
  "SessionTimeoutMs": 6000
}
```
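The numeric enum values in this configuration are easy to misread. Assuming the section is bound onto Confluent.Kafka's `ConsumerConfig` (for example with Microsoft.Extensions.Configuration, which maps numeric strings onto enum members by ordinal), `1` should correspond to `AutoOffsetReset.Earliest` and `2` to `PartitionAssignmentStrategy.CooperativeSticky`. A sketch of the equivalent configuration built in code follows; the class name `MessageDelayerConsumerConfig` is hypothetical.

```csharp
using Confluent.Kafka;

// Hypothetical helper: the same settings as the JSON above, with enum values spelled out.
public static class MessageDelayerConsumerConfig
{
    public static ConsumerConfig Build() => new ConsumerConfig
    {
        BootstrapServers = "", // left empty, as in the report
        GroupId = "MessageDelayer-Group",
        EnableAutoCommit = false,
        AutoOffsetReset = AutoOffsetReset.Earliest, // JSON value 1 (assumed ordinal mapping)
        EnablePartitionEof = true,
        PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky, // JSON value 2 (assumed ordinal mapping)
        StatisticsIntervalMs = 5000,
        SessionTimeoutMs = 6000,
    };
}
```

Writing the names out makes it obvious that this consumer starts from the earliest offset and uses the cooperative-sticky assignor.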
Steps to reproduce:
- The handler application reads and processes messages.
- We stop the broker (simulating a failure).
- The app tries to reconnect, but after about half an hour it stops trying and memory starts to grow (by roughly 1 GB over 12 hours).
- We start the broker again, but the application no longer tries to connect and hangs.
There are potentially two problems here:
- The main and most important one is the memory leak.
- The connection is not restored (this may be solvable with the right client configuration settings).
Application log:
2021-12-09T17:58:30.5454191+03:00 [INF] (MessageDelayer/V0001EX00002871/7) Delivered message to topic hub.22ad8994-b176-474f-9cc7-e7ac7555f600, partition [0], offset 346659.
2021-12-09T17:58:30.5489565+03:00 [INF] (MessageDelayer/V0001EX00002871/7) rdkafka-ed85977e-a0b6-491c-a7ce-121124ca82e3: [event_delay.1h [0] @482624] Successfully committed
2021-12-09T17:58:30.5866259+03:00 [INF] (MessageDelayer/V0001EX00002871/7) rdkafka-ed85977e-a0b6-491c-a7ce-121124ca82e3: [event_delay.1h [0] @482625] Received message
2021-12-09T17:58:30.7003588+03:00 [INF] (MessageDelayer/V0001EX00002871/7) Delivered message to topic hub.22ad8994-b176-474f-9cc7-e7ac7555f600, partition [0], offset 346660.
2021-12-09T17:58:30.7033923+03:00 [INF] (MessageDelayer/V0001EX00002871/7) rdkafka-ed85977e-a0b6-491c-a7ce-121124ca82e3: [event_delay.1h [0] @482625] Successfully committed
2021-12-09T17:58:30.7382826+03:00 [INF] (MessageDelayer/V0001EX00002871/7) rdkafka-ed85977e-a0b6-491c-a7ce-121124ca82e3: [event_delay.1h [0] @482626] Received message
%6|1639061910.905|FAIL|rdkafka#producer-2| [thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Disconnected (after 5683066ms in state UP)
%3|1639061910.905|ERROR|rdkafka#producer-2| [thrd:app]: rdkafka#producer-2: 10.10.10.111:9092/1: Disconnected (after 5683066ms in state UP)
2021-12-09T17:58:30.9140565+03:00 [INF] (MessageDelayer/V0001EX00002871/24) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Info', message: '[thrd:GroupCoordinator]: GroupCoordinator: 10.10.10.111:9092: Disconnected (after 257701ms in state UP)'.
2021-12-09T17:58:30.9141665+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Info', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Disconnected (after 251640ms in state UP)'.
%3|1639061913.102|FAIL|rdkafka#producer-2| [thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2194ms in state CONNECT)
%3|1639061913.102|ERROR|rdkafka#producer-2| [thrd:app]: rdkafka#producer-2: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2194ms in state CONNECT)
2021-12-09T17:58:33.1181180+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2200ms in state CONNECT)'.
2021-12-09T17:58:35.6163051+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2188ms in state CONNECT, 1 identical error(s) suppressed)'.
2021-12-09T17:59:09.8213358+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2195ms in state CONNECT, 4 identical error(s) suppressed)'.
2021-12-09T17:59:46.2803743+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2191ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:00:22.7642990+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2187ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:00:57.0782124+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2187ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:01:33.0375968+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2194ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:02:09.1867518+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2189ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:02:42.2621110+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2190ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:03:11.9185224+03:00 [INF] (MessageDelayer/V0001EX00002871/16) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Warning', message: '[thrd:main]: Consumer group session timed out (in join-state steady) after 12614 ms without a successful response from the group coordinator (broker 2, last error was Success): revoking assignment and rejoining group'.
2021-12-09T18:03:14.1057305+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2194ms in state CONNECT, 2 identical error(s) suppressed)'.
2021-12-09T18:03:30.7467865+03:00 [INF] (MessageDelayer/V0001EX00002871/16) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Warning', message: '[thrd:main]: Application maximum poll interval (300000ms) exceeded by 6ms (adjust max.poll.interval.ms for long-running message processing): leaving group'.
2021-12-09T18:03:31.1213413+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) Exception in ProduceAsync on Publish action: Local: Message timed out
2021-12-09T18:03:31.2238898+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) Consume Processing Exception: Confluent.Kafka, Local: Message timed out.Exception: Confluent.Kafka.ProduceException`2[System.String,EventHubConsumer.Common.Models.EventMessage]: Local: Message timed out
at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)
at KafkaCommon.Implementations.KafkaProducer`1.PublishAsync(Message`2 message, String topic)
at MessageDelayer.Implementations.MessageHandlingService.MessageHandlerAsync(Message`2 message) in C:\Projects\EventHubConsumer\MessageDelayer\Implementations\MessageHandlingService.cs:line 63
at KafkaCommon.Implementations.KafkaConsumer.RunLoopConsume[TValue](String topic, CancellationToken cancellationToken, Func`2 consumedMessageHandle, Int32 commitPeriod)
2021-12-09T18:03:31.2695064+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: GroupCoordinator: 10.10.10.111:9092: Disconnected (after 257701ms in state UP). No action required.
2021-12-09T18:03:31.2724311+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Disconnected (after 251640ms in state UP). No action required.
2021-12-09T18:03:31.2744537+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2200ms in state CONNECT). No action required.
2021-12-09T18:03:31.2762501+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2188ms in state CONNECT, 1 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2777508+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2195ms in state CONNECT, 4 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2792417+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2191ms in state CONNECT, 3 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2807164+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2187ms in state CONNECT, 3 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2822026+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2187ms in state CONNECT, 3 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2837419+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2194ms in state CONNECT, 3 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2853034+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2189ms in state CONNECT, 3 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2911678+03:00 [ERR] (MessageDelayer/V0001EX00002871/30) : Consumer error: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2190ms in state CONNECT, 3 identical error(s) suppressed). No action required.
2021-12-09T18:03:31.2934483+03:00 [INF] (MessageDelayer/V0001EX00002871/30) : Revoking assignment: [event_delay.10m[0], event_delay.10s[0], event_delay.1d[0], event_delay.1h[0], event_delay.1m[0], event_delay.1s[0]]
2021-12-09T18:03:49.4364271+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2190ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:04:26.0811760+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2191ms in state CONNECT, 3 identical error(s) suppressed)'.
**// 32 similar lines omitted; they differ only in the "after Nms" value**
2021-12-09T18:24:25.1902300+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2190ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:25:01.7801891+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2192ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:28:48.8630677+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2191ms in state CONNECT, 1 identical error(s) suppressed)'.
2021-12-09T18:29:28.1940841+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2192ms in state CONNECT, 5 identical error(s) suppressed)'.
2021-12-09T18:30:03.9443300+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2185ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:30:40.3925052+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2217ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:47:51.2166734+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2227ms in state CONNECT, 1 identical error(s) suppressed)'.
2021-12-09T18:48:32.1851521+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2240ms in state CONNECT, 5 identical error(s) suppressed)'.
2021-12-09T18:49:05.1773458+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2229ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:49:37.4058398+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2263ms in state CONNECT, 3 identical error(s) suppressed)'.
2021-12-09T18:50:11.7793305+03:00 [INF] (MessageDelayer/V0001EX00002871/14) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:10.10.10.111:9092/1]: 10.10.10.111:9092/1: Connect to ipv4#10.10.10.111:9092 failed: Unknown error (after 2227ms in state CONNECT, 3 identical error(s) suppressed)'.
%3|1639065029.967|FAIL|rdkafka#producer-2| [thrd:mq3.dev.mp.mtsit.com:9092/bootstrap]: mq3.dev.mp.mtsit.com:9092/bootstrap: Connect to ipv4#10.10.10.112:9092 failed: Unknown error (after 2282ms in state CONNECT)
%3|1639065029.967|ERROR|rdkafka#producer-2| [thrd:app]: rdkafka#producer-2: mq3.dev.mp.mtsit.com:9092/bootstrap: Connect to ipv4#10.10.10.112:9092 failed: Unknown error (after 2282ms in state CONNECT)
2021-12-13T18:24:59.1784023+03:00 [INF] (MessageDelayer/V0001EX00002871/76) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Info', message: '[thrd:10.10.10.110:9092/0]: 10.10.10.110:9092/0: Disconnected (after 344062906ms in state UP)'.
2021-12-13T18:25:01.5707142+03:00 [INF] (MessageDelayer/V0001EX00002871/72) Consuming from Kafka. Client: 'rdkafka#consumer-12', syslog level: 'Error', message: '[thrd:mq1.dev.mp.mtsit.com:9092/bootstrap]: mq1.dev.mp.mtsit.com:9092/bootstrap: Connect to ipv4#10.10.10.110:9092 failed: Unknown error (after 2267ms in state CONNECT)'.
%3|1639409108.324|ERROR|rdkafka#producer-2| [thrd:mq2.dev.mp.mtsit.com:9092/bootstrap]: 6/6 brokers are down
In Rider's dotMemory you can see that the growth is in unmanaged memory:

In VMMap we can see the total heap growing:

During normal, stable operation the application uses about 100 MB of memory; with the leak it grows to several gigabytes.
Thanks
I recently ran into a similar problem and tracked down the cause with WinDbg: it showed a large amount of Kafka statistics data in the unmanaged heap. Try removing the StatisticsIntervalMs setting. See the diagram below:
Give it a try.
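A rough consistency check of this explanation against the numbers in the original report (about 1 GB of growth over 12 hours, with `StatisticsIntervalMs` = 5000), taking 1 GB as $10^9$ bytes:

$$
N = \frac{12 \times 3600\ \text{s}}{5\ \text{s}} = 8640 \ \text{statistics payloads}, \qquad
\frac{10^{9}\ \text{B}}{N} = \frac{10^{9}\ \text{B}}{8640} \approx 1.16 \times 10^{5}\ \text{B} \approx 116\ \text{KB}
$$

So retained statistics alone would explain the growth only if each payload, including its overhead, were on the order of 100 KB. Whether that is realistic depends on how many brokers, topics, and partitions the client reports on; this is a plausibility check, not a confirmation.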
Thanks @GhostCakeMaker. I haven't looked into this yet, but it seems like there may be a memory issue on the .NET side. We'll take a look.
@DobrovAlexey @GhostCakeMaker I tried a simple consumer application with StatisticsIntervalMs set, but didn't observe any memory leak, even after leaving it running for about a day.
Can you share the consumer code with which you observed the memory leak? That will help us debug the issue.
Produce enough data for the consumer to work through. The consumer starts at about 100 MB of memory. Leave it running without closing it, and you will see its memory increase gradually, although the growth can be relatively slow.
Client configuration:
```json
"Consumer": {
  "BootstrapServers": "localhost:9092",
  "GroupId": "123",
  "EnableAutoCommit": false,
  "StatisticsIntervalMs": 5000,
  "SessionTimeoutMs": 6000,
  "AutoOffsetReset": "Earliest",
  "EnablePartitionEof": true
}
```
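Consumer code (note that parts of it are pseudocode):
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Newtonsoft.Json;

// Assumes `kafka` is an IConsumer<string, string> that is already subscribed,
// `Message` is the application's payload type, and `Log` is the app's logger.
private void ConsumerService(object token)
{
    var t = (CancellationToken)token;
    try
    {
        while (!t.IsCancellationRequested)
        {
            try
            {
                // Blocks until a message, a partition EOF event or an error arrives;
                // throws OperationCanceledException once `t` is cancelled.
                ConsumeResult<string, string> consumeResult = kafka.Consume(t);
                if (consumeResult == null)
                {
                    Log.Info("ConsumeResult is null");
                    continue;
                }

                // With EnablePartitionEof = true, EOF results carry no message, so skip them.
                if (consumeResult.IsPartitionEOF)
                {
                    continue;
                }

                var value = JsonConvert.DeserializeObject<Message>(consumeResult.Message.Value);
                try
                {
                    kafka.Commit(consumeResult);
                }
                catch (KafkaException ex)
                {
                    // As in the original, a message whose commit fails is not processed.
                    Log.Error(ex);
                    continue;
                }

                // The processing was elided in the original ("Task.Run().....");
                // ProcessMessage is a hypothetical placeholder for it.
                _ = Task.Run(() =>
                {
                    try
                    {
                        ProcessMessage(value);
                    }
                    catch (Exception ex)
                    {
                        Log.Error(ex);
                    }
                });
            }
            catch (ConsumeException ex)
            {
                Log.Error(ex);
            }
        }
    }
    catch (OperationCanceledException ex)
    {
        Log.Error(ex);
    }
    finally
    {
        // Leave the consumer group cleanly whenever the method exits.
        kafka.Close();
    }
}
```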
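To check whether a consumer like the one above is falling behind in serving librdkafka's internal queue, one option is to inspect each statistics payload. According to librdkafka's STATISTICS.md, the top-level `replyq` gauge is the number of ops (callbacks, events, etc.) waiting in the queue for the application to serve; a value that keeps climbing would be consistent with statistics accumulating in unmanaged memory. The helper below is a hypothetical sketch (`KafkaStatsProbe` and `ReadReplyQueueLength` are not part of Confluent.Kafka) that extracts that field with System.Text.Json:

```csharp
using System;
using System.Text.Json;

// Hypothetical diagnostic helper, not part of Confluent.Kafka.
public static class KafkaStatsProbe
{
    // Returns the top-level "replyq" gauge from a librdkafka statistics JSON payload,
    // or null if the field is missing or is not an integer.
    public static long? ReadReplyQueueLength(string statisticsJson)
    {
        using JsonDocument doc = JsonDocument.Parse(statisticsJson);
        if (doc.RootElement.TryGetProperty("replyq", out JsonElement replyq)
            && replyq.ValueKind == JsonValueKind.Number
            && replyq.TryGetInt64(out long value))
        {
            return value;
        }
        return null;
    }
}
```

It could be wired in with `ConsumerBuilder<TKey, TValue>.SetStatisticsHandler`, for example `.SetStatisticsHandler((_, json) => Log.Info($"replyq={KafkaStatsProbe.ReadReplyQueueLength(json)}"))`. Keep in mind that, per the Confluent.Kafka documentation, the statistics handler executes as a side effect of `Consume` on the same thread, so it stops firing whenever the loop stops calling `Consume`.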