C# Can Drop Large Message with no error indication

Open wilstoff opened this issue 3 years ago • 4 comments

When trying to load test C# clients with one large message (~50 MB), the client can trigger a slow consumer warning on the server and then auto reconnect, but the message is dropped and no error is presented to the user. If you turn off auto reconnect, you see the error comes from the readLoop function in Conn.cs as "Server closed the connection.", which correlates with this log on the server side:

Slow Consumer Detected: WriteDeadline of 5s exceeded with 2 chunks of 50000025 total bytes

While I understand this is hitting up against our WriteDeadline setting on the server (I'm running over VPN and Wi-Fi, so I understand the slowness inherent in my setup), and I understand that NATS only guarantees at-most-once delivery, the fact that no error is raised to tell the consumer it is a slow consumer is concerning. I understand this race condition can be difficult to handle; I've tried on different machines and sometimes I do get slow consumer errors from the client, but it seems that whenever we hit the server's WriteDeadline limit a message is dropped with no error at all. I believe that if the server forcibly closes the connection while writing, and it wasn't because of a request from the client, then that should be bubbled up to the client as an error, even if the exact reason cannot be determined.

Below are the test code and server-side configuration.

    [TestFixture]
    [Category("IntegrationTest")]
    public class PubSubIntegrationTests
    {
        // Publishing connection, used by both the test and WaitForSubToHear.
        private IConnection _pub;

        [Test]
        public async Task PubSub50MB()
        {
            int size = 50000000;
            var messages = new ConcurrentQueue<Msg>();
            var options = ConnectionFactory.GetDefaultOptions();
            options.Servers = new[] {"aaaaaa1:4222", "aaaaaa2:4222", "aaaaaa3:4222"};
            options.AsyncErrorEventHandler += (s, e) => Console.WriteLine($"Error: {e?.Error}, {e?.Conn?.LastError}");
            options.DisconnectedEventHandler += (s, args) => Console.WriteLine("Connection Disconnected");
            options.ReconnectedEventHandler += (s, args) => Console.WriteLine("Connection Reconnected");

            _pub = new ConnectionFactory().CreateConnection(options);
            var subscribe = new ConnectionFactory().CreateConnection(options);
            var subscription = subscribe.SubscribeAsync("abc123", (sender, args) =>
            {
                messages.Enqueue(args.Message);
            });
            await WaitForSubToHear("abc123", messages);

            _pub.Publish("abc123", new byte[size]);
            await TestHelpers.WaitForCount(messages, 1, TimeSpan.FromSeconds(30));

            Assert.That(messages.Count, Is.EqualTo(1));
            Assert.That(messages.First().Data.Length, Is.EqualTo(size));
            subscription.Dispose();
        }

        public async Task WaitForSubToHear(string topic, ConcurrentQueue<Msg> collection, TimeSpan timeout = default)
        {
            timeout = timeout == default ? TimeSpan.FromSeconds(1) : timeout;
            var cancelToken = new CancellationTokenSource();
            var guid = Guid.NewGuid();
            int totalPublished = 0;
            await Task.WhenAny(Task.Run(async () =>
            {
                int i = 0;
                while (collection.Count < 1)
                {
                    ++i;
                    _pub.Publish(topic, Encoding.ASCII.GetBytes(guid + "_" + i));
                    await Task.Delay(10);
                }
                totalPublished = i;
            }, cancelToken.Token), Task.Delay(timeout));
            cancelToken.Cancel();
            cancelToken = new CancellationTokenSource();
            bool hasFinished = false;
            await Task.WhenAny(Task.Run(async () =>
            {
                while (Encoding.ASCII.GetString(collection.Last().Data) != guid + "_" + totalPublished)
                {
                    await Task.Delay(10);
                }

                collection.Clear();
                hasFinished = true;
            }, cancelToken.Token), Task.Delay(timeout));
            cancelToken.Cancel();
            Assert.That(hasFinished, Is.True, "HasFailedToSyncUpSub");
        }
    }

    public static class TestHelpers
    {
        public static async Task WaitForCount(ICollection collection, int countToWaitFor, TimeSpan timeout = default)
        {
            var now = DateTime.UtcNow;
            timeout = timeout == default ? TimeSpan.FromSeconds(1) : timeout;
            var cancelToken = new CancellationTokenSource();
            await Task.WhenAny(Task.Run(() =>
            {
                while (collection.Count < countToWaitFor) ;
            }, cancelToken.Token), Task.Delay(timeout));
            cancelToken.Cancel();
            var now2 = DateTime.UtcNow;
            Console.WriteLine(now2 - now);
        }
    }

OUTPUT

  Expected: 1
  But was:  0

   at BT.Messaging.IntegrationTests.PubSubIntegrationTests.PubSub50MB() in C:\git\Harbor\NetCore\Source\Messaging\Messaging.IntegrationTests\PubSubIntegrationTests.cs:line 250


Connection Disconnected
Connection Reconnected
00:00:29.9996175

Server information from 1 server in the cluster of 3:

{
  "server_id": "NBHGFWWXXIYI53RPHRTW4JSYWUUTY6MSQBSF4IP5QISDFF6ETXJCLRSQ",
  "version": "2.0.4",
  "proto": 1,
  "git_commit": "c8ca58e",
  "go": "go1.11.13",
  "host": "----------",
  "port": 4222,
  "connect_urls": [
    "---------:4222",
    "---------:4222",
    "---------:4222"
  ],
  "max_connections": 65536,
  "ping_interval": 120000000000,
  "ping_max": 2,
  "http_host": "-----------",
  "http_port": 8222,
  "https_port": 0,
  "auth_timeout": 1,
  "max_control_line": 4096,
  "max_payload": 268435456,
  "max_pending": 536870912,
  "cluster": {
    "addr": "-------------",
    "cluster_port": 5222,
    "auth_timeout": 1,
    "urls": [
      "aaaaaa1:5222",
      "aaaaaa2:5222"
    ]
  },
  "gateway": {},
  "leaf": {},
  "tls_timeout": 0.5,
  "write_deadline": 5000000000,
  "start": "2020-07-15T15:21:41.975356083-05:00",
  "now": "2020-07-17T13:05:01.798982281-05:00",
  "uptime": "1d21h43m19s",
  "mem": 43909120,
  "cores": 4,
  "cpu": 0,
  "connections": 6,
  "total_connections": 312,
  "routes": 2,
  "remotes": 2,
  "leafnodes": 0,
  "in_msgs": 3288000,
  "out_msgs": 3288168,
  "in_bytes": 7271432318,
  "out_bytes": 6197704740,
  "slow_consumers": 3,
  "subscriptions": 29,
  "http_req_stats": {
    "/": 2,
    "/connz": 32922,
    "/gatewayz": 32921,
    "/routez": 32921,
    "/subsz": 32921,
    "/varz": 32925
  },
  "config_load_time": "2020-07-15T15:21:41.975356083-05:00"
}

wilstoff avatar Jul 17 '20 18:07 wilstoff

@wilstoff does the error in the args to DisconnectedEventHandler contain what you are looking for?

options.DisconnectedEventHandler += (s, args) => Console.WriteLine("Connection Disconnected ");

The event handler argument args can contain an Exception, accessible via args.Error.
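
For example, a minimal sketch of wiring that up (the handler body is illustrative):

    options.DisconnectedEventHandler += (s, args) =>
    {
        // args.Error may carry the exception that caused the disconnect;
        // it can be null if the client could not determine a cause.
        Console.WriteLine($"Connection Disconnected: {args.Error}");
    };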

matthiashanel avatar Jul 17 '20 21:07 matthiashanel

If you check args.Error or args.Conn.LastEx, both are null. I believe this is because the processOpError function in Conn.cs below throws away any exception tracking that could be done whenever it is possible to reconnect via the processReconnect call:

        private void processOpError(Exception e)
        {
            bool disconnected = false;

            lock (mu)
            {
                if (isConnecting() || isClosed() || isReconnecting())
                {
                    return;
                }

                if (Opts.AllowReconnect && status == ConnState.CONNECTED)
                {
                    processReconnect();
                }
                else
                {
                    processDisconnect();
                    disconnected = true;
                    lastEx = e;
                }
            }

            if (disconnected)
            {
                Close();
            }
        }

If you instead turn off reconnection, the last error stored comes from the readLoop function below and is the "Server closed the connection." exception, which also carries no information as to why. I believe this is because, if we are in the middle of reading and the client itself requested the disconnect, it is fine to throw away any partially processed messages since the connection is being brought down anyway. In the case of a forceful server-side disconnect, though, we may need a way to detect a slow consumer (or potential slow consumer) error while we are processing a large message, as below.

        private void readLoop()
        {
            // Stack based buffer.
            byte[] buffer = new byte[Defaults.defaultReadLength];
            var parser = new Parser(this);
            int len;

            while (true)
            {
                try
                {
                    len = br.Read(buffer, 0, Defaults.defaultReadLength);

                    // A length of zero can mean that the socket was closed
                    // locally by the application (Close) or the server
                    // gracefully closed the socket.  There are some cases
                    // on windows where a server could take an exit path that
                    // gracefully closes sockets.  Throw an exception so we
                    // can reconnect.  If the network stream has been closed
                    // by the client, processOpError will do the right thing
                    // (nothing).
                    if (len == 0)
                    {
                        if (disposedValue || State == ConnState.CLOSED)
                            break;

                        throw new NATSConnectionException("Server closed the connection.");
                    }

                    parser.parse(buffer, len);
                }
                catch (Exception e)
                {
                    if (State != ConnState.CLOSED)
                    {
                        processOpError(e);
                    }

                    break;
                }
            }
        }

While an exception within the DisconnectedEventHandler would be better than nothing, the real point is that I believe this is an error and should raise the AsyncErrorEventHandler. I understand that may be difficult, because the server is kicking off a client that couldn't drain its buffer in a reasonable amount of time, so any error message the server sends along the same channel will also be dropped. I don't know if this can be solved without splitting into an admin message socket and a normal message socket, and even then it isn't fully solvable. Having a separate admin message socket would at least lessen the chance of a bad subscription slowing down the processing of admin messages.

wilstoff avatar Jul 18 '20 01:07 wilstoff

@wilstoff, when the server closes the client as a slow consumer, the client may not be able to detect that before it processes the socket close. Because NATS does not have a close protocol message, this error may be missed by the client. This would apply to all NATS clients.

You can detect this situation earlier by tuning the pending buffer of a subscriber (See NATS.Client.Options.SubChannelLength) to alert the application before the server would disconnect the client.
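
A rough sketch of that tuning (the values here are illustrative, not recommendations):

    var options = ConnectionFactory.GetDefaultOptions();
    // A smaller pending buffer makes the client report the slow consumer
    // condition itself (via the AsyncErrorEventHandler) before the server's
    // write deadline is exceeded and the connection is dropped.
    options.SubChannelLength = 8192;
    options.AsyncErrorEventHandler += (s, e) =>
        Console.WriteLine($"Async error (e.g. slow consumer): {e.Error}");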

Stepping back, I'd suggest chunking the data and using a request/reply pattern to send the file over piece by piece. This inherently rate limits your application, providing optimal throughput while mitigating the risk of overrunning the subscriber. Using smaller chunk sizes will allow you to transfer very large blocks of data across low-bandwidth network links.

I'd suggest determining a chunk count before sending, and sending a message to prepare the subscribing app with the number of chunks to expect. The subscriber then receives the chunks and pieces them together, as sketched below.
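
A minimal sketch of the publishing side of that pattern (the subject names, chunk size, and the conn and data variables are assumptions for illustration; it also assumes using System, System.IO, and System.Text):

    const int chunkSize = 1024 * 1024; // 1 MB per chunk
    byte[] data = File.ReadAllBytes("payload.bin");
    int chunkCount = (data.Length + chunkSize - 1) / chunkSize;

    // Prepare the subscribing app: tell it how many chunks to expect and
    // wait for its acknowledgment before streaming the data.
    conn.Request("file.start", Encoding.ASCII.GetBytes(chunkCount.ToString()));

    for (int i = 0; i < chunkCount; i++)
    {
        int offset = i * chunkSize;
        int length = Math.Min(chunkSize, data.Length - offset);
        var chunk = new byte[length];
        Buffer.BlockCopy(data, offset, chunk, 0, length);

        // Request/reply naturally rate limits the publisher: each chunk must
        // be acknowledged by the subscriber before the next one is sent.
        conn.Request("file.chunk", chunk);
    }

The subscriber replies to each request as it stores the chunk, then reassembles the pieces once the expected count has arrived.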

ColinSullivan1 avatar Jul 19 '20 18:07 ColinSullivan1

That's a good suggestion; I knew there were some settings that would help detect this. I think the only thing that is wrong is that the client doesn't present an error to the user when auto reconnect is on. It clearly has an error; it just may not understand why beyond the fact that the server disconnected it. We have since reduced our max payload to 10 MB to avoid this issue for now, since we don't think it is good design to naively send a large message like this over pub/sub. Maybe with request/reply we will revisit, but we would definitely not want to have to build a chunking solution ourselves. We currently send 3-4 MB protobuf messages through another messaging system, and hopefully future scaling doesn't bloat that message.

wilstoff avatar Jul 19 '20 19:07 wilstoff

Summary: Smaller messages will be more robust. Consider using ObjectStore to put large amounts of data in a stream, in combination with a signal message after the object has been stored to let the receiver know the large data is available and what bucket/name it is stored under.
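
A rough sketch of that approach, assuming the ObjectStore API available in newer NATS.Client releases (treat the ObjectStore calls as assumptions and check the client docs for exact names; conn, largePayload, and the bucket/subject names are placeholders):

    // Store the large payload in an Object Store bucket (assumed API).
    IObjectStore store = conn.CreateObjectStoreContext("blobs");
    store.Put("report-2023-01", largePayload);

    // Publish a small signal message telling the receiver which bucket/name
    // to fetch the large data from.
    conn.Publish("reports.available", Encoding.ASCII.GetBytes("blobs/report-2023-01"));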

scottf avatar Nov 16 '22 18:11 scottf