
NotSupportedException in Connection.publish getting BufferedStream.Position

Open DavidSimner opened this issue 2 years ago • 5 comments

We are seeing a System.NotSupportedException with the following stack trace:

   at System.IO.__Error.SeekNotSupported()
   at System.IO.BufferedStream.get_Position()
   at NATS.Client.Connection.publish(String subject, String reply, Byte[] headers, Byte[] data, Int32 offset, Int32 count, Boolean flushBuffer)
   at NATS.Client.Connection.<requestAsync>d__141.MoveNext()

It happens intermittently and isn't 100% reproducible, but here is my best guess as to what is happening:

In the C# class Connection there is a field, private Stream bw, which is a BufferedStream wrapping another underlying Stream.

In the method publish, if the status is ConnState.RECONNECTING, the getter for the property bw.Position is called. This getter is only supported if the underlying Stream supports seeking. As the stack trace above shows, in this case the underlying stream does not support seeking, which causes a NotSupportedException to be thrown.
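
The behavior is easy to see in isolation. Here is a minimal standalone sketch (not the client's code) that wraps a BufferedStream around a non-seekable stream, using GZipStream as a stand-in for the TCP NetworkStream:

using System;
using System.IO;
using System.IO.Compression;

class SeekDemo
{
    static void Main()
    {
        // BufferedStream over a seekable MemoryStream: Position works fine.
        var overMemory = new BufferedStream(new MemoryStream());
        Console.WriteLine(overMemory.Position); // prints 0

        // BufferedStream over a non-seekable stream (GZipStream here, standing in
        // for the TCP NetworkStream): reading Position throws NotSupportedException.
        var overUnseekable = new BufferedStream(
            new GZipStream(new MemoryStream(), CompressionMode.Compress));
        Console.WriteLine(overUnseekable.Position); // System.NotSupportedException
    }
}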

Here is my best guess as to what happened:

When the connection first enters ConnState.RECONNECTING, bw is created over a MemoryStream called pending, which does support seeking, so this is not the cause.

However, later on, the method doReconnect calls the method createConn, which changes the field bw to be created over a stream associated with the TCP socket (which does not support seeking). After createConn has returned successfully, doReconnect then calls processConnectInit.

As a thought experiment, consider what happens if processConnectInit throws an exception: it is caught in the catch block in doReconnect, and the status is set to ConnState.RECONNECTING, but bw is not changed back to a MemoryStream, so bw continues to wrap the stream associated with the TCP socket (which does not support seeking). The catch block then continues: it selects the next server and releases the mu lock before it calls the ReconnectDelayHandler. At this point, imagine that another thread calls publish. publish throws the stack trace above, because mu is not locked, the status is ConnState.RECONNECTING, and the underlying stream of bw does not support seeking.

I think the fix is that in every place where the status is set to ConnState.RECONNECTING, bw should be reset to be created over the MemoryStream called pending.
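
A rough sketch of the kind of change I mean, using the field and type names from the description above (the helper method name is made up; the actual client code may be organized differently):

// Hypothetical helper: every transition into RECONNECTING should also point
// the buffered writer back at the seekable pending MemoryStream, so a
// concurrent publish never sees bw wrapping the dead TCP stream.
private void enterReconnectingState()
{
    status = ConnState.RECONNECTING;

    if (pending == null)
        pending = new MemoryStream();

    bw = new BufferedStream(pending);
}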

DavidSimner avatar Apr 01 '22 12:04 DavidSimner

I also had this issue, can you please keep me posted here?

pcarcereri avatar Aug 15 '22 09:08 pcarcereri

I've come across this issue, and it is consistently repeatable in my environment with a single NATS server. My expectation was that this buffer would be present and used even if the connection is down for a period of time, not just when switching between servers.

@DavidSimner I think you are definitely on the right track, and it seems to be a race condition. The moment I put breakpoints in, the publishing thread sees the "correct" updated bw BufferedStream. Since there isn't a second NATS server to connect to in my test environment, the createConn method never changes bw, unless I'm missing somewhere else where bw would get reset to a stream on the TCP connection.

It's repeatable across multiple .NET and NATS.Client package versions on my machine. I haven't gotten familiar enough with the source to know what a good fix would be, but I'm willing to help test if anyone wants to look at this.

rlabbe82 avatar Oct 05 '23 03:10 rlabbe82

@rlabbe82 Any chance you have some pseudocode or code I could use to repeat this? What versions of the client and server are you using? You said it's repeatable across clients? Which clients, and do you have any code I could pass on to the client dev team?

scottf avatar Oct 05 '23 13:10 scottf

@scottf Yeah I do. After writing this up, I think I figured out what is happening. @DavidSimner's thought experiment is what I am hitting. In the test I'm stopping the local container that is running the NATS server. This causes the server to close the connection and forces the client into the reconnection process.

When the reconnection thread runs and calls createConn, I expected that the conn.open method would fail, but it did not. So I monitored the socket with Wireshark and found that the reconnection attempt at the TCP level was initially successful, then was FINed immediately after. Likely a sign that the Docker container hadn't completely closed the socket even though the NATS server had started exiting.

[Wireshark captures: the TCP reconnect initially succeeds and is FINed immediately after]

This replaces the bw stream with the TCP NetworkStream and successfully completes createConn. However, the next step, processConnectInit, fails because the socket was immediately closed; the catch block then sets the status back to reconnecting without updating bw back to the pending memory stream.


It seems to be a corner case, but I could see it popping up sporadically with network layers like Docker that may not have completely released the external socket, allowing the reconnection to initially succeed. As a quick test I added a line to replace bw with the pending memory stream, and that seemed to allow the connection to properly manage the buffer and retry without hitting this again.

Here is the program that I just put together to test with.

using System;
using System.Text;
using System.Threading;
using NATS.Client;

namespace NatsTest
{
    internal class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory cf = new ConnectionFactory();

            // Creates a live connection to the default
            // NATS Server running locally
            IConnection c = cf.CreateConnection();


            // Publish requests to the given reply subject:
            for (int i = 0; i < 1000; i++)
            {
                c.Publish("foo", "bar", Encoding.UTF8.GetBytes("help!"));
                Thread.Sleep(1000);
            }


            // Draining and closing a connection
            c.Drain();

            // Closing a connection
            c.Close();
        }
    }
}

Found with NATS.Client 1.07; checked on 1.1.1 and rolled back to 0.11.0 as a quick test. NATS server 2.9.19 and 2.10.1. Running this against a local instance of the NATS server, not a container, does not produce the issue, likely because the socket control isn't going through a virtualized network layer.

rlabbe82 avatar Oct 05 '23 14:10 rlabbe82

Thanks for the input, that's an interesting observation about when the server is in a container.

scottf avatar Oct 05 '23 15:10 scottf

@scottf Also seeing this bug in our logs (the broker was restarted by our update utility on an embedded Yocto system)

Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]: Unhandled exception. System.NotSupportedException: NotSupported_UnseekableStream
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at System.IO.BufferedStream.get_Position()
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at NATS.Client.Connection.PublishImpl(String subject, String reply, MsgHeader inHeaders, Byte[] data, Int32 offset, Nullable`1 inCount, Boolean flushBuffer)
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at NATS.Client.Connection.Publish(String subject, Byte[] data)
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at xxx.Broker.Nats.EncodedConnection.PublishAsync[T](String subject, T obj) in /builds/Riwo/customers/xxx/mfp-software/xxx/xxx.Broker.Nats/EncodedConnection.cs:line 32
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at xxx.Broker.Nats.EncodedConnectionProxy.PublishAsync[T](String subject, T obj) in /builds/Riwo/customers/xxx/mfp-software/xxx/xxx.Broker.Nats/EncodedConnectionProxy.cs:line 47
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at xxx.Sync.Service.SyncService.HandleStatusMessagesAsync(CancellationToken stoppingToken) in /builds/Riwo/customers/xxx/mfp-software/xxx/xxx.Sync/Service/SyncService.cs:line 49
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at System.Threading.ThreadPoolWorkQueue.Dispatch()
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:    at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]: fail: Microsoft.Extensions.Hosting.Internal.Host[9]
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:       BackgroundService failed
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:       System.NotSupportedException: NotSupported_UnseekableStream
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:          at System.IO.BufferedStream.get_Position()
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:          at NATS.Client.Connection.PublishImpl(String subject, String reply, MsgHeader inHeaders, Byte[] data, Int32 offset, Nullable`1 inCount, Boolean flushBuffer)
Mar 13 12:19:41 colibri-imx8x-07202716 XXX.API[636]:          at NATS.Client.Connection.Publish(String subject, Byte[] data)

jormenjanssen avatar Mar 15 '24 07:03 jormenjanssen

> @scottf Also seeing this bug in our logs (the broker was restarted by our update utility on an embedded Yocto system)

Side note: can we avoid "broker" and use "server" or "NATS server"?

So the server running in the container was restarted? Or was the entire container restarted? Probably doesn't matter; it's the client that is having this error.

Since this is during a synchronous publish, it seems reasonable that you just handle the exception and wait for the reconnect to resume publishing. (I have a really good example of this in Java if you are interested.)
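
For illustration, a rough C# sketch of that pattern (this is not the Java example mentioned; PublishWithRetry and its retry limits are made up for this sketch):

using System;
using System.Threading;
using NATS.Client;

static class PublishRetry
{
    // Sketch only: retry a synchronous Publish while the client is
    // disconnected or reconnecting, instead of letting the exception
    // bubble up and kill the publishing loop.
    public static void PublishWithRetry(IConnection c, string subject, byte[] payload,
                                         int maxAttempts = 30)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                c.Publish(subject, payload);
                return;
            }
            catch (Exception ex) when (ex is NATSException || ex is NotSupportedException)
            {
                // NotSupportedException is the symptom discussed in this issue;
                // until it is fixed, treat it like any other transient
                // reconnect-time error and retry after a short pause.
                if (attempt == maxAttempts) throw;
                Thread.Sleep(1000);
            }
        }
    }
}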

I think the most important question is: does the connection go into the reconnect logic and recover?

scottf avatar Mar 19 '24 13:03 scottf

In my case it would never recover.

rlabbe82 avatar Mar 28 '24 00:03 rlabbe82

Sounds like a very similar issue to one I ran into. I can reliably reproduce this when connecting to Synadia Cloud. The entire Program.cs is below. When the delay is 0 it works; with a delay of a few seconds or more, it throws.

Notably, when the delay is greater than a second, the library (not the repro code) writes reconnect events to the console every second or so:

DisconnectedEvent, Connection: 48457
ReconnectedEvent, Connection: 94424
DisconnectedEvent, Connection: 48459

Exception:

System.NotSupportedException: Stream does not support seeking.
   at System.IO.BufferedStream.get_Position()
   at NATS.Client.Connection.PublishImpl(String subject, String reply, MsgHeader inHeaders, Byte[] data, Int32 offset, Nullable`1 inCount, Boolean flushBuffer)
   at NATS.Client.Connection.Publish(String subject, Byte[] data)
   at Program.<Main>$(String[] args) in Program.cs:line 8
   at Program.<Main>(String[] args)

Program.cs:

using System.Text;
using NATS.Client;

var path = @"path\to\cloud.creds";
var delay = 2000;
var con = new ConnectionFactory().CreateConnection("tls://connect.ngs.global", path);
await Task.Delay(delay);
con.Publish("test.subject", Encoding.UTF8.GetBytes("message"));
con.Close();

Using net7.0 and NATS.Client 1.1.4

Kaarel avatar Apr 19 '24 13:04 Kaarel

Hi @scottf, any thoughts on this? Happy to create a new issue if it's not exactly the same problem. But as it stands, we are evaluating Synadia Cloud and the client just keeps disconnecting; effectively we can't publish messages except immediately after the initial connection.

kaarel-sm avatar Apr 24 '24 07:04 kaarel-sm

Can you please share more information on your outgoing networking? Does this happen with a non-encoded connection? Does this happen when you are hitting a local dev server? Any VPN or proxy in between? You get the idea; I need more details here to be able to help. Do you have handlers in place that can give more information (ClosedEventHandler, DisconnectedEventHandler, ReconnectedEventHandler)? I wonder if the TLS upgrade is failing, which might explain why the publish works with no delay but not after some delay, but that's just a guess.
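
For reference, one way to wire those handlers up in the v1 client; the URL and creds path below are just the placeholders from your repro:

using System;
using NATS.Client;

class HandlerDemo
{
    static void Main()
    {
        Options opts = ConnectionFactory.GetDefaultOptions();
        opts.Url = "tls://connect.ngs.global";
        opts.SetUserCredentials(@"path\to\synadia-cloud.creds"); // placeholder path

        // Log connection lifecycle events so we can see what the client is
        // doing around the time the publish fails.
        opts.DisconnectedEventHandler += (s, e) => Console.WriteLine("Disconnected");
        opts.ReconnectedEventHandler += (s, e) => Console.WriteLine("Reconnected");
        opts.ClosedEventHandler += (s, e) => Console.WriteLine("Closed");

        using IConnection con = new ConnectionFactory().CreateConnection(opts);
        Console.ReadLine(); // keep the process alive to observe events
    }
}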

scottf avatar Apr 24 '24 16:04 scottf

Just a dev laptop, no fancy proxies. When using the nats.net.v2 client, the same scenario works as expected. We just have a lot invested in the v1 client, so the switch isn't trivial. We are currently evaluating whether it would be feasible to migrate from our on-prem NATS cluster to Synadia Cloud.

Literally the short snippet below always throws for me on Publish when using the v1 client (File > New > Project > Program.cs):

using NATS.Client;

using var con = new ConnectionFactory().CreateConnection("tls://connect.ngs.global", @"path\to\synadia-cloud.creds");
Task.Delay(2000).Wait();
con.Publish("subject", System.Text.Encoding.UTF8.GetBytes("message"));

Similar functionality using the v2 client works as expected on the same machine:

using NATS.Client.Core;

var opts = new NatsOpts
{
    Url = "tls://connect.ngs.global",
    AuthOpts = new NatsAuthOpts
    {
        CredsFile = @"path\to\synadia-cloud.creds"
    }
};
await using var con = new NatsConnection(opts);
await con.ConnectAsync();
await Task.Delay(2000);
await con.PublishAsync("subject", System.Text.Encoding.UTF8.GetBytes("message"));

Kaarel avatar Apr 25 '24 08:04 Kaarel

This is being addressed in https://github.com/nats-io/nats.net/pull/888

scottf avatar Apr 30 '24 15:04 scottf

@scottf this looks promising. Any ETA on NuGet package release?

Kaarel avatar May 07 '24 19:05 Kaarel

@Kaarel I'm doing the release right now! 1.1.5

scottf avatar May 08 '24 00:05 scottf

Can confirm the issue is fixed in 1.1.5 :) Thank you!

Kaarel avatar May 08 '24 18:05 Kaarel