Race conditions: eviction of used send endpoint from GreenCache leads to errors

Open dmitrynovik opened this issue 5 years ago • 11 comments

I'd love to include a code reproduction, but that is hard since this is about race conditions.

Here's what is happening in our Production:

  • We use MassTransit with RabbitMQ transport;
  • We override SendEndpointCacheDefaults.MaxAge to 30 minutes, significantly shorter than the default (24 hours). There are reasons for that: we have on-demand service processes that shut down when inactive, and we want to dispose of inactive channels sooner rather than later.
  • Internally, MT uses GreenPipes.GreenCache to cache send endpoints. As I understand the code, this cache's eviction policy is based solely on time: a bucket's timestamp is immutable and is assigned only when the bucket is created.
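For reference, the override itself is a one-line static setting, applied before the bus is created (the 30-minute value is our production choice, not a recommendation):

```csharp
// Shorten the send endpoint cache lifetime from the 24-hour default
// so inactive channels are disposed sooner. This is a static default,
// so it must be set before the bus is created.
SendEndpointCacheDefaults.MaxAge = TimeSpan.FromMinutes(30);
```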

This leads once in a while to the following unpleasant scenario:

  • The send endpoint is in use
  • The same endpoint is evicted from the cache and disposed on another thread
  • This ends up with a variety of exceptions, for example: `System.Threading.Tasks.TaskCanceledException: A task was canceled ... at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter`1.<ConfigureTopology>d__6.MoveNext()`

In my opinion, the best resolution would be to update the timestamp of the GreenCache bucket whenever it is used, i.e. switch from "created" to "last used" time (sliding expiration). This would prevent in-use objects from being evicted.
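To illustrate the difference between the two policies, here is a minimal, self-contained sketch. This is not GreenCache's actual implementation; `CacheEntry` is a hypothetical type for illustration only:

```csharp
using System;

// Hypothetical cache entry contrasting absolute and sliding expiration.
class CacheEntry
{
    public DateTime Created { get; } = DateTime.UtcNow;
    public DateTime LastUsed { get; private set; } = DateTime.UtcNow;

    // Called on every use; only matters under a sliding policy.
    public void Touch() => LastUsed = DateTime.UtcNow;

    // Absolute expiration (the current behavior, as I read the code):
    // an entry that is in active use can still expire.
    public bool IsExpiredAbsolute(TimeSpan maxAge) =>
        DateTime.UtcNow - Created > maxAge;

    // Sliding expiration (proposed): each use resets the clock,
    // so an entry that is being used is never evicted.
    public bool IsExpiredSliding(TimeSpan maxAge) =>
        DateTime.UtcNow - LastUsed > maxAge;
}
```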

dmitrynovik avatar May 01 '19 01:05 dmitrynovik

Are you retaining an ISendEndpoint reference beyond its immediate use in your code? The reason I ask is that that is a big no-no. If you call Send on an ISendEndpoint infrequently, you should request it from the ISendEndpointProvider when you need it, using the endpoint address; that way, the cache can behave properly. Every time you call Send on a cached endpoint, it is moved to the current bucket (if it isn't already in it), preventing it from being disposed. However, if you get the send endpoint, hang on to it, and then call Send after the bucket is ready to be disposed, you will be using a likely disposed send endpoint. Does that make sense?
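A sketch of the recommended pattern (the queue URI and `SomeMessage` type are illustrative, not from this thread):

```csharp
// Resolve the endpoint from the provider at the point of use;
// do not cache the ISendEndpoint reference yourself -- the
// internal cache handles reuse.
async Task SendWhenNeeded(ISendEndpointProvider sendEndpointProvider)
{
    var endpoint = await sendEndpointProvider.GetSendEndpoint(
        new Uri("rabbitmq://localhost/some-queue"));

    await endpoint.Send(new SomeMessage { Text = "hello" });
}
```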

phatboyg avatar May 01 '19 13:05 phatboyg

No, we do not hold references to send endpoints. We use the MessageRequestClient and consumers; the rest happens inside MassTransit.

Example:

        public async Task Consume(ConsumeContext<ICountRequest> context)
        {
            try
            {
                Log.Trace($"Handling MassTransit ICountRequest: Consumer ID: {_id}");
                var count = await _serviceImplementation.Count();
                await context.RespondAsync<ICountResponse>(new { Count = count }).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                Log.Error(e, "Error Handling ICountRequest");
                throw;
            }
        }

dmitrynovik avatar May 01 '19 22:05 dmitrynovik

Hi @phatboyg, just to clarify, we make use of the Request / Response pattern.

We use a microservice architecture and sometimes have a huge spike in transient processes. Whenever these instances send a request to our long-lived microservices, GreenPipes ends up creating and caching a RabbitMQ channel when responding to the request (in the long-lived microservices).

This consequently leads to massive channel leaks that are crashing our RabbitMQ cluster. After a couple of weeks, we have hundreds of thousands of channels in RabbitMQ. Most of these channels are stale but remain cached by GreenPipes.

In an effort to reduce this, we lowered the cache limits to force GreenPipes to evict stale channels sooner. But now we are seeing active channels getting evicted as well.

michalsteyn avatar May 02 '19 21:05 michalsteyn

My simple answer: why are you creating a new endpoint/response queue for each request? You should reuse existing response queues; it takes a lot of CPU/memory to create and destroy them constantly. In any high-volume site, you should share response queues across web API services.

phatboyg avatar May 02 '19 21:05 phatboyg

I have uploaded a sample simulating the issue here.

So it makes sense: every time a process restarts, it creates a new unique queue for that process, and that queue is used to receive the response messages.

Just to explain our architecture: we develop airport kiosks. Each kiosk has a worker process that executes a workflow. When the kiosk restarts, our microservice environment restarts the worker process. In other words, it would be perfect if we could create a named queue that receives the response messages for each kiosk, e.g. kiosk-1-response, kiosk-2-response, etc.

But how can we create a RequestClient and force it to use a named queue instead of the auto-generated unique process queue? I think this is where we are missing something.

Here are the code snippets from the example for the Requester and Responder. Note, the Responder is a long-lived Micro Service. The Requester simulates short-lived worker processes.

Responder:

static void Main()
{
    Console.WriteLine("Starting Responder...");

    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        var host = sbc.Host(new Uri("rabbitmq://localhost"), h => {});

        sbc.ReceiveEndpoint(host, "channel-test", ep =>
        {
            ep.Handler<RequestMessage>(context => context.RespondAsync(
                new ResponseMessage { Text = $"Hallo {context.Message.Text}" }
            ));
        });
    });

    bus.Start();
    Console.WriteLine("Press any key to exit...");
    Console.ReadKey();
    bus.Stop();
}

And here is the Requester:

static void Main()
{
    Console.WriteLine("Starting Requester...");
    Console.WriteLine("Press ctrl-C to exit...");
    while (true)
    {
        SimulateProcessRestart().GetAwaiter().GetResult(); 
    }
}

// This Simulates a new Short Lived Process
static async Task SimulateProcessRestart()
{
    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        sbc.Host(new Uri("rabbitmq://localhost"), h => {});
    });
    await bus.StartAsync();

    //Is there a way to create a RequestClient and bind it to a named response queue?
    var client = bus.CreateRequestClient<RequestMessage>(new Uri("rabbitmq://localhost/channel-test"));
    var response = await client.GetResponse<ResponseMessage>(new RequestMessage {Text = "World"});

    Console.WriteLine($"Received Response: {response.Message.Text}");
    await bus.StopAsync();
}

P.S. Just to be clear, this is a simplified example. In production, each short-lived process can live for hours, and we reuse the bus for all message transactions in that process, so we do not create a bus per request; hundreds of requests are typically sent per process instance. But we have multiple hundreds of kiosks, so after a week or two we end up with hundreds of thousands of channels on our RabbitMQ server.

michalsteyn avatar May 08 '19 00:05 michalsteyn

Update: @dmitrynovik implemented the following change, and it seems to have solved the problem:

static async Task SimulateProcessRestart()
{
    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        sbc.Host(new Uri("rabbitmq://localhost"), h => {});
        
        // Override bus endpoint queue name (create unique logical name per process name+machine)
        sbc.OverrideDefaultBusEndpointQueueName($"{typeof(Program).FullName}-{Environment.MachineName}");
    });
    await bus.StartAsync();
}

Are we on the right track by using sbc.OverrideDefaultBusEndpointQueueName?

michalsteyn avatar May 08 '19 19:05 michalsteyn

Yeah, that would reduce the churn and use a consistent name for each machine, instead of creating a new unique one for every process invocation. Just need to make sure the process isn't running multiple times on the same box, since both instances would end up using the same queue.

phatboyg avatar May 10 '19 12:05 phatboyg

Excellent. Yes, we have mechanisms in place to prevent multiple instances running for the same Kiosk. This solves our issue. Thanks for the help.

michalsteyn avatar May 10 '19 23:05 michalsteyn

I need to change it so that temporary response endpoints are not cached. And also don’t setup topology.

phatboyg avatar Feb 09 '20 22:02 phatboyg

> I need to change it so that temporary response endpoints are not cached. And also don’t setup topology.

Hi @phatboyg , Could you please elaborate more on the forthcoming change?

The reason I'm asking is that even after having the predefined RabbitMQ queue names, our channel leak problem has not been solved.

dmitrynovik avatar Feb 10 '20 04:02 dmitrynovik

If you have a set list of queue names and you are still seeing channel leaks, I don't think it's MassTransit. If you have dotTrace and can do a memory profile, you should be able to see what is holding references to those send endpoints.

phatboyg avatar Feb 10 '20 15:02 phatboyg