Rebus.RabbitMq

Unable to handle nodes being put into maintenance mode

Status: Open. MrAdam opened this issue 2 years ago (20 comments).

I've run into an issue with Rebus.RabbitMq and Amazon MQ for RabbitMQ, where AWS automatic version upgrades cause a RabbitMQ.Client.Exceptions.AlreadyClosedException that I have been unable to handle and recover from.

I have a RabbitMQ cluster with three nodes, which at some point will be put into maintenance mode one by one for upgrading. However, this causes the application to throw an exception if it tries to send a message after the channel has been disposed.

[15.26.08 Rebus.RabbitMq.RabbitMqTransport [Warning] Closed channel detected - consumer will be disposed
[15.26.08 Rebus.Pipeline.Send.SendOutgoingMessageStep [Debug] Sending "HealthCheckResponseMessage" -> "my-service"
[15.26.08 Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware [Error] An unhandled exception has occurred while executing the request.
RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=320, text='CONNECTION_FORCED - Node was put into maintenance mode', classId=0, methodId=0   
   at RabbitMQ.Client.Impl.SessionBase.Transmit(OutgoingCommand& cmd)
   at RabbitMQ.Client.Impl.ModelBase.TransmitAndEnqueue(OutgoingCommand& cmd, IRpcContinuation k)
   at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
   at Rebus.RabbitMq.RabbitMqTransport.DoSend(IEnumerable`1 outgoingMessages, IModel model, Boolean isExpress)
   at Rebus.RabbitMq.RabbitMqTransport.SendOutgoingMessages(IEnumerable`1 outgoingMessages, ITransactionContext context)
   at Rebus.Transport.AbstractRebusTransport.<>c__DisplayClass2_1.<<Send>b__1>d.MoveNext()
--- End of stack trace from previous location ---

There's nothing wrong with that, but it should be possible to try again, as the other nodes are still alive and well. I have tried injecting a new step in the IPipeline right before the SendOutgoingMessageStep in the hope that I could catch this exception and retry sending, but the Process method of this new step is only reached when messages are sent successfully, and never when encountering the error.

options.Decorate<IPipeline>(context => 
{  
  var pipeline = context.Get<IPipeline>();
  var step = new MaintenanceModeDetectorStep();
  return new PipelineStepInjector(pipeline).OnSend(
    step,
    PipelineRelativePosition.Before,
    typeof(SendOutgoingMessageStep)
  );
});

Am I missing something in regards to handling errors in the ITransport implementation, that would allow me to catch and retry when I get this exception?

MrAdam, Sep 05 '23 13:09

Could you try upgrading to Rebus.RabbitMq 8.1.0 (if you're on Rebus 7) or 9.0.0-alpha03 (if you're using a Rebus 8 prerelease)?

I believe this particular issue happens to have been fixed about 1 hour ago 😅

Let me know how it goes!

mookid8000, Sep 05 '23 14:09

Thank you for the quick response 😄 I just tried upgrading Rebus.RabbitMq to 8.1.0, but I still get the exception.

I am testing this locally by running two RabbitMQ nodes in a cluster configuration through Docker. I have also set up a test endpoint that "spams" RabbitMQ with messages for 30 seconds.

[HttpGet]
public async Task<IActionResult> Spam()
{
    var endTime = DateTime.UtcNow.AddSeconds(30);
    while (DateTime.UtcNow < endTime)
    {
        await _bus.Send(new TestMessage());
        await Task.Delay(TimeSpan.FromMilliseconds(100));
    }
    return Ok();
}

After sending a request to this endpoint, I put the RabbitMq node that the test is connected to into maintenance mode:

docker exec rabbitmq-node1 rabbitmq-upgrade drain

This causes the previously mentioned exception to be thrown in the test endpoint, and the application does not recover as I had hoped:

[10.32.05 Rebus.Pipeline.Send.SendOutgoingMessageStep [Debug] Sending "TestMessage" -> "other-service"
[10.32.05 Rebus.RabbitMq.RabbitMqTransport [Warning] Closed channel detected - consumer will be disposed
[10.32.05 Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware [Error] An unhandled exception has occurred while executing the request.
RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=320, text='CONNECTION_FORCED - Node was put into maintenance mode', classId=0, methodId=0   
   at RabbitMQ.Client.Impl.ModelBase.WaitForConfirms(TimeSpan timeout, Boolean& timedOut)
   at RabbitMQ.Client.Impl.ModelBase.WaitForConfirmsOrDie(TimeSpan timeout)
   at Rebus.RabbitMq.RabbitMqTransport.DoSend(IEnumerable`1 outgoingMessages, IModel model, Boolean isExpress)
   at Rebus.RabbitMq.RabbitMqTransport.SendOutgoingMessages(IEnumerable`1 outgoingMessages, ITransactionContext context)
   at Rebus.Transport.AbstractRebusTransport.<>c__DisplayClass2_1.<<Send>b__1>d.MoveNext()
--- End of stack trace from previous location ---
   at Rebus.Transport.TransactionContext.InvokeAsync(Func`2 actions)
   at Rebus.Transport.TransactionContext.Complete()
   at Rebus.Bus.RebusBus.InnerSend(IEnumerable`1 destinationAddresses, Message logicalMessage)
   at Rebus.Bus.RebusBus.Send(Object commandMessage, IDictionary`2 optionalHeaders)
   at Test.DevelopmentClientService.Controllers.DevelopmentController.Spam() in C:\Users\Adam\Projects\test\src\Test.DevelopmentClientService\Controllers\DevelopmentController.cs:line 30
   at Microsoft.AspNetCore.Mvc.Infrastructure.ActionMethodExecutor.TaskOfIActionResultExecutor.Execute(ActionContext actionContext, IActionResultTypeMapper mapper, ObjectMethodExecutor executor, Object controller, Object[] arguments)     
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeActionMethodAsync>g__Awaited|12_0(ControllerActionInvoker invoker, ValueTask`1 actionResultValueTask)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeNextActionFilterAsync>g__Awaited|10_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Rethrow(ActionExecutedContextSealed context)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeInnerFilterAsync>g__Awaited|13_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeFilterPipelineAsync>g__Awaited|20_0(ResourceInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Awaited|17_0(ResourceInvoker invoker, Task task, IDisposable scope)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Awaited|17_0(ResourceInvoker invoker, Task task, IDisposable scope)
   at Microsoft.AspNetCore.Routing.EndpointMiddleware.<Invoke>g__AwaitRequestTask|6_0(Endpoint endpoint, Task requestTask, ILogger logger)
   at Microsoft.AspNetCore.Authorization.AuthorizationMiddleware.Invoke(HttpContext context)
   at Swashbuckle.AspNetCore.SwaggerUI.SwaggerUIMiddleware.Invoke(HttpContext httpContext)
   at Swashbuckle.AspNetCore.Swagger.SwaggerMiddleware.Invoke(HttpContext httpContext, ISwaggerProvider swaggerProvider)
   at Microsoft.AspNetCore.Authentication.AuthenticationMiddleware.Invoke(HttpContext context)
   at Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddlewareImpl.Invoke(HttpContext context)
[10.32.06 Rebus.Internals.ConnectionManager [Information] Existing connection found to be CLOSED
[10.32.06 Rebus.RabbitMq.RabbitMqTransport [Information] Successfully initialized consumer for "test-service"

MrAdam, Sep 06 '23 08:09

The same thing happens with Rebus.RabbitMq version 9.0.0-alpha03

MrAdam, Sep 06 '23 08:09

Ok, damn 😅 Could you show me how you configure Rebus?

mookid8000, Sep 06 '23 09:09

Sorry 😛

Here's my complete setup code for Rebus:

serviceCollection.AddRebus(
    (configure, services) =>
    {
        var rabbitMqSettings = services.GetRequiredService<RabbitMqSettings>();

        var sslSettings = new SslSettings(
            rabbitMqSettings.ConnectionString.StartsWith(
                "amqps://",
                StringComparison.OrdinalIgnoreCase
            ),
            new Uri(rabbitMqSettings.ConnectionString).Host,
            version: SslProtocols.Tls12
        );

        return configure
            .Logging(logging => logging.Serilog())
            .Transport(
                transport =>
                    transport
                        .UseRabbitMq(
                            rabbitMqSettings.ConnectionString,
                            rabbitMqSettings.InputQueueName
                        )
                        .Ssl(sslSettings)
                        .ClientConnectionName(
                            Assembly.GetExecutingAssembly().GetName().Name
                        )
                        .InputQueueOptions(
                            queue => queue.SetQueueTTL(rabbitMqSettings.InputQueueTtl)
                        )
            )
            .Serialization(
                serialization =>
                    serialization.UseSystemTextJson(
                        new JsonSerializerOptions().ConfigureForNodaTime(
                            DateTimeZoneProviders.Tzdb
                        )
                    )
            )
            .Routing(
                routing =>
                    routing
                        .TypeBased()
                        .Map<TestMessage>(
                            rabbitMqSettings.OtherServiceQueueName
                        )
            )
            .Options(options =>
            {
                options.SetNumberOfWorkers(1);
                options.SetMaxParallelism(1);
                options.HandleException<MessageCouldNotBeDispatchedToAnyHandlersException>(
                    (_, context) =>
                    {
                        Log.Information(
                            "Message type {@Type} with headers {@Headers} could not be dispatched to any handlers",
                            context.Message.GetMessageType(),
                            context.Headers
                        );
                        return Task.CompletedTask;
                    }
                );
            });
    }
);

And if you'd like an easy way to run a RabbitMQ cluster locally for testing, this is the docker-compose.yml I use:

version: "3"
name: rabbitmq-cluster

services:
  # docker exec rabbitmq-node1 rabbitmq-upgrade drain
  # docker exec rabbitmq-node1 rabbitmq-upgrade revive
  rabbitmq-node1:
    image: rabbitmq:3-management
    container_name: rabbitmq-node1
    hostname: rabbitmq-node1
    environment:
      - RABBITMQ_DEFAULT_USER=rabbitmq
      - RABBITMQ_DEFAULT_PASS=rabbitmq
      - RABBITMQ_DEFAULT_VHOST=/
      - RABBITMQ_ERLANG_COOKIE=123456
    labels:
      - "traefik.enable=true"
      - "traefik.tcp.routers.rabbitmq.rule=HostSNI(`*`)"
      - "traefik.tcp.routers.rabbitmq.entrypoints=rabbitmq"
      - "traefik.tcp.routers.rabbitmq.service=rabbitmq"
      - "traefik.tcp.services.rabbitmq.loadbalancer.server.port=5672"
      - "traefik.tcp.routers.rabbitmq-management.rule=HostSNI(`*`)"
      - "traefik.tcp.routers.rabbitmq-management.entrypoints=rabbitmq-management"
      - "traefik.tcp.routers.rabbitmq-management.service=rabbitmq-management"
      - "traefik.tcp.services.rabbitmq-management.loadbalancer.server.port=15672"

  # docker exec rabbitmq-node2 rabbitmq-upgrade drain
  # docker exec rabbitmq-node2 rabbitmq-upgrade revive
  rabbitmq-node2:
    image: rabbitmq:3-management
    container_name: rabbitmq-node2
    hostname: rabbitmq-node2
    environment:
      - RABBITMQ_ERLANG_COOKIE=123456
    command: >
      bash -c "
        /usr/local/bin/docker-entrypoint.sh rabbitmq-server -detached
        sleep 5
        rabbitmqctl wait /var/lib/rabbitmq/mnesia/rabbit\@`env hostname`.pid
        rabbitmqctl stop_app
        rabbitmqctl join_cluster rabbit@rabbitmq-node1
        rabbitmqctl start_app
        tail -f /var/log/rabbitmq/*.log
      "
    labels:
      - "traefik.enable=true"
      - "traefik.tcp.routers.rabbitmq.rule=HostSNI(`*`)"
      - "traefik.tcp.routers.rabbitmq.entrypoints=rabbitmq"
      - "traefik.tcp.routers.rabbitmq.service=rabbitmq"
      - "traefik.tcp.services.rabbitmq.loadbalancer.server.port=5672"
      - "traefik.tcp.routers.rabbitmq-management.rule=HostSNI(`*`)"
      - "traefik.tcp.routers.rabbitmq-management.entrypoints=rabbitmq-management"
      - "traefik.tcp.routers.rabbitmq-management.service=rabbitmq-management"
      - "traefik.tcp.services.rabbitmq-management.loadbalancer.server.port=15672"
    depends_on:
      - rabbitmq-node1

  load-balancer:
    image: traefik:3.0
    container_name: load-balancer
    command:
      - "--api.insecure=true"
      - "--providers.docker=true"
      - "--entrypoints.rabbitmq.address=:5672"
      - "--entrypoints.rabbitmq-management.address=:15672"
    ports:
      - "15672:15672"
      - "5672:5672"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - rabbitmq-node1
      - rabbitmq-node2

MrAdam, Sep 06 '23 10:09

We had a similar issue during our RabbitMQ maintenance window in AWS. To be able to reconnect to the cluster after one node was taken down, we implemented a custom connection factory (RabbitMQ.Client.IConnectionFactory) that uses Polly to retry the connection until it can be established again. A custom connection factory can be provided by calling CustomizeConnectionFactory on RabbitMqOptionsBuilder.

simongullberg, Sep 06 '23 13:09

@simongullberg, the default RabbitMQ.Client.ConnectionFactory has AutomaticRecoveryEnabled set to true by default and uses a RabbitMQ.Client.Framing.Impl.AutorecoveringConnection. I would have assumed that this would catch these kinds of errors and re-establish the connection, but it seems that it doesn't kick in if you send a message before the client has detected that the connection is lost.

@mookid8000 This seems to me to be something that would be great to have integrated into Rebus.RabbitMq, maybe @simongullberg could open source their implementation, and we could integrate it into the library?

MrAdam, Sep 07 '23 06:09

@simongullberg, it seems your solution won't work in this case. I just tried implementing and using a custom IConnectionFactory, but it is only invoked when a new connection is being made. The call to Send on the IBus still fails, because it tries to send on a connection that is broken.

MrAdam, Sep 07 '23 07:09

@MrAdam Rebus initializes its RabbitMQ ConnectionFactory like this: https://github.com/rebus-org/Rebus.RabbitMq/blob/master/Rebus.RabbitMq/Internals/ConnectionManager.cs#L139-L148

What would you suggest I do to be more able to overcome connection issues?

mookid8000, Sep 07 '23 09:09

@mookid8000 I'm not really sure what the best approach is here. I did some more digging, and it seems that Rebus.RabbitMq.RabbitMqTransport calls RabbitMQ.Client.Impl.AutorecoveringModel.BasicPublish(...) before the connection has been recovered. Debugging into the application, I can see that the error occurs when model.BasicPublish is called while model.IsClosed is true.

As noted in the RabbitMQ .NET Client API Guide (https://www.rabbitmq.com/dotnet-api-guide.html#publishers):

Messages that are published using IModel.BasicPublish when connection is down will be lost. The client does not enqueue them for delivery after connection has recovered. To ensure that published messages reach RabbitMQ applications need to use Publisher Confirms and account for connection failures.

Which makes sense. But what I feel like I'm missing is some way to hook into Rebus.RabbitMq.RabbitMqTransport.DoSend(...) to allow me to add some custom logic for retrying, if model.BasicPublish fails.

MrAdam, Sep 07 '23 11:09

The only way I see to solve this at the moment is to wrap all calls to IBus in a try/catch, one way or another, and catch (AlreadyClosedException exception) when (exception.ShutdownReason.ReplyCode == 320). This could be done through a wrapper class that implements IBus and replaces the original IBus registration in the service collection, but that would require reimplementing quite a lot of methods.
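A minimal sketch of the try/catch-and-retry idea above, in C#. `TransientRetry.ExecuteAsync` is a hypothetical helper, not part of Rebus or RabbitMQ.Client; it takes the "is this transient?" check as a predicate so the sketch stays independent of both libraries. The default attempt count and the fixed delay are arbitrary assumptions.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not a Rebus or RabbitMQ.Client API): retries an async
// operation when the supplied predicate classifies the exception as transient.
public static class TransientRetry
{
    public static async Task ExecuteAsync(
        Func<Task> operation,
        Func<Exception, bool> isTransient,
        int maxAttempts = 3,
        TimeSpan? delay = null)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        var wait = delay ?? TimeSpan.FromSeconds(1);

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await operation();
                return;
            }
            // The exception filter lets non-transient errors, and the failure of
            // the final attempt, propagate to the caller unchanged.
            catch (Exception exception) when (attempt < maxAttempts && isTransient(exception))
            {
                await Task.Delay(wait);
            }
        }
    }
}
```

With RabbitMQ.Client referenced, the filter from the paragraph above could be passed as `ex => ex is AlreadyClosedException closed && closed.ShutdownReason?.ReplyCode == 320`, wrapping a call such as `() => _bus.Send(new TestMessage())`. This only helps when the send actually happens inside the awaited call; when the bus is enlisted in an ambient transaction, the messages are sent later, when that transaction completes.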

I think that having a way to handle exceptions inside RabbitMqTransport would be preferable. Unless there is some better place in the hierarchy to put this?

MrAdam, Sep 11 '23 11:09

Which makes sense. But what I feel like I'm missing is some way to hook into Rebus.RabbitMq.RabbitMqTransport.DoSend(...) to allow me to add some custom logic for retrying, if model.BasicPublish fails.

I don't think it should be necessary to hook into the transport's internals, its error handling should be built in. And I actually thought that Rebus' RabbitMQ transport used publisher confirms for that, but apparently something is missing (or not properly re-initialized when it has gone bad).

mookid8000, Sep 12 '23 05:09

@MrAdam yes, you are right. This is another use case, and we actually experienced it just a couple of days ago. We already have a bus wrapper implemented, so my idea was just to wrap IBus.Send() in a try/catch like you described. It turned out that this did not work for us, because we use the Rebus.TransactionScopes package and scope.EnlistRebus(), which means that the messages are sent later, when we call scope.Complete().

@mookid8000 do you have any idea how we can handle transient exceptions that are thrown by the underlying RabbitMQ.Client library when using scope.EnlistRebus(), and retry the send operation?

simongullberg, Dec 14 '23 13:12

@mookid8000 do you have any ideas about this?

simongullberg, Jan 30 '24 12:01

@simongullberg sorry for being so slow to reply 😅

I am just reading up on this issue again – if I understand it correctly, is the problem then that

await bus.Send(something);

fails with a RabbitMQ "the-model-is-closed" exception, and then if you were to wait a second and retry

await bus.Send(something);

then maybe it would succeed?

If that's the case, then my opinion is that the retry should be performed by the caller and not internally in the transport.

E.g. if your await bus.Send(something) is in a Rebus handler, it will automatically be retried a couple of times. Or if you were in a web app, you would know best which Polly strategy to use so that await bus.Send(something) can overcome whichever problems you anticipate.

mookid8000, Apr 10 '24 08:04

@mookid8000, no worries :)

I think what you say makes perfect sense, but since we are using a TransactionScope with EnlistRebus(), I think the actual sending happens somewhere else?

using (var scope = TransactionUtils.CreateTransactionScope())
{
    scope.EnlistRebus();

    await _bus.Send(something); // The actual send will not take place here.

    scope.Complete();
}

When the .NET TransactionScope is completed, the underlying RebusTransactionScope is completed and the messages are sent to RabbitMQ, so I guess the actual retrying needs to take place somewhere in here: https://github.com/rebus-org/Rebus.TransactionScopes/blob/master/Rebus.TransactionScopes/TransactionScopes/TransactionScopeExtensions.cs#L46

simongullberg, May 14 '24 09:05

Yeah, in the code you posted the messages will be sent when you call scope.Complete();

If you want to retry sending with the example you posted, it would require a new transaction scope, because there is a fairly high risk that the failed transaction has been left in a bad state (e.g. by caching a failed IModel).
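The point that every retry needs a fresh transaction scope can be sketched as follows. `OneShotScope` is a hypothetical stand-in for a Rebus transaction scope, not the Rebus.TransactionScopes API: it buffers sends and can be completed only once, even when completion fails, mirroring the behavior reported in this thread. `ScopedRetry.RunAsync` (also hypothetical) creates a new scope and re-runs the whole unit of work on each attempt instead of calling Complete again on the scope that failed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a Rebus transaction scope (not the real API):
// it buffers outgoing sends and can only be completed once, even if completing fails.
public sealed class OneShotScope
{
    private readonly List<Func<Task>> _pendingSends = new();
    private bool _completed;

    public void Enlist(Func<Task> send) => _pendingSends.Add(send);

    public async Task CompleteAsync()
    {
        if (_completed)
            throw new InvalidOperationException("The scope has already been completed.");

        // State flips before the sends run, so a failed completion cannot be repeated.
        _completed = true;
        foreach (var send in _pendingSends)
            await send();
    }
}

// Hypothetical helper: every attempt gets a brand-new scope and re-runs the
// whole unit of work, instead of retrying completion on the scope that failed.
public static class ScopedRetry
{
    public static async Task RunAsync(
        Func<OneShotScope, Task> unitOfWork,
        Func<Exception, bool> isTransient,
        int maxAttempts,
        TimeSpan delay)
    {
        for (var attempt = 1; ; attempt++)
        {
            var scope = new OneShotScope();
            try
            {
                await unitOfWork(scope);
                await scope.CompleteAsync();
                return;
            }
            catch (Exception exception) when (attempt < maxAttempts && isTransient(exception))
            {
                await Task.Delay(delay);
            }
        }
    }
}
```

In real code, each attempt would instead create its own TransactionScope, call EnlistRebus() on it and send the messages again. Re-running the whole unit of work, rather than trying to copy outgoing messages into a new context, also avoids depending on callbacks that the transport or pipeline steps may have registered on the failed context.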

mookid8000, May 20 '24 07:05

I tried wrapping the RebusTransactionScope.Complete() method in a try-catch block and, in the catch, sleeping for a while and then calling RebusTransactionScope.Complete() again. Unfortunately, that did not work, because the TransactionContext on the RebusTransactionScope is marked as completed even though the first completion attempt failed. When RebusTransactionScope.Complete() is called again in the catch, the TransactionContext throws a "ThrowCompletedException".

@mookid8000, is it possible in some way to create a new RebusTransactionScope with a new TransactionContext and get the outgoing messages from the previous TransactionContext?

simongullberg, May 31 '24 09:05

@simongullberg Yeah, unfortunately the transaction scope changes state as part of the completion attempt, so it cannot guarantee that it would be able to repeat the same actions. As you've correctly experienced yourself, completing/aborting the scope is a one-off 😅

And no, it's not readily possible. The reason is that it's slightly more involved than simply getting the outgoing messages from the transaction context, because it's possible for the transport implementation and the pipeline steps and other Rebus extensions to have hooked up callbacks that do stuff to the context's OnCommit/OnRollBack/OnAck/OnNack events.

mookid8000, May 31 '24 09:05

In release 9.4.0, it looks like a simple retry strategy has been implemented, so thanks for that :)

In our APIs, where we run one-way clients, we have implemented a simple wrapper (with Scoped lifetime) for IBus that buffers the outgoing messages and sends them after the .NET transaction completes; if an exception occurs, a simple Polly policy retries sending the outgoing messages. However, this does not help us in the context of a message handler, so the fix in 9.4.0 is very much appreciated!
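The buffer-then-send approach described above might look roughly like this. `OutgoingMessageBuffer` is a hypothetical class written for illustration (the actual wrapper is not shown in the thread), and it uses a plain retry loop in place of a Polly policy so that it has no dependencies. It is assumed to be registered with a scoped lifetime, to collect messages during the request, and to be flushed only after the .NET transaction has committed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical scoped buffer (not a Rebus type): collects outgoing messages during
// a request and sends them only when FlushAsync is called after the commit.
public sealed class OutgoingMessageBuffer
{
    private readonly Func<object, Task> _send;          // e.g. message => bus.Send(message)
    private readonly Func<Exception, bool> _isTransient;
    private readonly int _maxAttempts;
    private readonly TimeSpan _delay;
    private readonly Queue<object> _pending = new();

    public OutgoingMessageBuffer(
        Func<object, Task> send,
        Func<Exception, bool> isTransient,
        int maxAttempts,
        TimeSpan delay)
    {
        _send = send;
        _isTransient = isTransient;
        _maxAttempts = maxAttempts;
        _delay = delay;
    }

    public void Add(object message) => _pending.Enqueue(message);

    // Sends buffered messages in order, retrying each one on transient errors.
    // A message leaves the queue only after it has been sent successfully.
    public async Task FlushAsync()
    {
        while (_pending.Count > 0)
        {
            var message = _pending.Peek();
            for (var attempt = 1; ; attempt++)
            {
                try
                {
                    await _send(message);
                    break;
                }
                catch (Exception exception) when (attempt < _maxAttempts && _isTransient(exception))
                {
                    await Task.Delay(_delay);
                }
            }
            _pending.Dequeue();
        }
    }
}
```

Because a message is dequeued only after its send succeeds, a retry does not re-send messages that already went out successfully. A send that reaches the broker but still throws could, however, be delivered twice, so receivers should tolerate duplicates.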

simongullberg, Oct 08 '24 12:10