
Stan Client deployed in Kubernetes, injected using DI, gets "clientID already registered" errors during load testing

Open rampratapa opened this issue 3 years ago • 16 comments

Hi All, good evening. I am trying to test the STAN client in .NET Core deployed into Kubernetes as a pod. The following are the STAN options set:

    StanOptions cOpts = StanOptions.GetDefaultOptions();
    var random = new Random().Next(1, 100);
    cOpts.NatsURL = natUrl;
    var clusterID = "stan";
    // the following is intended to make the client ID unique while scaling the pods
    var clientID = "proxy-" + random.ToString();
    cOpts.PingInterval = 1000;
    cOpts.PingMaxOutstanding = 3;
    cOpts.ConnectionLostEventHandler = HandleStanConnection;

I injected the Stan connection using DI:

    services.AddTransient<IStanConnection>((s) => {
        return new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts);
    });

I am seeing the following error during a load test:

STAN.Client.StanConnectRequestException: stan: clientID already registered
   at STAN.Client.Connection..ctor(String stanClusterID, String clientID, StanOptions options)
   at STAN.Client.StanConnectionFactory.CreateConnection(String clusterID, String clientID, StanOptions options)
   at AksEligibilityNetCorePOC.Startup.<>c__DisplayClass4_0.<ConfigureServices>b__3(IServiceProvider s) in /src/Services/ProxyService/Startup.cs:line 59
   at ResolveService(ILEmitResolverBuilderRuntimeContext , ServiceProviderEngineScope )
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ActivatorUtilities.GetService(IServiceProvider sp, Type type, Type requiredBy, Boolean isDefaultParameterRequired)
   at lambda_method(Closure , IServiceProvider , Object[] )
   at Microsoft.AspNetCore.Mvc.Controllers.ControllerActivatorProvider.<>c__DisplayClass4_0.<CreateActivator>b__0(ControllerContext controllerContext)
   at Microsoft.AspNetCore.Mvc.Controllers.ControllerFactoryProvider.<>c__DisplayClass5_0.<CreateControllerFactory>g__CreateController|0(ControllerContext controllerContext)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.InvokeInnerFilterAsync()
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeNextResourceFilter>g__Awaited|24_0(ResourceInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.Rethrow(ResourceExecutedContextSealed context)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.InvokeFilterPipelineAsync()
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Logged|17_1(ResourceInvoker invoker)
   at Microsoft.AspNetCore.Routing.EndpointMiddleware.<Invoke>g__AwaitRequestTask|6_0(Endpoint endpoint, Task requestTask, ILogger logger)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol.ProcessRequests[TContext](IHttpApplication`1 application)

How can this case be handled gracefully, or do we have an issue here? Should it be handled in the ConnectionLostEventHandler? I am not seeing this error there even though I added a debug statement. Also, is there any way to enable debugging to understand the connection issue?

Is this issue related to https://github.com/nats-io/stan.net/issues/142?

I made a change to the following line:

    services.AddSingleton<IStanConnection>((s) => {
        return new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts);
    });

and now I am seeing the following error intermittently, and I am also still seeing stan: clientID already registered:

STAN.Client.StanConnectionException: Invalid connection.
 ---> NATS.Client.NATSNoServersException: Unable to connect to a server.
   at NATS.Client.Connection.connect()
   at NATS.Client.ConnectionFactory.CreateConnection(Options opts)
   at NATS.Client.ConnectionFactory.CreateConnection(String url)
   at STAN.Client.Connection..ctor(String stanClusterID, String clientID, StanOptions options)
   --- End of inner exception stack trace ---
   at STAN.Client.Connection..ctor(String stanClusterID, String clientID, StanOptions options)
   at STAN.Client.StanConnectionFactory.CreateConnection(String clusterID, String clientID, StanOptions options)
   at AksEligibilityNetCorePOC.Startup.<>c__DisplayClass4_0.<ConfigureServices>b__3(IServiceProvider s) in /src/Services/ProxyService/Startup.cs:line 60
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitCache(ServiceCallSite callSite, RuntimeResolverContext context, ServiceProviderEngineScope serviceProviderEngine, RuntimeResolverLock lockType)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite singletonCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.DynamicServiceProviderEngine.<>c__DisplayClass1_0.<RealizeService>b__0(ServiceProviderEngineScope scope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngine.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ActivatorUtilities.GetService(IServiceProvider sp, Type type, Type requiredBy, Boolean isDefaultParameterRequired)
   at lambda_method(Closure , IServiceProvider , Object[] )
   at Microsoft.AspNetCore.Mvc.Controllers.ControllerActivatorProvider.<>c__DisplayClass4_0.<CreateActivator>b__0(ControllerContext controllerContext)
   at Microsoft.AspNetCore.Mvc.Controllers.ControllerFactoryProvider.<>c__DisplayClass5_0.<CreateControllerFactory>g__CreateController|0(ControllerContext controllerContext)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.InvokeInnerFilterAsync()
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeNextResourceFilter>g__Awaited|24_0(ResourceInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.Rethrow(ResourceExecutedContextSealed context)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.InvokeFilterPipelineAsync()
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Logged|17_1(ResourceInvoker invoker)
   at Microsoft.AspNetCore.Routing.EndpointMiddleware.<Invoke>g__AwaitRequestTask|6_0(Endpoint endpoint, Task requestTask, ILogger logger)
   at Microsoft.AspNetCore.Routing.EndpointRoutingMiddleware.<Invoke>g__AwaitMatcher|8_0(EndpointRoutingMiddleware middleware, HttpContext httpContext, Task`1 matcherTask)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol.ProcessRequests[TContext](IHttpApplication`1 application)

Please let me know if you need more information.

thank you

rampratapa avatar Sep 27 '20 23:09 rampratapa

Thank you for using NATS!

StanConnectRequestException: stan: clientID already registered

The error is originating at the server and is most likely caused by a client with the same client ID not closing its connection (e.g. starting a new connection before the previous connection with that ID was closed).

It's possible (but unlikely) that you're trying to establish a new connection before the NATS Streaming server has entirely processed the previous close. We can verify this in the NATS Streaming server logs with the debug flag (the -SD parameter), or in the config file (sd: true).

tbh, I don't think this issue is related to #142.
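
As an illustration only (the GUID suffix and singleton lifetime are assumptions, not something from your original setup), registering one streaming connection per process with a collision-resistant client ID avoids both the per-resolution reconnects and ID clashes across pods:

    // One STAN connection per process: a singleton means the factory runs once,
    // and the GUID-based suffix cannot collide with another pod's ID.
    var clientID = "proxy-" + Guid.NewGuid().ToString("N");

    services.AddSingleton<IStanConnection>(s =>
        new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts));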

NATSNoServersException

The other exception you are seeing, NATS.Client.NATSNoServersException: Unable to connect to a server., indicates that the STAN client cannot create an underlying core NATS connection - the server is down or unreachable. It's possible the underlying Kubernetes service isn't responding; we've seen the k8s service drop out on us under load. However, it's more likely the server isn't available/running or can't respond in time.

One option to explore is creating the underlying NATS connection yourself, increasing the lower-level connection timeout, and passing that connection to the STAN connection:

    var natsOptions = ConnectionFactory.GetDefaultOptions();
    natsOptions.Url = natUrl;
    natsOptions.Timeout = 10000;  // 10s, very generous

    // create the core NATS connection with the extended timeout
    IConnection nc = new ConnectionFactory().CreateConnection(natsOptions);

    StanOptions cOpts = StanOptions.GetDefaultOptions();
    cOpts.NatsConn = nc;  // assign the NATS connection

    // create the NATS streaming connection with cOpts, etc.
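
Another rough sketch (not an official pattern; the helper name, retry count, and delay are assumptions) is to retry connection creation a few times, since NATSNoServersException under load is often transient:

    // Hypothetical helper: retry STAN connection creation with a short delay.
    // Requires System and System.Threading.
    IStanConnection ConnectWithRetry(string clusterID, string clientID, StanOptions opts, int attempts = 5)
    {
        for (int i = 1; ; i++)
        {
            try
            {
                return new StanConnectionFactory().CreateConnection(clusterID, clientID, opts);
            }
            catch (Exception) when (i < attempts)
            {
                Thread.Sleep(TimeSpan.FromSeconds(2)); // back off before the next attempt
            }
        }
    }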

ColinSullivan1 avatar Sep 28 '20 03:09 ColinSullivan1

Good morning. Initially I was using the approach you suggested.

The following is used to get the NATS options and create the NATS connection:

    private Options GetOpts(string natUrl)
    {
        var opts = ConnectionFactory.GetDefaultOptions();
        opts.Url = natUrl; // "nats://localhost:4222";
        opts.AllowReconnect = true;
        opts.PingInterval = 5000;
        opts.MaxPingsOut = 4;
        opts.MaxReconnect = Options.ReconnectForever;

        opts.ServerDiscoveredEventHandler += (sender, args) => Console.WriteLine("NATS server discovered");

        opts.ReconnectedEventHandler +=
            (sender, args) => Console.WriteLine("NATS server in proxy reconnected.");
        opts.ClosedEventHandler +=
            (sender, args) => Console.WriteLine("NATS connection closed in proxy");
        opts.DisconnectedEventHandler += (sender, args) =>
            Console.WriteLine("NATS connection disconnected in proxy");
        opts.AsyncErrorEventHandler +=
            (sender, args) => Console.WriteLine("NATS async error: {0}, Message={1}, Subject={2}", args.Conn.ConnectedUrl,
                args.Error, args.Subscription.Subject);

        return opts;
    }

    public IConnection getConnection(string NatsUrl)
    {
        var cf = new ConnectionFactory();
        var natsConnection = cf.CreateConnection(GetOpts(NatsUrl));
        return natsConnection;
    }

The following values are set in the STAN options:

    var natConnection = getConnection(natUrl);
    cOpts.NatsConn = natConnection;
    cOpts.PingInterval = 5000;
    cOpts.PingMaxOutstanding = 4;

The issue with this approach is that it fires ConnectionLostEventHandler events, sometimes causing transactions to fail and at other times taking too long to return the error, which impacts throughput.

One question: when the ConnectionLostEventHandler event is received, the following message is printed via Console.WriteLine:

Console.WriteLine("NATS server in proxy reconnected.");

Do we need to write code in that block to re-establish the StanConnection and its subscriptions? From what I am seeing, when the connection is lost the subscribers stop receiving messages, and I need to restart the pods to process those messages. Also, I noticed the consumers are slow.

Based on the following from the main documentation page:

When no NATS connection is provided, the library creates its own NATS connection and will now set the reconnect attempts to "infinite", which was not the case before. It should therefore be possible for the library to always reconnect, but this does not mean that the streaming connection will not be closed, even if you set a very high threshold for the PINGs max out value. Keep in mind that while the client is disconnected, the server is sending heartbeats to the clients too, and when not getting any response, it will remove that client from its state. When the communication is restored, the PINGs sent to the server will allow to detect this condition and report to the client that the connection is now closed."

Based on the above, I felt that the best approach is to use StanOptions and set the NatsURL, so that the STAN client manages the connection itself. However, I will run the code with the NATS connection options and will publish the results soon.

My main goal is to find a fast and reliable messaging engine that fits into a Kubernetes cluster.

Thank you for your quick response and your help.

Thanks Ram

rampratapa avatar Sep 28 '20 12:09 rampratapa

Good afternoon, I have executed a performance test; the following are the details.

The STAN options set are:

    StanOptions cOpts = StanOptions.GetDefaultOptions();
    // cOpts.NatsURL = natUrl;
    var natConnection = getConnection(natUrl);
    cOpts.NatsConn = natConnection;
    cOpts.PingInterval = 10000;
    cOpts.PingMaxOutstanding = 5;
    cOpts.ConnectionLostEventHandler = HandleStanConnection;

The NATS options are:

    var opts = ConnectionFactory.GetDefaultOptions();
    opts.Url = natUrl; // "nats://localhost:4222";
    opts.AllowReconnect = true;
    opts.PingInterval = 10000;
    opts.MaxPingsOut = 5;
    opts.MaxReconnect = Options.ReconnectForever;

The following is from the subscriber log file:

Tracker Lost Connection HandleStanConnection
NATS connection disconnected in Traker
NATS server in Tracker reconnected.
NATS Starting Subscribers.
Unable to restart tracker subscriptions stan: clientID already registered

The following errors were noticed in the stan-0 logs:

[1] 2020/09/28 20:25:44.280290 [ERR] STREAM: [Client:trkingservicenet-47] Timed out on heartbeats
[1] 2020/09/28 20:43:13.333242 [ERR] STREAM: [Client:trkingservicenet-47] Timed out on heartbeats
[1] 2020/09/28 21:11:40.625488 [ERR] STREAM: [Client:trkingservicenet-47] Timed out on heartbeats

Please let me know if you need more information.

Thanks ram

rampratapa avatar Sep 28 '20 21:09 rampratapa

Thanks Ram. It looks like there was a long period of time when the network was unavailable - the STAN client timed out on missing heartbeats and the NATS client disconnected (assuming the "NATS connection disconnected in Traker" message is from a disconnected callback).

I have a few questions:

  • Are you cycling the server during these tests?
  • What is your application doing at the time "NATS Starting Subscribers." is printed?
  • Would you be able to provide a simple test case that reproduces this behavior?

Thanks, Colin

ColinSullivan1 avatar Sep 30 '20 02:09 ColinSullivan1

Colin, good evening. Thank you for your response. I have the following setup.

I have deployed NATS and STAN in AKS using the following steps (though I wanted to set up FT mode, I could not find steps for Azure, whereas I was able to find steps for AWS): https://docs.nats.io/nats-on-kubernetes/minimal-setup#ha-setup-using-statefulset (HA Setup Using StatefulSets).

I created a .NET client to publish messages after injecting the connection using DI:

      private string publishMessage(string subject, string hubMessge, string hubId)
        {
            Stopwatch sw = Stopwatch.StartNew();
            _logger.LogInformation("Published Message for Tracking");
            var response = "<ack></ack>";
            long acksProcessed = 0;
            var payload = Encoding.UTF8.GetBytes((string)hubMessge);
            AutoResetEvent ev = new AutoResetEvent(false);
            string guid = _stanConnection.Publish(subject, payload, (obj, pubArgs) =>
            {
                response = "<ack>" + pubArgs.GUID + "</ack>";

                _logger.LogInformation("Recieved ack for message {0}", pubArgs.GUID);

                if (!string.IsNullOrEmpty(pubArgs.Error))
                {
                    _logger.LogInformation("Error processing message {0}", pubArgs.GUID);
                    response = "<nack>" + pubArgs.GUID + "</nack>";
                }
                if (Interlocked.Increment(ref acksProcessed) == 1)
                    ev.Set();

            });
            sw.Stop();
            Console.WriteLine("Metrics:publishMessage Request {0}:{1}ms", hubId, sw.Elapsed.TotalMilliseconds);
            ev.WaitOne();
            return response;
        }

I have created a console application for the subscribers.

The following is the startup class:

  public static class Startup
   {

       private static readonly IConfiguration configuration;
       public static  IServiceProvider provider;
       private static readonly ServiceCollection services;
       private static readonly string clusterID;
       private static readonly string clientID;
       static Startup()
       {
           var random = new Random().Next(1, 100);
           clusterID = "stan";
           clientID = "trkingservicenet-" + random.ToString();
           var environment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT");

           configuration = new ConfigurationBuilder()
                           .SetBasePath(Directory.GetCurrentDirectory())
                           .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
                           .AddJsonFile($"appsettings.{environment}.json", optional: true)
                           .AddEnvironmentVariables()
                           .Build();
           services = new ServiceCollection();
           var natUrl = configuration.GetValue<string>("NATsService");
           var TrackingboxSqlConnection = configuration.GetValue<string>("TrackingSqlConnectionString");
           StanOptions cOpts = StanOptions.GetDefaultOptions();
           // cOpts.NatsURL = natUrl;
          var natConnection = getConnection(natUrl);
             cOpts.NatsConn = natConnection;
           cOpts.PingInterval = 10000;
           cOpts.PingMaxOutstanding = 5;
           cOpts.ConnectionLostEventHandler = HandleStanConnection;
           // add necessary services
           services.AddSingleton(configuration);
           services.AddSingleton<ITracker, TrackingWorker>();
           services.AddSingleton<IStanConnection>((s) =>
           {
               return new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts);
           });
           //   services.AddLogging();

           // build the pipeline
           services.AddDbContext<TrackingContext>(
      options => options.UseSqlServer(TrackingboxSqlConnection, providerOptions => providerOptions.EnableRetryOnFailure(maxRetryCount: 10,
      maxRetryDelay: TimeSpan.FromSeconds(5),
      errorNumbersToAdd: null)));
           services.AddDbContext<TrackingContext>(
                       c => c.UseQueryTrackingBehavior(QueryTrackingBehavior.NoTracking));
           services.AddScoped<ITrackingRepository, TrackingRepository>();

           //   var observer = new ApplicationInsightsKubernetesDiagnosticObserver(DiagnosticLogLevel.Trace);
           //   ApplicationInsightsKubernetesDiagnosticSource.Instance.Observable.SubscribeWithAdapter(observer);
           //  services.AddApplicationInsightsTelemetry();

           services.AddApplicationInsightsKubernetesEnricher();
           services.AddLogging();
           /*   services.AddLogging(builder =>
              {
                  // Optional: Apply filters to configure LogLevel Trace or above is sent to
                  // Application Insights for all categories.
                  builder.AddFilter<Microsoft.Extensions.Logging.ApplicationInsights.ApplicationInsightsLoggerProvider>("", LogLevel.Trace);
                  builder.AddApplicationInsights("");

                  // Optional: Show the logs in console at the same time
                  builder.AddConsole();
              });*/
           var telemetryConfiguration = TelemetryConfiguration.CreateDefault();
           telemetryConfiguration.InstrumentationKey = "";
           var telemetryClient = new TelemetryClient(telemetryConfiguration);
           services.AddSingleton(telemetryClient);
           provider = services.BuildServiceProvider();
       }
       public static void StartMe()
       {
           Console.WriteLine("Initializing");
       }

       public static IConnection getConnection(string NatsUrl)
      {
          var cf = new ConnectionFactory();
          var natsConnection = cf.CreateConnection(GetOpts(NatsUrl));
          return natsConnection;
      }

       private static Options GetOpts(string natUrl)
         {
             var opts = ConnectionFactory.GetDefaultOptions();
             opts.Url = natUrl; // "nats://localhost:4222";
             opts.AllowReconnect = true;
             opts.PingInterval = 10000;
             opts.MaxPingsOut = 5;
             opts.MaxReconnect = Options.ReconnectForever;
             // opts.ReconnectWait = 1000;
             //  opts.Timeout = 4000;

             opts.ServerDiscoveredEventHandler += (sender, args) => Console.WriteLine("NATS server discovered");

             opts.ReconnectedEventHandler = ReconnectedEventHandler;
             opts.ClosedEventHandler +=
                 (sender, args) => Console.WriteLine("NATS connection closed in Tracker");
             opts.DisconnectedEventHandler += (sender, args) =>
                 Console.WriteLine("NATS connection disconnected in proxy");
             opts.AsyncErrorEventHandler +=
                 (sender, args) => Console.WriteLine("NATS async error: {0}, Message={1}, Subject={2}", args.Conn.ConnectedUrl,
                     args.Error, args.Subscription.Subject);

             return opts;
         }

         static void ReconnectedEventHandler(object obj, ConnEventArgs args)
         {
             Console.WriteLine("NATS server in Tracker reconnected.");
             Console.WriteLine("NATS Starting Subscribers.");
             try
             {
                 var natConnection = args.Conn;
                 StanOptions cOpts = StanOptions.GetDefaultOptions();
                 cOpts.NatsConn = natConnection;
                 cOpts.PingInterval = 5000;
                 cOpts.PingMaxOutstanding = 4;
                 var sConnection = provider.GetRequiredService<IStanConnection>();
               services.AddSingleton<IStanConnection>((s) =>
                 {
                     return new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts);
                 });
                 provider = services.BuildServiceProvider();
                 var service = provider.GetRequiredService<ITracker>();
                 service.TrackMessage();
             }
             catch(Exception ex)
             {
                 Console.WriteLine("Unable to restart traker subscriptions  " + ex.Message);
             }

         }
       private static void HandleStanConnection(object obj, StanConnLostHandlerArgs args)
       {
           Console.WriteLine("Tracker Lost Connection HandleStanConnection");

       }

   }

The following is invoked from the program

   public class TrackingWorker : ITracker
   {
       private readonly ILogger<TrackingWorker> _logger;
       private readonly IStanConnection _stanConnection;
       private readonly ITrackingRepository _trackingRepo;
       private readonly string clusterID = "stan";
       private readonly string subscriberChannel = "trackmessagenet";
       private readonly string durableID = "tracking-service-durable";
       //event             = "track-message"
       private readonly string queueGroup = "eligtrackinggroup";
       private readonly TelemetryClient _telemetricClient;
       public TrackingWorker(ILogger<TrackingWorker> logger, IStanConnection stanConnection, ITrackingRepository trackingRepo, TelemetryClient telemetricClient)
       {
           _logger = logger;
           _stanConnection = stanConnection;
           _trackingRepo = trackingRepo;
           _telemetricClient = telemetricClient;
       }

       public Task<object> TrackMessage()
       {
           AppDomain.CurrentDomain.ProcessExit += (s, e) => FinalizeApplication();
           var opts = StanSubscriptionOptions.GetDefaultOptions();
         //  opts.StartWithLastReceived();
           opts.DurableName = durableID;
           opts.ManualAcks = true;
           EventHandler<StanMsgHandlerArgs> ackHandler = ProcessMessage;
           var s = _stanConnection.Subscribe(subscriberChannel, queueGroup,opts, ackHandler);
           return null;
       }
       private void ProcessMessage(object obj, StanMsgHandlerArgs args)
       {
           var channel = new InMemoryChannel();
           Console.WriteLine("started ProcessMessage{0}", args.Message.Sequence);
           Console.WriteLine("Message Redelivered{0}", args.Message.Redelivered);
           _logger.LogInformation("started ProcessMessage{0}", args.Message.Sequence);
           _logger.LogInformation(args.Message.Subject);
           _telemetricClient.TrackEvent("Message Received");

           var requetFromQueue = System.Text.Encoding.UTF8.GetString(args.Message.Data);
           try
           {

               var  trackMessage = JsonSerializer.Deserialize<TrackingMessage>(requetFromQueue);
               var eventType = trackMessage.Event;
               Console.WriteLine("Message Event{0}", eventType);
               switch (eventType)
               {
                   case MessageEventType.Tracking:
                       var task1 = Task.Run(async () => await _trackingRepo.InsertTracking(trackMessage, eventType, (args.Message.Sequence).ToString(), (args.Message.Redelivered).ToString()));
                       var result1 = task1.Result;
                        
                       break;
                   case MessageEventType.TrackingDetails:
                   //    Console.WriteLine("Message received{0}", requetFromQueue);
                       var task = Task.Run(async () => await _trackingRepo.InsertTrackingDetails(trackMessage, eventType, (args.Message.Sequence).ToString(), (args.Message.Redelivered).ToString()));
                       var result = task.Result;
                       
                       break;
                   case MessageEventType.HubLog:

                       break;
                   default:
                       break;

               }
               args.Message.Ack();
           }
           catch (SqlException sqlEx)
           {
               // Hope this will return the message back to EventGrid topic for reprocessing.
               _logger.LogInformation("SQL Exception Occurred in Tracking" + sqlEx.StackTrace);
               throw sqlEx;
           }
           catch (Exception ex)
           {
               _logger.LogError(ex, "Error Occurred");
           }


       }
       private   void FinalizeApplication()
       {
           // Give TelemetryClient 5 seconds to flush it's content to Application Insights
           _telemetricClient.Flush();
           Thread.Sleep(5000);
       }

   }

The publisher is in one Docker container and the subscriber is in a different Docker container, both deployed into the AKS cluster. I am thinking that I may need to add the following code in the ReconnectedEventHandler, and I am going to try it to make the client ID unique:

    var random = new Random().Next(1, 100);
    var clusterID = "stan";
    var clientID = "trkingservicenet-" + random.ToString();

However, I was wondering why the connection is dropping so frequently, as NATS/STAN was deployed in high-availability mode in AKS with three pods each (NATS-0, NATS-1, and NATS-2, and similarly STAN-0 to STAN-2).

Please let me know if you need more information.

Thank you for your help. Thanks Ram

rampratapa avatar Sep 30 '20 03:09 rampratapa

Thank you for sharing this code. The reconnected event handler is for the lower-level underlying NATS connection; upon a successful core NATS reconnection, you might still have a valid NATS streaming connection. I'd move the STAN reconnect code to the ConnectionLostEventHandler.

e.g.

    cOpts.ConnectionLostEventHandler = (obj, args) =>
    {
        Console.WriteLine("Lost connection to NATS Streaming.");
        Console.WriteLine("NATS Starting Subscribers.");
        try
        {
            // the handler args expose the streaming connection and its underlying NATS connection
            var natConnection = args.Connection.NATSConnection;
            StanOptions newOpts = StanOptions.GetDefaultOptions();
            newOpts.NatsConn = natConnection;
            newOpts.PingInterval = 5000;
            newOpts.PingMaxOutstanding = 4;
            var sConnection = provider.GetRequiredService<IStanConnection>();
            services.AddSingleton<IStanConnection>((s) =>
            {
                return new StanConnectionFactory().CreateConnection(clusterID, clientID, newOpts);
            });
            provider = services.BuildServiceProvider();
            var service = provider.GetRequiredService<ITracker>();
            service.TrackMessage();
        }
        catch (Exception ex)
        {
            Console.WriteLine("Unable to restart tracker subscriptions  " + ex.Message);
        }
    };

Can we see the entire log of the streaming server when this happens?

@kozlovic, any other thoughts here?

@wallyqs, would you know of a FT setup of NATS streaming for Azure? Would the helm chart be better?

ColinSullivan1 avatar Oct 01 '20 21:10 ColinSullivan1

Colin, good evening. I made the changes and ran the load test, and the subscribers are going down and not processing messages because the code is not able to restart the subscribers; if I re-deploy the pods they start picking up the messages. It seems the ConnectionLostEventHandler runs only once. I am hoping args.Connection.NATSConnection will return the original NATS connection; if not, I need to find a suitable one or reset this one. I will put a debug statement in tomorrow and validate, or please let me know if this is wrong.

I noticed the following logging statements:

  • NATS connection disconnected in proxy
  • NATS server in proxy reconnected.
  • NATS async error: nats://nats.default.svc.cluster.local:4222, Message=Slow Consumer, Subject=_INBOX.6ED2E06BOHOGIGVR4LEDOS

The following three lines are from HandleStanConnection; I think it could not establish the connection and hence the subscriber is dead.

STAN Connection Lost Handler -- server in Tracker reconnected.
STAN Starting Subscribers.
Unable to restart tracker subscriptions Invalid connection.

Also, I noticed about 20% of messages have a redelivered status and hence they are duplicates. I will try to run one more test tomorrow morning and will publish the final results.

    private static void HandleStanConnection(object obj, StanConnLostHandlerArgs args)
    {
        Console.WriteLine("STAN Connection Lost Handler -- server in Tracker trying to reconnect.");
        Console.WriteLine("STAN Starting Subscribers.");
        try
        {
            var natConnection = args.Connection.NATSConnection;
            StanOptions cOpts = StanOptions.GetDefaultOptions();
            cOpts.NatsConn = natConnection;
            cOpts.PingInterval = 10000;
            cOpts.PingMaxOutstanding = 5;
            // var sConnection = provider.GetRequiredService<IStanConnection>();
            var random = new Random().Next(1, 100);
            var clusterID = "stan";
            var clientID = "trkingservicenet-" + random.ToString();
            services.AddSingleton<IStanConnection>((s) =>
            {
                return new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts);
            });
            provider = services.BuildServiceProvider();
            var service = provider.GetRequiredService<ITracker>();
            service.TrackMessage();
        }
        catch (Exception ex)
        {
            Console.WriteLine("Unable to restart tracker subscriptions  " + ex.Message);
        }
    }

How do we retry the connection? Please let me know if you need any more information.

Thank you for your help.

Thanks Ram

rampratapa avatar Oct 02 '20 03:10 rampratapa

Colin, good evening. I am enclosing the STAN log files from the three pods. The client connection is not stable and it disconnects after processing around 30K messages; here are the log files from the streaming server after enabling debugging.

Can we see the entire log of the streaming server when this happens

stan.txt stan1.txt stan2.txt

Please let me know if you need more information.

Thanks Ram

rampratapa avatar Oct 03 '20 02:10 rampratapa

Colin, good evening, here is the log from STAN and my observations.

The following is a publisher:

2020/10/04 03:00:34.108025 [TRC] STREAM: [Client:proxy-96] Received message from publisher subj=trackmessagenet guid=7YMN1B659T8O712ZCXT02H
2020/10/04 03:00:34.111173 [TRC] STREAM: [Client:proxy-96] Acking Publisher subj=trackmessagenet guid=7YMN1B659T8O712ZCXT02H

This is another publisher:

2020/10/04 03:00:34.422305 [DBG] STREAM: [Client:Orchestrator-52] Connected (Inbox=_INBOX.UL4WMWCPGUBHDCUJ1OGZM9)
2020/10/04 03:00:36.495931 [TRC] STREAM: [Client:Orchestrator-52] Received message from publisher subj=trackmessagenet guid=UL4WMWCPGUBHDCUJ1OGZTD
2020/10/04 03:00:36.502145 [TRC] STREAM: [Client:Orchestrator-52] Acking Publisher subj=trackmessagenet guid=UL4WMWCPGUBHDCUJ1OGZTD

This is a durable subscriber:

2020/10/04 03:01:18.157004 [ERR] STREAM: [Client:trkingservicenet-5] Timed out on heartbeats
2020/10/04 03:01:18.160362 [DBG] STREAM: [Client:trkingservicenet-5] Removed member from durable queue subscription, subject=trackmessagenet, inbox=_INBOX.A56VW3ELPEPRJE1BWZNTFE, queue=tracking-service-durable:eligtrackinggroup, subid=26
2020/10/04 03:01:18.160384 [DBG] STREAM: [Client:trkingservicenet-5] Closed (Inbox=_INBOX.A56VW3ELPEPRJE1BWZNT2K)
2020/10/04 03:01:18.588976 [ERR] STREAM: [Client:trkingservicenet-77] Timed out on heartbeats
2020/10/04 03:01:18.591760 [DBG] STREAM: [Client:trkingservicenet-77] Removed member from durable queue subscription, subject=trackmessagenet, inbox=_INBOX.IZRB3ZHNV0KY8HCVZCPB32, queue=tracking-service-durable:eligtrackinggroup, subid=27
2020/10/04 03:01:18.591776 [DBG] STREAM: [Client:trkingservicenet-77] Closed (Inbox=_INBOX.IZRB3ZHNV0KY8HCVZCPAZ6)
2020/10/04 03:01:18.670491 [ERR] STREAM: [Client:trkingservicenet-64] Timed out on heartbeats
2020/10/04 03:01:18.673567 [DBG] STREAM: [Client:trkingservicenet-64] Removed member from durable queue subscription, subject=trackmessagenet, inbox=_INBOX.HKUK8I2VM4TV3O8C65CELZ, queue=tracking-service-durable:eligtrackinggroup, subid=28
2020/10/04 03:01:18.673603 [DBG] STREAM: [Client:trkingservicenet-64] Closed (Inbox=_INBOX.HKUK8I2VM4TV3O8C65CEBZ)
2020/10/04 03:01:19.681294 [ERR] STREAM: [Client:trkingservicenet-27] Timed out on heartbeats
2020/10/04 03:01:19.683879 [DBG] STREAM: [Client:trkingservicenet-27] Removed member from durable queue subscription, subject=trackmessagenet, inbox=_INBOX.ZWS6WG8PFKYXUVMZAMSJIA, queue=tracking-service-durable:eligtrackinggroup, subid=30
2020/10/04 03:01:19.683901 [DBG] STREAM: [Client:trkingservicenet-27] Closed (Inbox=_INBOX.ZWS6WG8PFKYXUVMZAMSJ9G)
2020/10/04 03:01:20.378300 [ERR] STREAM: [Client:trkingservicenet-82] Timed out on heartbeats
2020/10/04 03:01:20.381044 [DBG] STREAM: [Client:trkingservicenet-82] Removed member from durable queue subscription, subject=trackmessagenet, inbox=_INBOX.HLKMZBB65TK2BVJGH6RADM, queue=tracking-service-durable:eligtrackinggroup, subid=31
2020/10/04 03:01:20.381078 [DBG] STREAM: [Client:trkingservicenet-82] Closed (Inbox=_INBOX.HLKMZBB65TK2BVJGH6RAB2)
2020/10/04 03:01:25.107908 [ERR] STREAM: [Client:trkingservicenet-6] Timed out on heartbeats
2020/10/04 03:01:25.110640 [DBG] STREAM: [Client:trkingservicenet-6] Removed member from durable queue subscription, subject=trackmessagenet, inbox=_INBOX.963XZ8N2XNVCOVI1Q5EMMZ, queue=tracking-service-durable:eligtrackinggroup, subid=33
2020/10/04 03:01:25.110667 [DBG] STREAM: [Client:trkingservicenet-6] Closed (Inbox=_INBOX.963XZ8N2XNVCOVI1Q5EMIV)

Once that trkingservicenet client is closed, I see the following exception on the subscriber side:

   at System.IO.BufferedStream.EnsureCanSeek()
   at System.IO.BufferedStream.get_Position()
   at NATS.Client.Connection.publish(String subject, String reply, Byte[] data, Int32 offset, Int32 count, Boolean flushBuffer)
   at NATS.Client.Connection.Publish(String subject, Byte[] data)
   at STAN.Client.AsyncSubscription.manualAck(StanMsg m)
   at TrackerService.TrackingWorker.ProcessMessage(Object obj, StanMsgHandlerArgs args) in /src/Services/Mdrx.Hub.Eligibility.TrackerService/TrackingWorker.cs:line 103

Also, I am noticing a lot of redeliveries even though I issue args.Message.Ack() as soon as I receive the message. The properties set are:

    opts.ManualAcks = true;
    opts.AckWait = 60000;

[1] 2020/10/04 03:19:31.596582 [TRC] STREAM: [Client:trkingservicenet-96] Processing ack for subid=54, subject=trackmessagenet, seq=98444
[1] 2020/10/04 03:19:31.596674 [TRC] STREAM: [Client:trkingservicenet-96] Delivering msg to subid=54, subject=trackmessagenet, seq=104644
[1] 2020/10/04 03:19:31.613452 [TRC] STREAM: [Client:trkingservicenet-96] Redelivering msg to subid=54, subject=trackmessagenet, seq=99133
[1] 2020/10/04 03:19:31.613471 [TRC] STREAM: [Client:trkingservicenet-96] Redelivery for subid=54, skipping seq=103064
[1] 2020/10/04 03:19:31.620471 [TRC] STREAM: [Client:trkingservicenet-80] Redelivering msg to subid=56, subject=trackmessagenet, seq=99134
[1] 2020/10/04 03:19:31.620496 [TRC] STREAM: [Client:trkingservicenet-80] Redelivery for subid=56, skipping seq=103065
[1] 2020/10/04 03:19:31.624664 [TRC] STREAM: [Client:trkingservicenet-53] Processing ack for subid=58, subject=trackmessagenet, seq=98338
[1] 2020/10/04 03:19:31.624863 [TRC] STREAM: [Client:trkingservicenet-53] Delivering msg to subid=58, subject=trackmessagenet, seq=104645
[1] 2020/10/04 03:19:31.636387 [TRC] STREAM: [Client:trkingservicenet-96] Redelivering msg to subid=54, subject=trackmessagenet, seq=103064
[1] 2020/10/04 03:19:31.636823 [TRC] STREAM: [Client:trkingservicenet-96] Redelivery for subid=54, skipping seq=99142
[1] 2020/10/04 03:19:31.644505 [TRC] STREAM: [Client:trkingservicenet-92] Redelivering msg to subid=59, subject=trackmessagenet, seq=99136
[1] 2020/10/04 03:19:31.644728 [TRC] STREAM: [Client:trkingservicenet-92] Redelivery for subid=59, skipping seq=99143
[1] 2020/10/04 03:19:31.648904 [TRC] STREAM: [Client:trkingservicenet-35] Redelivering msg to subid=55, subject=trackmessagenet, seq=101685
[1] 2020/10/04 03:19:31.649121 [TRC] STREAM: [Client:trkingservicenet-35] Redelivery for subid=55, skipping seq=103066
[1] 2020/10/04 03:19:31.667335 [TRC] STREAM: [Client:trkingservicenet-80] Redelivering msg to subid=56, subject=trackmessagenet, seq=103065
[1] 2020/10/04 03:19:31.667566 [TRC] STREAM: [Client:trkingservicenet-80] Redelivery for subid=56, skipping seq=99139
[1] 2020/10/04 03:19:31.677009 [TRC] STREAM: [Client:trkingservicenet-57] Redelivering msg to subid=57, subject=trackmessagenet, seq=99137
[1] 2020/10/04 03:19:31.677237 [TRC] STREAM: [Client:trkingservicenet-57] Redelivery for subid=57, skipping seq=99140
[1] 2020/10/04 03:19:31.683092 [TRC] STREAM: [Client:trkingservicenet-80] Redelivering msg to subid=56, subject=trackmessagenet, seq=99139
[1] 2020/10/04 03:19:31.683295 [TRC] STREAM: [Client:trkingservicenet-80] Redelivery for subid=56, skipping seq=99144

After this point, even if I redeploy the pods there is no use; I have to remove the NATS Streaming server and re-deploy it.

I will try to set up FT mode and see if that works with the .NET client. I did not see this problem with the Go client (as part of my investigation I wrote the same functionality in Go) with the current setup and hardware, but Go may not work for our use case as we need to write the code in C#.

Thanks Ram

rampratapa avatar Oct 04 '20 03:10 rampratapa

Ram, thank you for the information. I'm concerned about:

at System.IO.BufferedStream.EnsureCanSeek()
at System.IO.BufferedStream.get_Position()
at NATS.Client.Connection.publish(String subject, String reply, Byte[] data, Int32 offset, Int32 count, Boolean flushBuffer)
at NATS.Client.Connection.Publish(String subject, Byte[] data)
at STAN.Client.AsyncSubscription.manualAck(StanMsg m)
at TrackerService.TrackingWorker.ProcessMessage(Object obj, StanMsgHandlerArgs args) in /src/Services/Mdrx.Hub.Eligibility.TrackerService/TrackingWorker.cs:line 103

That stems from a bug (https://github.com/nats-io/nats.net/pull/349) that we fixed in the NATS.Client version v0.10.1. This would cause resends if you aren't re-acknowledging the message after encountering this exception. Can you check the client versions of both STAN.Client and NATS.Client assemblies?
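
If upgrading isn't immediately possible, here is a minimal sketch of re-acknowledging inside the message handler (illustrative only; the single-retry policy is an assumption, not a prescribed fix):

    // If the manual ack throws (e.g. the BufferedStream issue above), retry it once
    // so the server does not keep redelivering the message.
    try
    {
        args.Message.Ack();
    }
    catch (Exception ackEx)
    {
        Console.WriteLine("Ack failed, retrying once: " + ackEx.Message);
        args.Message.Ack(); // second attempt; let it propagate if it fails again
    }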

Also, looking through the logs, all of your clients time out on heartbeats after a new leader has been elected and the existing client replaced. There is definitely a lot going on here. I'll look further, and coordinate with the rest of the team tomorrow.

Also, I'm following https://github.com/nats-io/nats-streaming-server/issues/1092. We definitely suggest FT mode over HA (clustering) when possible.

Best regards, Colin

ColinSullivan1 avatar Oct 04 '20 16:10 ColinSullivan1

Colin, good evening. Thank you for the quick response. I am using STAN.Client 0.2.1, which references NATS.Client 0.10.0: <PackageReference Include="STAN.Client" Version="0.2.1" />

I will spend some more time tomorrow to correlate the events and let you know what I find.

I need to find good steps to set up NATS/STAN on AKS using either Azure blob storage or disk files, as I want to keep it simple to maintain instead of using SQL Server.

Thanks ram

rampratapa avatar Oct 05 '20 00:10 rampratapa

@rampratapa I was asked to give some feedback.

  • Upgrade to latest NATS Streaming server: 0.18.0
  • You probably need to bump the streaming client PingInterval/PingMaxOutstanding (although I see that you did compared to the first post). You need those (interval*max) to cover a downtime and full restart of a leader to avoid the client considering the connection to the server lost.
  • From the 3 server log files, it is not really that there was a leader election in the middle of operations: those 3 servers have been restarted, so yes, a leader is elected
  • Timeouts of some clients may indicate either that they don't have a high enough ping interval/max, that the server is configured with a small ping-to-client setting (if you did not set anything then that can't be it), or that there is a communication problem between the server(s) and your clients.
  • Redeliveries happen when a client does not ack a message within the AckWait interval. If the message callback takes a long time to return, it is possible that the server resends multiple copies of the same message. Note that in NATS, message callbacks are invoked serially (one message at a time). So suppose your app receives a message, acks it right away, but then takes, say, 1 minute to process it. If the AckWait is, say, 10 seconds, the second message (sent right after the first) will not be acked for about 6 intervals, so the server will have resent it 6 times. When your application's callback finally completes and acknowledges the second message, there are still several copies left to be dispatched. The library does not suppress those duplicates (see the sketch after this list).
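
For illustration only (the values are assumptions, not a recommendation), the sketch below sizes AckWait above the worst-case handler time for the durable queue subscription used in this thread:

    // Illustrative subscription options; AckWait (milliseconds in this client)
    // should exceed the worst-case per-message processing time to limit redeliveries.
    var subOpts = StanSubscriptionOptions.GetDefaultOptions();
    subOpts.DurableName = "tracking-service-durable";
    subOpts.ManualAcks = true;
    subOpts.AckWait = 120000;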

I would personally have started with a single NATS Streaming server instance and ensured that your app (pub and sub) is stable (no timeouts, able to handle the traffic that you want, etc.), and only then moved to a cluster setup. There are too many things going on right now.

Finally, you may be aware that JetStream is our new streaming product, and if you are starting a new project you should have a look at JetStream instead (although it is not officially released yet, it is available from the NATS Server main branch as a tech preview). Something that I feel is important to always point out to users who are new to NATS Streaming is that NATS Streaming is not a typical message queue system. That is, messages are not removed from the channel when they have been acknowledged by applications; instead, they are removed due to channel limits (https://docs.nats.io/nats-streaming-concepts/channels/message-log). JetStream will have other modes, such as interest-based and work-queue, that cause messages to be removed from the stream once acknowledged.

kozlovic avatar Oct 05 '20 19:10 kozlovic

@kozlovic, good morning. Yesterday I spent my time researching a gRPC-related issue and could not get time to look into NATS; I will be working on this today. Our requirement is to keep messages for about 24 hours in the queue and purge them after that time period. In case we need to reprocess a particular message, or investigate something, we should be able to get to it if we store the sequence number. I read that the data is stored until the message limit is reached or it expires (if we set the TTL). I will update you tomorrow with my findings after fine-tuning the properties. The following are the properties I currently set:

STAN options:

    cOpts.PingInterval = 10000;
    cOpts.PingMaxOutstanding = 5;

NATS options:

    opts.AllowReconnect = true;
    opts.PingInterval = 10000;
    opts.MaxPingsOut = 5;
    opts.MaxReconnect = Options.ReconnectForever;

I have upgraded STAN to the 0.18.0 alpine image. I think NATS is 2.1.8.

Thank you for your help.

Thanks Ram

rampratapa avatar Oct 06 '20 11:10 rampratapa

I read that the data is stored until the message limit reached or it expired ( if we set the TTL)

Correct. I just wanted to make sure that you understood how messages were removed in NATS Streaming.

kozlovic avatar Oct 06 '20 14:10 kozlovic

@ColinSullivan1, good evening. I am using the following NuGet package for STAN: <PackageReference Include="STAN.Client" Version="0.2.1" />

This pulls in NATS.Client 0.10.0; where can I find the latest STAN client that references NATS.Client 0.10.1 so that I can use it for my testing?

Thanks ram

rampratapa avatar Oct 07 '20 23:10 rampratapa

I'll be creating a new release this week, but in the meantime you should be able to update NATS.Client to 0.10.1 in your project. Using Visual Studio, I added NATS.Client 0.10.1 as a dependency and STAN.Client then uses that version via binding redirects.

e.g. (see the attached screenshot)

On another note, per our conversation yesterday, you might get better performance by reducing the number of tasks allocated to a subscriber.

Set NATS.Client.Options.SubscriberDeliveryTaskCount to a reasonable number, like 3. Otherwise you'll be creating a long-running task for every subscriber, which performs well with a few subscribers but puts undue stress on the system as you ramp up.
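
For example, a minimal sketch (the value 3 follows the suggestion above; natUrl comes from the thread's getConnection code and the rest is an assumption):

    // Share a small pool of delivery tasks across all subscribers instead of
    // creating a long-running task per subscriber.
    var natsOptions = ConnectionFactory.GetDefaultOptions();
    natsOptions.Url = natUrl;
    natsOptions.SubscriberDeliveryTaskCount = 3;

    var nc = new ConnectionFactory().CreateConnection(natsOptions);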

ColinSullivan1 avatar Oct 08 '20 14:10 ColinSullivan1