GreenPipes
Race conditions: eviction of used send endpoint from GreenCache leads to errors
I'd love to provide code to reproduce this, but that is difficult since it is a race condition.
Here's what is happening in our Production:
- We use MassTransit with RabbitMQ transport;
- We override SendEndpointCacheDefaults.MaxAge to be 30 minutes, which is significantly shorter than the default (24 hours). There are reasons for that: we have an on-demand service process which shuts down when inactive, and we want to dispose inactive channels sooner rather than later.
- Internally, MassTransit uses GreenPipes.GreenCache to cache send endpoints. As I understand the code, this cache's eviction policy is based solely on time: a bucket's timestamp is immutable and is assigned only when the bucket is created.
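For reference, the override mentioned above is a one-line static assignment made before the bus is created (a sketch, assuming MassTransit's SendEndpointCacheDefaults; the 30-minute value is the one from our production configuration):

```csharp
// Shorten the send endpoint cache lifetime from the 24-hour default
// to 30 minutes, so inactive channels are disposed sooner.
// This must be set before the bus is created.
SendEndpointCacheDefaults.MaxAge = TimeSpan.FromMinutes(30);
```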
This occasionally leads to the following unpleasant scenario:
- A send endpoint is in use
- The same endpoint is evicted from the cache and disposed by another thread
- This ends up with a variety of exceptions, for example:
System.Threading.Tasks.TaskCanceledException: A task was canceled ... at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter`1.<ConfigureTopology>d__6.MoveNext()
In my opinion, the best resolution would be to update the timestamp of the GreenCache bucket whenever the bucket is used, i.e. switch from "created" time to "last used" time. This would prevent objects that are in use from being evicted.
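To illustrate the distinction being proposed, here is a minimal, hypothetical sketch (not GreenCache's actual implementation) of absolute versus sliding expiration for a cache entry:

```csharp
using System;

// Hypothetical cache entry illustrating the proposed change:
// an absolute policy evicts based on CreatedAt alone, while a
// sliding policy refreshes LastUsedAt on every access, so an
// entry in active use is never considered expired.
class CacheEntry<T>
{
    public T Value { get; }
    public DateTime CreatedAt { get; }
    public DateTime LastUsedAt { get; private set; }

    public CacheEntry(T value)
    {
        Value = value;
        CreatedAt = LastUsedAt = DateTime.UtcNow;
    }

    public T Use()
    {
        LastUsedAt = DateTime.UtcNow; // sliding: touch on every use
        return Value;
    }

    // Absolute policy: an entry can expire even while in active use.
    public bool IsExpiredAbsolute(TimeSpan maxAge) =>
        DateTime.UtcNow - CreatedAt > maxAge;

    // Sliding policy: an entry expires only after a period of inactivity.
    public bool IsExpiredSliding(TimeSpan maxAge) =>
        DateTime.UtcNow - LastUsedAt > maxAge;
}
```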
Are you retaining an ISendEndpoint reference beyond the immediate use of it in your code? The reason I ask is that doing so is a big no-no. If you are infrequently calling Send on an ISendEndpoint, you should request it from the ISendEndpointProvider when you need it, using the endpoint address. That way, the cache can behave properly. Every time you call Send on a cached endpoint, it is moved to the current bucket (if it isn't already in it), preventing it from being disposed. However, if you get the send endpoint and hang on to it, and then call Send after the bucket is ready to be disposed, you will be using a likely-disposed send endpoint. Does that make sense?
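A sketch of the pattern described above, resolving the endpoint from the provider on each use rather than storing the reference; the class, message type, and queue address below are hypothetical:

```csharp
// Sketch only: ask the ISendEndpointProvider for the endpoint on each
// call, so the cache can track usage and keep live endpoints in the
// current bucket. Do not store the ISendEndpoint in a field.
public class OrderSubmitter
{
    readonly ISendEndpointProvider _sendEndpointProvider;

    public OrderSubmitter(ISendEndpointProvider sendEndpointProvider)
    {
        _sendEndpointProvider = sendEndpointProvider;
    }

    public async Task SubmitOrder(SubmitOrder message)
    {
        // Resolved per call; the address is a placeholder.
        var endpoint = await _sendEndpointProvider.GetSendEndpoint(
            new Uri("rabbitmq://localhost/order-service"));

        await endpoint.Send(message);
    }
}
```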
No, we do not hold references to send endpoints. We use the MessageRequestClient and consumers; the rest happens inside MassTransit.
Example:
public async Task Consume(ConsumeContext<ICountRequest> context)
{
    try
    {
        Log.Trace($"Handling MassTransit ICountRequest: Consumer ID: {_id}");
        var count = await _serviceImplementation.Count();
        await context.RespondAsync<ICountResponse>(new { Count = count }).ConfigureAwait(false);
    }
    catch (Exception e)
    {
        Log.Error(e, "Error Handling ICountRequest");
        throw;
    }
}
Hi @phatboyg, just to clarify: we make use of the Request/Response pattern.
We use a microservice architecture and sometimes have a huge spike in transient processes. Whenever these instances send a request to our long-lived microservices, GreenPipes ends up creating and caching a RabbitMQ channel when the long-lived microservice responds to the request.
This is leading to massive channel leaks that are causing our RabbitMQ cluster to crash. After a couple of weeks, we have channel counts in the hundreds of thousands in RabbitMQ. Most of these channels are stale but remain cached by GreenPipes.
In an effort to reduce this effect, we reduced the cache limits to force GreenPipes to evict the stale channels sooner. But now we are experiencing active channels also getting evicted.
I guess my simple answer is: why are you creating a new endpoint/response queue for each request? You should be reusing existing response queues; it takes a lot of CPU/memory to create and destroy them constantly. In any high-volume site, you should be sharing response queues on web API services.
I have uploaded a sample simulating the issue here.
So it makes sense: every time a process restarts, it creates a new unique queue for that process, and that queue is used to receive the response messages.
Just to explain our architecture: we develop airport Kiosks. Each Kiosk has a worker process that executes a workflow. When the Kiosk restarts, our microservice environment restarts the worker process. In other words, it would be perfect if we could create a named queue that receives the response messages for each Kiosk, e.g. kiosk-1-response, kiosk-2-response, etc.
But how can we create a RequestClient and force it to use a named queue instead of the auto-generated unique process queue? I think this is where we are missing something.
Here are the code snippets from the example for the Requester and Responder. Note, the Responder is a long-lived Micro Service. The Requester simulates short-lived worker processes.
Responder:
static void Main()
{
    Console.WriteLine("Starting Responder...");

    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        var host = sbc.Host(new Uri("rabbitmq://localhost"), h => { });

        sbc.ReceiveEndpoint(host, "channel-test", ep =>
        {
            ep.Handler<RequestMessage>(context => context.RespondAsync(
                new ResponseMessage { Text = $"Hallo {context.Message.Text}" }
            ));
        });
    });

    bus.Start();

    Console.WriteLine("Press any key to exit...");
    Console.ReadKey();

    bus.Stop();
}
And here is the Requester:
static void Main()
{
    Console.WriteLine("Starting Requester...");
    Console.WriteLine("Press ctrl-C to exit...");

    while (true)
    {
        SimulateProcessRestart().GetAwaiter().GetResult();
    }
}

// This simulates a new short-lived process
static async Task SimulateProcessRestart()
{
    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        sbc.Host(new Uri("rabbitmq://localhost"), h => { });
    });

    await bus.StartAsync();

    // Is there a way to create a RequestClient and bind it to a named response queue?
    var client = bus.CreateRequestClient<RequestMessage>(new Uri("rabbitmq://localhost/channel-test"));
    var response = await client.GetResponse<ResponseMessage>(new RequestMessage { Text = "World" });

    Console.WriteLine($"Received Response: {response.Message.Text}");

    await bus.StopAsync();
}
P.S. Just to be clear, this is a simplified example. In production, each short-lived process can live for hours, and we reuse the bus for all message transactions in that process; we do not create a bus per request, and hundreds of requests are typically sent per process instance. But we have multiple hundreds of Kiosks, so after a week or two we end up with channel counts in the hundreds of thousands on our RabbitMQ server.
Update: @dmitrynovik implemented the following change, and it seems to have solved the problem:
static async Task SimulateProcessRestart()
{
    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        sbc.Host(new Uri("rabbitmq://localhost"), h => { });

        // Override the bus endpoint queue name (create a unique logical name per process name + machine)
        sbc.OverrideDefaultBusEndpointQueueName($"{typeof(Program).FullName}-{Environment.MachineName}");
    });

    await bus.StartAsync();
}
Are we on the right track by using sbc.OverrideDefaultBusEndpointQueueName?
Yeah, that would reduce the churn and use a consistent name for each machine, instead of creating a new unique one for every process invocation. Just need to make sure the process isn't running multiple times on the same box, since both instances would end up using the same queue.
Excellent. Yes, we have mechanisms in place to prevent multiple instances running for the same Kiosk. This solves our issue. Thanks for the help.
I need to change it so that temporary response endpoints are not cached. And also don’t setup topology.
Hi @phatboyg, could you please elaborate on the forthcoming change?
The reason I'm asking is that even with the predefined RabbitMQ queue names, our channel leak problem has not been solved.
If you have a set list of queue names and you are still seeing channel leaks, I don't think it's MassTransit. If you have dotTrace and can do a memory profile, you should be able to see what is holding on to references to those send endpoints.