nats.net
When client disconnects, it doesn't reconnect Jetstream Handlers
Defect
Make sure that these boxes are checked before submitting your issue -- thank you!
- [x] Included a [Minimal, Complete, and Verifiable example](https://stackoverflow.com/help/mcve)
Versions of NATS.Client and nats-server:
OS/Container environment:
Windows (client), Docker (server), Visual Studio 2022, .NET 6.0
Steps or code to reproduce the issue:
    using System.Text;
    using NATS.Client;
    using NATS.Client.JetStream;

    var opts = ConnectionFactory.GetDefaultOptions();
    var factory = new ConnectionFactory();
    var connection = factory.CreateConnection(opts);
    var jetStream = connection.CreateJetStreamContext();
    var jetStreamManagement = connection.CreateJetStreamManagementContext();

    // Create the stream if it does not exist yet.
    string streamName = "teststream";
    try
    {
        jetStreamManagement.GetStreamInfo(streamName); // this throws if the stream does not exist
    }
    catch (Exception)
    {
        StreamConfiguration sc = StreamConfiguration.Builder()
            .WithName(streamName)
            .WithStorageType(StorageType.Memory)
            .Build();
        jetStreamManagement.AddStream(sc);
    }

    // Make sure the stream captures the test subject.
    string subject = "test.subject.jetStream";
    StreamInfo stream = jetStreamManagement.GetStreamInfo(streamName);
    var config = stream.Config;
    if (!config.Subjects.Contains(subject))
    {
        config.Subjects.Add(subject);
    }
    jetStreamManagement.UpdateStream(config);

    // Print the payload of each message, then acknowledge it.
    static void Handler(MsgHandlerEventArgs args)
    {
        var data = args.Message?.Data;
        if (data?.Length > 0)
        {
            string sData = Encoding.UTF8.GetString(data);
            Console.WriteLine(sData);
        }
        args.Message?.Ack();
    }

    void EventHandler(object sender, MsgHandlerEventArgs args) => Handler(args);

    var options = new PushSubscribeOptions.PushSubscribeOptionsBuilder()
        .WithDurable(streamName)
        .WithStream(streamName)
        .Build();
    jetStream.PushSubscribeAsync(subject, EventHandler!, false, options);

    var messageBytes = Encoding.UTF8.GetBytes("Testing");
    connection.Publish(subject, messageBytes);
    Thread.Sleep(1000); // Wait for message

    Console.WriteLine("Stop the server or break connection then press enter...");
    Console.ReadLine();
    try
    {
        connection.Publish(subject, messageBytes);
    }
    catch
    {
        /* ignored */
    } // Ensure disconnection event occurs.

    Console.WriteLine("Start the server or reconnect connection then press enter...");
    Console.ReadLine();
    connection.Publish(subject, messageBytes);
    Thread.Sleep(5000); // Wait for message
Expected result:
After reconnection, the message is received by the Jetstream subscriber.
Actual result:
Jetstream subscriber is not reconnected when NATS reconnects.
Is this a single machine? A memory stream is gone if the server goes down. The ephemeral consumer might also be lost. The client will read what's in the incoming memory buffer, but won't be able to ack it.
@sspates-starbucks Can you respond to @scottf question? An ephemeral consumer would need to be recreated if the server it lives on (memory only) goes down or if the client is disconnected for a long enough time that the server harvests the ephemeral consumer. Likewise, an R1 memory-store stream goes away if the server it lives on is brought down.
@scottf The issue repros both on a single device and with two different devices (client/server). The issue seems to be that when the client disconnects for any reason and then reconnects, the handlers don't get reconnected to the new JetStream instance that gets initiated for the reconnect. I was able to create workaround code by just re-establishing the handlers once the client reconnects, but without the workaround, any new messages sent don't get received.
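The workaround code itself was not posted in the thread. A minimal sketch of what "re-establishing the handlers once the client reconnects" could look like is shown below. It assumes the stream and subject names from the repro above; the `Subscribe` helper, the `gate` lock and the choice to unsubscribe the old subscription first are hypothetical and have not been checked against the reporter's actual workaround.

```csharp
using System;
using System.Text;
using NATS.Client;
using NATS.Client.JetStream;

const string streamName = "teststream";          // from the repro above
const string subject = "test.subject.jetStream"; // from the repro above

IJetStreamPushAsyncSubscription? subscription = null;
object gate = new object(); // hypothetical: serializes re-subscription attempts

// Print the payload and acknowledge the message.
void OnMessage(object? sender, MsgHandlerEventArgs args)
{
    Console.WriteLine(Encoding.UTF8.GetString(args.Message.Data));
    args.Message.Ack();
}

// Hypothetical helper: drop the previous subscription (if any) and subscribe again.
void Subscribe(IConnection conn)
{
    lock (gate)
    {
        try
        {
            subscription?.Unsubscribe();
        }
        catch (Exception)
        {
            // The old subscription may already be unusable; ignore and continue.
        }

        IJetStream js = conn.CreateJetStreamContext();
        PushSubscribeOptions pso = PushSubscribeOptions.Builder()
            .WithStream(streamName)
            .WithDurable(streamName)
            .Build();
        subscription = js.PushSubscribeAsync(subject, OnMessage, false, pso);
    }
}

Options opts = ConnectionFactory.GetDefaultOptions();
// Re-establish the JetStream handler every time the connection comes back.
opts.ReconnectedEventHandler += (sender, args) => Subscribe(args.Conn);

using IConnection connection = new ConnectionFactory().CreateConnection(opts);
Subscribe(connection);

Console.WriteLine("Subscribed; press enter to exit...");
Console.ReadLine();
```

Whether the explicit re-subscribe is needed at all depends on the client version and the consumer type, as the later replies in this thread point out.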
@sspates-starbucks Can you clarify what you mean by "new Jetstream instance"? The intent is that the extant JetStream context and bindings should see the refreshed NATS connection and attempt to continue to work.
We see in your test code that the test stream is memory-resident. It would be better to use file storage, to make it unambiguous that you are not killing the server (where the stream lives only in memory) to initiate the disconnect/reconnect cycle.
In addition, your example also shows an ephemeral JS Consumer. A JS Consumer will expire and be expunged in approx. 5 seconds if the server sees no client subscription interest.
A durable consumer would be a better test example to eliminate ambiguity of potentially trying to reference a non-existing JS Consumer (i.e. one that has been deleted by the server).
@scottf The same code with durable queues and/or a file-based stream still exhibits the same issues.
Any updates guys?
@TodorBalabanski Is there a specific situation that you can describe having this problem? What is the topology? What are the stream and consumer configurations? In some situations the consumer is expected not to reconnect, and in others it is. Do you use the event listeners DisconnectedEventHandler / ReconnectedEventHandler / AsyncErrorEventHandler?
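For anyone who is not using those listeners yet, the sketch below shows one way to attach them so that disconnects, reconnects and asynchronous errors become visible while reproducing the problem. The server URL is an assumption (a local server on the default port), and the log messages are illustrative only.

```csharp
using System;
using NATS.Client;

Options opts = ConnectionFactory.GetDefaultOptions();
opts.Url = "nats://localhost:4222"; // assumption: local server on the default port

// Log each connection state change reported by the client.
opts.DisconnectedEventHandler += (sender, args) =>
    Console.WriteLine("Disconnected from server");
opts.ReconnectedEventHandler += (sender, args) =>
    Console.WriteLine("Reconnected to " + args.Conn.ConnectedUrl);
opts.AsyncErrorEventHandler += (sender, args) =>
    Console.WriteLine("Async error: " + args.Error);

using IConnection connection = new ConnectionFactory().CreateConnection(opts);

Console.WriteLine("Connected; stop and restart the server to see the events, then press enter to exit...");
Console.ReadLine();
```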
@scottf @tbeets I updated to make the connection durable and stream connected. Neither one changes the result.
Any resolution here?
The latest release should take care of this, but it depends on exactly which type of consumer you have. Only durable consumers, or ephemeral consumers with an inactive threshold that is longer than the time it takes to reconnect, will recover. Ephemerals are also unlikely to recover if they were memory-based and not replicated (the default), or if the stream or consumer leader server was the one that came down.
This applies to push and pull consumers. Ordered consumers, even though they are ephemeral, can recover because their state is kept on the client and can be reconstructed.
This work was done in this PR: https://github.com/nats-io/nats.net/pull/865
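The recovery rules described in this reply can be summarized as a small decision helper. This is only an illustrative sketch: `ConsumerKind`, `RecoveryRules.LikelyToRecover` and its parameters are hypothetical names that are not part of NATS.Client, and the helper assumes the stream itself survived the outage.

```csharp
using System;

// Example: an ephemeral consumer with a 5 second inactive threshold
// facing a 30 second outage.
bool recovers = RecoveryRules.LikelyToRecover(
    ConsumerKind.Ephemeral,
    inactiveThreshold: TimeSpan.FromSeconds(5),
    outage: TimeSpan.FromSeconds(30),
    consumerStateLost: false);
Console.WriteLine($"Ephemeral consumer expected to recover: {recovers}");

// Hypothetical classification of the consumer types mentioned in the thread.
enum ConsumerKind { Durable, Ephemeral, Ordered }

static class RecoveryRules
{
    // Returns true when, per the reply above, the consumer is expected to
    // resume after a reconnect. consumerStateLost models an unreplicated,
    // memory-based consumer whose server (or leader) went down.
    public static bool LikelyToRecover(
        ConsumerKind kind,
        TimeSpan inactiveThreshold,
        TimeSpan outage,
        bool consumerStateLost)
    {
        switch (kind)
        {
            case ConsumerKind.Durable:
                return true; // durable consumers persist on the server
            case ConsumerKind.Ordered:
                return true; // state is kept on the client and reconstructed
            case ConsumerKind.Ephemeral:
                // Must outlive the outage and must not have lost its server-side state.
                return !consumerStateLost && inactiveThreshold > outage;
            default:
                throw new ArgumentOutOfRangeException(nameof(kind));
        }
    }
}
```

The helper makes the key point explicit: for a plain ephemeral consumer, the inactive threshold has to exceed the length of the outage, otherwise the server removes the consumer before the client returns.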