nats.net

NATS client has a memory leak

Open sspates opened this issue 2 years ago • 2 comments

Observed behavior

Based on a performance test result from the dotMemory tool, we are observing that the NATS .NET client library has high memory usage when the client does not unsubscribe.

Memory usage resets if the client reconnects.

The service is run for 1 hour under constant load, during which memory usage progressively increases.

Here is a simplified version of our connection and subscribe:

using System;
using System.Threading.Tasks;
using NATS.Client;
using NATS.Client.JetStream;

SimplifiedClient client = new("localhost");
client.Connect();
client.Subscribe("foo.bar", "test", (msg) => Console.WriteLine(msg.Subject));
client.SubscribeAsync("foo.baz", "test", (msg) => Console.WriteLine(msg.Subject));

while (true)
{
    await Task.Delay(200);
}

public class SimplifiedClient : IDisposable
{
    private IConnection? _connection;
    private IJetStreamManagement? _jetStreamManagement;
    private IJetStream? _jetStream;
    private readonly string[] _hosts;

    public SimplifiedClient(params string[] hosts)
    {
        _hosts = hosts;
    }

    public void Connect()
    {
        Options? options = ConnectionFactory.GetDefaultOptions();
        options.Servers = _hosts;
        options.MaxReconnect = Options.ReconnectForever;

        ConnectionFactory connectionFactory = new();

        _connection = connectionFactory.CreateConnection(options);

        _jetStreamManagement = _connection.CreateJetStreamManagementContext();

        _jetStream = _connection.CreateJetStreamContext();
    }

    public void Subscribe(string subject, string queueName, Action<Msg> action)
    {
        EventHandler<MsgHandlerEventArgs> eventHandler = (_, args) => HandleMessage(args, action);

        _connection!.SubscribeAsync(subject, queueName, eventHandler);
    }

    public void SubscribeAsync(string subject, string queueName, Action<Msg> action)
    {
        EventHandler<MsgHandlerEventArgs> eventHandler = (_, args) => HandleMessage(args, action);

        PushSubscribeOptions options = BuildPushSubscribeOptions(queueName);

        _jetStream!.PushSubscribeAsync(subject, eventHandler, false, options);
    }

    private void HandleMessage(MsgHandlerEventArgs args, Action<Msg> handler)
    {
        try
        {
            handler(args.Message);

            args.Message.Ack();
        }
        catch
        {
            args.Message.Nak();
        }
    }

    private PushSubscribeOptions BuildPushSubscribeOptions(string queueName)
    {
        ConsumerConfiguration consumerConfiguration = ConsumerConfiguration.Builder().Build();

        PushSubscribeOptions.PushSubscribeOptionsBuilder? builder = PushSubscribeOptions.Builder();

        builder
            .WithStream("TestStream")
            .WithConfiguration(consumerConfiguration)
            .WithDurable(queueName)
            .WithDeliverGroup(queueName);

        return builder.Build();
    }

    public void Dispose()
    {
        _connection?.Drain();
        _connection?.Close();
        _connection?.Dispose();
    }
}
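
The wrapper above discards the objects returned by `SubscribeAsync` and `PushSubscribeAsync`, so it has no handle with which to unsubscribe later. Below is a minimal sketch of one way to keep such handles, assuming (as in NATS.Client 1.x) that subscription objects implement `IDisposable` and that disposing one unsubscribes it. `SubscriptionTracker` and `FakeSubscription` are hypothetical names used only for illustration; the sketch uses no NATS types so that it runs on its own. Whether this changes the memory profile reported here has not been verified.

```csharp
using System;
using System.Collections.Generic;

// Demo: track two stand-in subscriptions, then release them together.
var tracker = new SubscriptionTracker();
tracker.Track(new FakeSubscription("foo.bar"));
tracker.Track(new FakeSubscription("foo.baz"));
Console.WriteLine(tracker.Count); // prints 2
tracker.Dispose();
Console.WriteLine(tracker.Count); // prints 0

// Hypothetical helper: holds subscription handles so they can be released later.
public sealed class SubscriptionTracker : IDisposable
{
    private readonly List<IDisposable> _subscriptions = new();
    private readonly object _gate = new();

    public int Count
    {
        get { lock (_gate) { return _subscriptions.Count; } }
    }

    // Stores the handle and returns it so the caller can still use it directly.
    public T Track<T>(T subscription) where T : IDisposable
    {
        lock (_gate) { _subscriptions.Add(subscription); }
        return subscription;
    }

    // Disposes every tracked handle; one failure does not stop the rest.
    public void Dispose()
    {
        IDisposable[] snapshot;
        lock (_gate)
        {
            snapshot = _subscriptions.ToArray();
            _subscriptions.Clear();
        }

        foreach (IDisposable subscription in snapshot)
        {
            try { subscription.Dispose(); }
            catch (Exception ex) { Console.Error.WriteLine(ex.Message); }
        }
    }
}

// Stand-in for a real subscription object; not part of any NATS library.
public sealed class FakeSubscription : IDisposable
{
    public string Subject { get; }
    public bool IsDisposed { get; private set; }

    public FakeSubscription(string subject) => Subject = subject;

    public void Dispose() => IsDisposed = true;
}
```

In the wrapper, this could look like `_tracker.Track(_connection!.SubscribeAsync(subject, queueName, eventHandler));`, with `_tracker.Dispose()` called at the start of `Dispose()`, before the connection is drained.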

Expected behavior

Service memory stays at a constant level over the hour.

Server and client version

Server: 2.10.9
Library: 1.1.1

Host environment

Windows 10 Enterprise LTSC
x86 CPU (various models)
Minimum 8 GB memory
NATS hosted as a Windows service

Steps to reproduce

See code in observed behavior.

Run for around 1 hour and monitor memory utilization.
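
For the monitoring step, a small in-process sampler can complement an external profiler. The sketch below logs the managed heap size, the process working set, and the gen 2 collection count at a fixed interval, using only standard .NET APIs. `MemorySampler` is a hypothetical helper, not part of the original test setup, and the 100 ms interval is only there to keep the demo short.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Demo: take three samples 100 ms apart; a real run would use a longer interval.
using var cts = new CancellationTokenSource();
int samples = 0;
await MemorySampler.RunAsync(
    TimeSpan.FromMilliseconds(100),
    line =>
    {
        Console.WriteLine(line);
        if (++samples == 3) cts.Cancel();
    },
    cts.Token);

// Hypothetical helper that periodically reports memory figures for this process.
public static class MemorySampler
{
    // Builds one log line with a UTC timestamp and the current memory figures.
    public static string Sample()
    {
        using Process process = Process.GetCurrentProcess();
        long managed = GC.GetTotalMemory(forceFullCollection: false);
        long workingSet = process.WorkingSet64;
        int gen2 = GC.CollectionCount(2);
        return $"{DateTime.UtcNow:O} managed={managed} workingSet={workingSet} gen2={gen2}";
    }

    // Emits a sample, waits for the interval, and repeats until cancelled.
    public static async Task RunAsync(TimeSpan interval, Action<string> sink, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            sink(Sample());
            try { await Task.Delay(interval, token); }
            catch (OperationCanceledException) { break; }
        }
    }
}
```

A managed heap that keeps growing across gen 2 collections points at retained objects, whereas growth only in the working set suggests unmanaged or fragmentation effects.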

sspates, Feb 01 '24 18:02

As far as I know, if you do not ack a message, it will time out and then be put back in the queue to be redelivered.
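
The redelivery behaviour described here can be pictured with a toy model: a message that has been delivered but not acknowledged within an ack-wait window becomes eligible for delivery again. This is only an in-memory illustration driven by a logical clock; `AckWaitQueue` and its members are hypothetical and do not reflect how the NATS server is implemented.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

var queue = new AckWaitQueue(ackWait: 30);
queue.Publish("order-1");

string? first = queue.Deliver(now: 0);   // delivered, now waiting for an ack
string? early = queue.Deliver(now: 10);  // nothing: ack wait has not elapsed
string? again = queue.Deliver(now: 30);  // unacked for 30 ticks: delivered again
queue.Ack("order-1");
string? after = queue.Deliver(now: 60);  // nothing: the message was acked

// prints "order-1 | none | order-1 | none"
Console.WriteLine($"{first} | {early ?? "none"} | {again} | {after ?? "none"}");

// Toy model of ack-wait redelivery; time is a caller-supplied tick count.
public sealed class AckWaitQueue
{
    private readonly long _ackWait;
    private readonly Queue<string> _ready = new();
    // Maps each delivered-but-unacked message to its redelivery deadline.
    private readonly Dictionary<string, long> _pending = new();

    public AckWaitQueue(long ackWait) => _ackWait = ackWait;

    public void Publish(string message) => _ready.Enqueue(message);

    // Returns the next deliverable message, or null if there is none at this time.
    public string? Deliver(long now)
    {
        // Requeue every pending message whose deadline has passed.
        var expired = new List<string>();
        foreach (KeyValuePair<string, long> pair in _pending)
        {
            if (pair.Value <= now) expired.Add(pair.Key);
        }
        foreach (string message in expired)
        {
            _pending.Remove(message);
            _ready.Enqueue(message);
        }

        if (_ready.Count == 0) return null;

        string next = _ready.Dequeue();
        _pending[next] = now + _ackWait;
        return next;
    }

    // Acknowledging removes the message from the pending set, so it is not redelivered.
    public bool Ack(string message) => _pending.Remove(message);
}
```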

alberk8, May 23 '25 00:05

thanks for the report @Jgfrausing. couple of questions: which version of the library are you using? also can you reproduce the issue using the nats cli or the Go library for example? or does it only happen with the .net client.

mtmk, May 23 '25 07:05

Thank you, @mtmk

NATS.Net version 2.5.12. It cannot be reproduced using the CLI, as it auto-ACKs.

I did, however, just try it in Rust, and it seems to be the same issue.

I really don't understand why, though. Do you agree that it is unexpected/faulty behaviour, or is there something I'm missing?

Jgfrausing, May 23 '25 07:05

tbh i'm not sure but it does sound like a general nats question if rust client behaved the same way. cli seems to have a couple of options for acks. if you can reproduce it with the cli or failing that using the Go client it'd be easier to assess for more people.

mtmk, May 23 '25 07:05

Thank you for your feedback, @mtmk. I'll close this one and open it in the nats server repo instead. I'm almost certain that it cannot be reproduced in the CLI. I have the workaround and the issue, in my use case, is only to overcome the initial startup.

Jgfrausing, May 23 '25 07:05