rabbitmq-dotnet-client
Heap size grows when publishing a very large batch of messages
I noticed a memory leak in production (the application runs in a Docker container) when publishing a very large number of messages at one time (500k).
Then I reproduced this behavior locally on a Mac:
- launched RabbitMQ in Docker with this compose file:

```yaml
version: "3"
services:
  rabbitmq:
    image: "rabbitmq:3-management"
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - 'rabbitmq_data:/data'
volumes:
  rabbitmq_data:
```
- wrote example code in which the memory grows:
```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using Serilog;

namespace RabbitMemoryLeakMacOs
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var log = GetLogger();
            var connectionFactory = new ConnectionFactory()
            {
                HostName = "localhost"
            };
            var cn = connectionFactory.CreateConnection();
            var exchangeName = GetExchange(cn);
            var sendingModel = cn.CreateModel();
            var messageCounter = 0;
            var sendingLimit = 400_000;
            while (messageCounter < sendingLimit)
            {
                try
                {
                    var basicProperties = sendingModel.CreateBasicProperties();
                    basicProperties.Persistent = true;
                    var randomTextMessage = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(largeMessage));
                    sendingModel.BasicPublish(exchangeName, "test", false, basicProperties, randomTextMessage);
                    if (messageCounter++ % 100_000 == 0)
                        log.Information("Sended {counter}, used memory {TotalMemory}", messageCounter, GC.GetTotalMemory(true) / (1024 * 1024));
                }
                catch (Exception e)
                {
                    Console.WriteLine($"error on send {messageCounter}" + e.Message);
                }
            }
            GC.Collect();
            GC.WaitForPendingFinalizers();
            log.Information("Used memory {TotalMemory} MB", GC.GetTotalMemory(true) / (1024 * 1024));
            log.Information("OS {Os}", System.Runtime.InteropServices.RuntimeInformation.OSDescription);
            log.Information("Process id {Id}", System.Diagnostics.Process.GetCurrentProcess().Id);
            Console.ReadLine();
        }

        private const string largeMessage =
@"test message large text test message large texttest message
large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message
large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message
large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message
large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message
large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message
large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message
large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message
large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message large texttest message
large texttest message large texttest message large texttest message large texttest message large text";
        private static ILogger GetLogger()
        {
            return new LoggerConfiguration()
                .WriteTo.Console()
                .CreateLogger();
        }

        private static string GetExchange(IConnection connection)
        {
            var model = connection.CreateModel();
            var exchangeName = "testExchange";
            model.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false);
            var queueName = model.QueueDeclare(exchangeName, true, false, false).QueueName;
            model.QueueBind(queue: queueName,
                            exchange: exchangeName,
                            routingKey: "");
            return exchangeName;
        }
    }
}
```
Program output:
```
[14:36:56 INF] Sended 1, used memory 0
[14:36:57 INF] Sended 100001, used memory 363
[14:36:58 INF] Sended 200001, used memory 724
[14:36:58 INF] Sended 300001, used memory 1090
[14:36:59 INF] Used memory 1472 MB
[14:36:59 INF] OS Darwin 20.6.0 Darwin Kernel Version 20.6.0: Wed Jun 23 00:26:31 PDT 2021; root:xnu-7195.141.2~5/RELEASE_X86_64
[14:36:59 INF] Process id 41765
```
- analyzed which objects and which code lead to this:
3.1) created a memory dump with the dotnet-dump tool
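For reference, the standard dotnet-dump workflow looks roughly like this (the exact invocation isn't in the report; the pid comes from the program output above and the dump file name is a placeholder):

```sh
# capture a full memory dump of the running publisher process
dotnet-dump collect -p 41765

# open the dump; the dumpheap/gcroot commands below run inside this session
dotnet-dump analyze <dump-file>
```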
3.2) got heap statistics with `dumpheap -stat`:
```
00000001113a7a60 734 77088 System.String
0000000111f86050 1699 299024 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib],[System.IO.BufferedStream+<WriteToUnderlyingStreamAsync>d__62, System.Private.CoreLib]]
0000000111c14368 3 3670088 System.Memory`1[[System.Byte, System.Private.CoreLib]][]
00007f85a9010940 135604 24285808 Free
00000001114308a0 203743 839723804 System.Byte[]
Total 343840 objects
```
3.3) dumped the instances behind that method table with `dumpheap -mt 00000001114308a0`:
```
00000001c586ea20 00000001114308a0 4120
00000001c586fa50 00000001114308a0 4120
00000001c5870a80 00000001114308a0 4120
00000001c5871ab0 00000001114308a0 4120
00000001c5872ae0 00000001114308a0 4120
00000001c5873b10 00000001114308a0 4120
00000001c5874b40 00000001114308a0 4120
00000001c5875b70 00000001114308a0 4120
00000001c5c372e8 00000001114308a0 3024
00000001c5c3c528 00000001114308a0 1049
00000001c5c59f98 00000001114308a0 40
00000001c5c5a080 00000001114308a0 56
00000001c5c5e5c8 00000001114308a0 40
00000001924a9038 00000001114308a0 342576
```
3.4) found the GC roots keeping one of those instances alive with `gcroot -all 00000001c5871ab0`:
```
HandleTable:
0000000102BB1310 (strong handle)
-> 00000001824BFBC8 System.Object[]
-> 00000001824C5950 System.Threading.Tasks.Task
-> 00000001824C5910 System.Action
-> 00000001824C4C78 RabbitMQ.Client.Framing.Impl.Connection
-> 00000001824B9AE8 RabbitMQ.Client.Impl.SocketFrameHandler
-> 00000001824B9D58 System.Threading.Channels.SingleConsumerUnboundedChannel`1+UnboundedChannelWriter[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
-> 00000001824B9B88 System.Threading.Channels.SingleConsumerUnboundedChannel`1[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
-> 00000001824B9BD8 System.Collections.Concurrent.SingleProducerSingleConsumerQueue`1[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
-> 00000001B2195608 System.Collections.Concurrent.SingleProducerSingleConsumerQueue`1+Segment[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
-> 00000001926DCB68 System.Memory`1[[System.Byte, System.Private.CoreLib]][]
-> 00000001C5871AB0 System.Byte[]
0000000102BB1368 (strong handle)
-> 000000019A4AD420 System.Object[]
-> 00000001824BB448 System.Net.Sockets.SocketAsyncEngine[]
-> 00000001824BB468 System.Net.Sockets.SocketAsyncEngine
-> 00000001824BB4D0 System.Collections.Concurrent.ConcurrentDictionary`2[[System.IntPtr, System.Private.CoreLib],[System.Net.Sockets.SocketAsyncEngine+SocketAsyncContextWrapper, System.Net.Sockets]]
-> 00000001824BB990 System.Collections.Concurrent.ConcurrentDictionary`2+Tables[[System.IntPtr, System.Private.CoreLib],[System.Net.Sockets.SocketAsyncEngine+SocketAsyncContextWrapper, System.Net.Sockets]]
-> 00000001824BB880 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[System.IntPtr, System.Private.CoreLib],[System.Net.Sockets.SocketAsyncEngine+SocketAsyncContextWrapper, System.Net.Sockets]][]
-> 00000001824C5B38 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[System.IntPtr, System.Private.CoreLib],[System.Net.Sockets.SocketAsyncEngine+SocketAsyncContextWrapper, System.Net.Sockets]]
-> 00000001824C09A0 System.Net.Sockets.SocketAsyncContext
-> 00000001824E5F68 System.Net.Sockets.SocketAsyncContext+BufferMemorySendOperation
-> 00000001824E24E0 System.Action`5[[System.Int32, System.Private.CoreLib],[System.Byte[], System.Private.CoreLib],[System.Int32, System.Private.CoreLib],[System.Net.Sockets.SocketFlags, System.Net.Sockets],[System.Net.Sockets.SocketError, System.Net.Primitives]]
-> 00000001824E23C8 System.Net.Sockets.Socket+AwaitableSocketAsyncEventArgs
-> 00000001C5C87178 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib],[System.IO.BufferedStream+<WriteToUnderlyingStreamAsync>d__62, System.Private.CoreLib]]
-> 00000001824C5DE8 System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1+AsyncStateMachineBox`1[[System.Threading.Tasks.VoidTaskResult, System.Private.CoreLib],[RabbitMQ.Client.Impl.SocketFrameHandler+<WriteLoop>d__32, RabbitMQ.Client]]
-> 00000001824B9AE8 RabbitMQ.Client.Impl.SocketFrameHandler
-> 00000001824B9D58 System.Threading.Channels.SingleConsumerUnboundedChannel`1+UnboundedChannelWriter[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
-> 00000001824B9B88 System.Threading.Channels.SingleConsumerUnboundedChannel`1[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
-> 00000001824B9BD8 System.Collections.Concurrent.SingleProducerSingleConsumerQueue`1[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
-> 00000001B2195608 System.Collections.Concurrent.SingleProducerSingleConsumerQueue`1+Segment[[System.Memory`1[[System.Byte, System.Private.CoreLib]], System.Private.CoreLib]]
-> 00000001926DCB68 System.Memory`1[[System.Byte, System.Private.CoreLib]][]
-> 00000001C5871AB0 System.Byte[]
```
I checked this code with .NET SDK 5 and SDK 6.
All of those appear to be tasks and System.Collections.Concurrent queue entries.
This is most likely related to the unbounded channel buffering we use for publishing messages (see here and here).
So most of the used memory (mainly the byte arrays) is most likely freed again once the messages are actually published.
I'd argue this usage pattern is unlikely to come up in a real-world application, and even if it does, most of the memory is going to be reclaimed again, so it's less of an issue.
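To illustrate the effect (a simplified sketch, not the client's actual code): the publish path enqueues serialized frames into an unbounded channel without blocking, and a separate write loop drains that channel to the socket, so a tight publish loop can enqueue byte arrays much faster than the network drains them:

```csharp
using System;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;

// Simplified sketch of the write path: an unbounded channel sits between the
// publishing thread and the socket write loop. Type and member names are
// illustrative, not the actual RabbitMQ.Client internals.
class FrameWriterSketch
{
    private readonly Channel<ReadOnlyMemory<byte>> _frames =
        Channel.CreateUnbounded<ReadOnlyMemory<byte>>(
            new UnboundedChannelOptions { SingleReader = true });

    // Called from the publish path: TryWrite never blocks on an unbounded
    // channel, so frames accumulate on the heap if the socket falls behind.
    public void Write(ReadOnlyMemory<byte> frame) => _frames.Writer.TryWrite(frame);

    // Background loop that actually writes the buffered frames to the network.
    public async Task WriteLoopAsync(Stream networkStream)
    {
        await foreach (var frame in _frames.Reader.ReadAllAsync())
        {
            await networkStream.WriteAsync(frame);
        }
    }
}
```

This matches the gcroot chain above: the byte arrays are held alive by a SingleConsumerUnboundedChannel segment reachable from the SocketFrameHandler until the write loop catches up.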
Possible solutions:
- switch to a bounded channel (either with configurable overflow behavior, or switch to an async write path that waits until there is free space; see the sketch below)
- disable the intermediate buffering (possibly configurable, but the performance impact is going to be substantial)
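A rough sketch of what the bounded-channel option could look like (hypothetical, not the library's actual API): with BoundedChannelFullMode.Wait, a publisher that outruns the socket awaits free space instead of growing the heap:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical bounded variant of the frame buffer: when the buffer is full,
// WriteAsync waits for free space instead of letting the queue grow without limit.
class BoundedFrameWriterSketch
{
    private readonly Channel<ReadOnlyMemory<byte>> _frames;

    public BoundedFrameWriterSketch(int capacity)
    {
        _frames = Channel.CreateBounded<ReadOnlyMemory<byte>>(
            new BoundedChannelOptions(capacity)
            {
                SingleReader = true,
                FullMode = BoundedChannelFullMode.Wait // back-pressure instead of dropping frames
            });
    }

    // The publish path would have to become async (or block) to benefit from this.
    public ValueTask WriteAsync(ReadOnlyMemory<byte> frame) => _frames.Writer.WriteAsync(frame);
}
```

The trade-off is exactly the one discussed below: when the buffer fills up, the caller either has to wait or frames have to be dropped.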
I have the same problem when I send data at high frequency.
Using a bounded channel would force us to face a classic problem: what to do when the buffer is full. Neither dropping data on the floor nor blocking publishing seems very appealing to most. In addition, users can build something similar on top of what we already have, or we would have to make this configurable so that everyone picks their own poison.
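For example, a publisher can already bound its own memory usage by publishing in batches and waiting for publisher confirms. The batch size and timeout below are arbitrary, the exchange name reuses the one declared in the repro code, and this is only a sketch of the idea, not an official recommendation:

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

// Sketch: user-side back-pressure built on publisher confirms. Waiting for the
// broker to confirm each batch limits how many frames can sit in the client's
// outgoing buffer at any one time.
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.ConfirmSelect();

var body = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("test message"));
const int batchSize = 1_000; // arbitrary; tune for the workload

for (var i = 0; i < 400_000; i++)
{
    var props = channel.CreateBasicProperties();
    props.Persistent = true;
    channel.BasicPublish("testExchange", "test", false, props, body);

    if ((i + 1) % batchSize == 0)
    {
        // Blocks until everything published so far is confirmed
        // (throws on nack or timeout), which acts as back-pressure.
        channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(30));
    }
}
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(30));
```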
My guess is that the changes already made in main, coupled with #1445, will address this. Closing.