nats.net

When client disconnects, it doesn't reconnect Jetstream Handlers

Open sspates opened this issue 2 years ago • 9 comments

Defect

Make sure that these boxes are checked before submitting your issue -- thank you!

  • [x] Included a [Minimal, Complete, and Verifiable example](https://stackoverflow.com/help/mcve)

Versions of NATS.Client and nats-server:

OS/Container environment:

Windows (client), Docker (server), Visual Studio 2022, .NET 6.0

Steps or code to reproduce the issue:

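        // Namespaces the repro needs (System.Text is not among the .NET 6 implicit usings)
        using System;
        using System.Text;
        using System.Threading;
        using NATS.Client;
        using NATS.Client.JetStream;

        var opts = ConnectionFactory.GetDefaultOptions();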

        var factory = new ConnectionFactory();
        var connection = factory.CreateConnection(opts);
        var jetStream = connection.CreateJetStreamContext();
        var jetStreamManagement = connection.CreateJetStreamManagementContext();

        string streamName = "teststream";

        try
        {
            jetStreamManagement.GetStreamInfo(streamName); // this throws if the stream does not exist
        }
        catch (Exception)
        {
            StreamConfiguration sc = StreamConfiguration.Builder()
                .WithName(streamName)
                .WithStorageType(StorageType.Memory)
                .Build();

            jetStreamManagement.AddStream(sc);
        }

        string subject = "test.subject.jetStream";
        StreamInfo stream = jetStreamManagement.GetStreamInfo(streamName);
        var config = stream.Config;
        if (!config.Subjects.Contains(subject))
        {
            config.Subjects.Add(subject);
        }

        jetStreamManagement.UpdateStream(config);

        static void Handler(MsgHandlerEventArgs args)
        {
            var data = args.Message?.Data;
            if (data?.Length > 0)
            {
                string sData = Encoding.UTF8.GetString(data);
                Console.WriteLine(sData);
            }

            args.Message?.Ack();
        }

        void EventHandler(object sender, MsgHandlerEventArgs args) => Handler(args);

        var options = new PushSubscribeOptions.PushSubscribeOptionsBuilder()
            .WithDurable(streamName)
            .WithStream(streamName)
            .Build();

        jetStream.PushSubscribeAsync(subject, EventHandler!, false, options);

        var messageBytes = Encoding.UTF8.GetBytes("Testing");

        connection.Publish(subject, messageBytes);

        Thread.Sleep(1000); //Wait for message

        Console.WriteLine("Stop the server or break connection then press enter...");
        Console.ReadLine();

        try
        {
            connection.Publish(subject, messageBytes);
        }
        catch
        {
            /* ignored */
        } //Ensure disconnection event occurs.

        Console.WriteLine("Start the server or reconnect connection then press enter...");
        Console.ReadLine();

        connection.Publish(subject, messageBytes);

        Thread.Sleep(5000); //Wait for message

Expected result:

After reconnection, the message is received by the JetStream subscriber.

Actual result:

The JetStream subscriber is not reconnected when NATS reconnects.

sspates, Feb 04 '22 16:02

Is this a single machine? A memory stream is gone if the server goes down. The ephemeral consumer might also be lost. The client will read what's in the incoming memory buffer, but won't be able to ack it.

scottf, Jun 18 '22 00:06

@sspates-starbucks Can you respond to @scottf's question? An ephemeral consumer would need to be recreated if the server it lives on (memory only) goes down, or if the client is disconnected for long enough that the server harvests the ephemeral consumer. Likewise, an R1 memory-store stream goes away if the server it lives on is brought down.

tbeets, Aug 03 '22 00:08

@scottf The issue repros both on a single device and with two different devices (client/server). The issue seems to be that when the client disconnects for any reason and then reconnects, the handlers don't get reconnected to the new JetStream instance that gets initiated for the reconnect. I was able to create a workaround by re-establishing the handlers once the client reconnects, but without the workaround, any new messages sent are not received.

sspates, Aug 03 '22 17:08
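
The thread never shows the workaround code, so the following is only one possible reading of "re-establishing the handlers once the client reconnects". It is a minimal sketch that assumes a durable push consumer and uses hypothetical names (`test-durable`, `Subscribe`, `OnMessage`). Whether the stale subscription has to be released first, and whether the server accepts a second bind to the same durable, depends on the client and server versions, so treat it as a starting point rather than a confirmed fix.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using NATS.Client;
using NATS.Client.JetStream;

// Illustrative names; the durable name is an assumption, not taken from the thread.
const string StreamName = "teststream";
const string Subject = "test.subject.jetStream";
const string DurableName = "test-durable";

object gate = new object();
IConnection? connection = null;
IJetStreamPushAsyncSubscription? subscription = null;

void OnMessage(object? sender, MsgHandlerEventArgs args)
{
    var data = args.Message.Data ?? Array.Empty<byte>();
    Console.WriteLine(Encoding.UTF8.GetString(data));
    args.Message.Ack();
}

// (Re)creates the push subscription on the given connection.
void Subscribe(IConnection conn)
{
    lock (gate)
    {
        // Best effort: release the previous subscription if there is one.
        try { subscription?.Unsubscribe(); }
        catch (Exception) { /* it may already be invalid after the disconnect */ }

        var pushOptions = PushSubscribeOptions.Builder()
            .WithStream(StreamName)
            .WithDurable(DurableName)
            .Build();

        subscription = conn.CreateJetStreamContext()
            .PushSubscribeAsync(Subject, OnMessage, false, pushOptions);
    }
}

var opts = ConnectionFactory.GetDefaultOptions();

// Resubscribe on a worker task so the reconnect callback returns quickly.
opts.ReconnectedEventHandler = (sender, args) =>
{
    var conn = connection;
    if (conn == null) return;
    Task.Run(() =>
    {
        try { Subscribe(conn); }
        catch (Exception ex) { Console.WriteLine($"Resubscribe failed: {ex.Message}"); }
    });
};

connection = new ConnectionFactory().CreateConnection(opts);
Subscribe(connection);

Console.WriteLine("Listening; press enter to exit...");
Console.ReadLine();
connection.Close();
```

The lock keeps the initial subscribe and a reconnect-triggered resubscribe from racing each other if the connection flaps.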

@sspates-starbucks Can you clarify what you mean by "new Jetstream instance"? The intent is that the extant JetStream context and bindings should see the refreshed NATS connection and attempt to continue to work.

We see in your test code that the test stream is memory-resident. A file-backed stream would be a better test, because it removes the ambiguity of whether you are killing the server (where the stream lives only in memory) to initiate the disconnect/reconnect cycle.

tbeets, Aug 03 '22 20:08

In addition, your example shows an ephemeral JS Consumer. An ephemeral JS Consumer will expire and be expunged in approx. 5 seconds if the server sees no client subscription interest.

A durable consumer would be a better test example to eliminate ambiguity of potentially trying to reference a non-existing JS Consumer (i.e. one that has been deleted by the server).

tbeets, Aug 03 '22 20:08
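
The two suggestions above (file storage and a durable consumer) could be combined in the repro roughly as follows. This is a sketch only: the durable name `test-durable` is an assumption, and the fragment stops short of subscribing.

```csharp
using NATS.Client;
using NATS.Client.JetStream;

using var connection = new ConnectionFactory().CreateConnection();
var jsm = connection.CreateJetStreamManagementContext();

// File storage survives a server restart; memory storage does not.
// The storage type of an existing stream cannot be changed by an update,
// so a leftover memory-backed "teststream" has to be deleted first.
var streamConfig = StreamConfiguration.Builder()
    .WithName("teststream")
    .WithSubjects("test.subject.jetStream")
    .WithStorageType(StorageType.File)
    .Build();
jsm.AddStream(streamConfig);

// A durable consumer is not expunged when subscription interest disappears.
var pushOptions = PushSubscribeOptions.Builder()
    .WithStream("teststream")
    .WithDurable("test-durable") // hypothetical durable name
    .Build();
```

With this configuration, a disconnect/reconnect test no longer depends on whether the stream or the consumer survived on the server side.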

@scottf The same code with durable queues and/or a file-based stream still exhibits the same issue.

sspates, Aug 03 '22 22:08

Any updates guys?

TodorBalabanski, Oct 26 '22 07:10

@TodorBalabanski Can you describe a specific situation in which you see this problem? What is the topology? What are the stream and consumer configurations? In some situations the subscription is expected to reconnect, and in others it is not. Do you use the event listeners DisconnectedEventHandler / ReconnectedEventHandler / AsyncErrorEventHandler?

scottf, Oct 26 '22 11:10
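
For anyone gathering the information requested above, the three event listeners can be attached to the connection options before connecting. This is a minimal diagnostic sketch; the log format is arbitrary, and it only prints what happened and when.

```csharp
using System;
using NATS.Client;

var opts = ConnectionFactory.GetDefaultOptions();

// Log connection state changes with a timestamp so they can be
// correlated with the moment messages stop arriving.
opts.DisconnectedEventHandler = (sender, args) =>
    Console.WriteLine($"{DateTime.UtcNow:O} disconnected");

opts.ReconnectedEventHandler = (sender, args) =>
    Console.WriteLine($"{DateTime.UtcNow:O} reconnected");

// Asynchronous errors reported by the client are surfaced here.
opts.AsyncErrorEventHandler = (sender, args) =>
    Console.WriteLine($"{DateTime.UtcNow:O} async error: {args.Error}");

using var connection = new ConnectionFactory().CreateConnection(opts);

Console.WriteLine("Connected; press enter to exit...");
Console.ReadLine();
```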

@scottf @tbeets I updated the example to use a durable consumer and to bind it to the stream. Neither change affects the result.

sspates, Feb 22 '23 23:02

Any resolution here?

LaiArturs, May 02 '24 11:05

The latest release should take care of this, but it depends on exactly which type of consumer you use. Only durable consumers, or ephemeral consumers with an inactive threshold longer than the time it takes to reconnect, will recover. Ephemerals are also unlikely to recover if they were memory-based and not replicated (the default), or if the stream or consumer leader server was the one that came down.

This applies to push and pull consumers. Ordered consumers, even though they are ephemeral, can recover because their state is kept on the client and can be reconstructed.

This work was done in this PR: https://github.com/nats-io/nats.net/pull/865

scottf, May 02 '24 21:05
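
The consumer types described above could be configured roughly as shown below. This sketch assumes the builders expose `WithInactiveThreshold` (taking milliseconds) and `WithOrdered`, as recent NATS.Client releases appear to; check the names against the version you have installed. The 60-second threshold is an arbitrary example value.

```csharp
using NATS.Client.JetStream;

// Ephemeral consumer that the server keeps for 60 seconds without
// subscription interest, i.e. longer than a typical reconnect takes.
var consumerConfig = ConsumerConfiguration.Builder()
    .WithInactiveThreshold(60_000) // milliseconds; assumed overload
    .Build();

var ephemeralOptions = PushSubscribeOptions.Builder()
    .WithStream("teststream")
    .WithConfiguration(consumerConfig)
    .Build();

// Ordered consumer: ephemeral, but its state lives on the client,
// so it can be rebuilt after a reconnect.
var orderedOptions = PushSubscribeOptions.Builder()
    .WithStream("teststream")
    .WithOrdered(true)
    .Build();
```

Either options object would be passed to `PushSubscribeAsync` in place of the durable options used in the original repro.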