How to clean up subscriptions
I just wonder how subscriptions are actually cleaned up. Let's say a pod crashes with a stack overflow exception. Would you not retain the subscription forever?
I have thought about this for a while now and I still do not understand it properly. For me, one solution would be to implement something like a heartbeat where a subscriber confirms that it is still alive. If there is no activity for a while, the subscriber can be removed, and should be removed.
I could integrate this into my own MongoDB transport (see #1045), but it would not be very modular.
So a solution could be to introduce a few new interfaces:
```csharp
public interface ITransportCleanup
{
    void DeleteQueue(string name);
}

public interface ITransientSubscriptionStorage : ISubscriptionStorage
{
    /// <summary>
    /// Registers the given <paramref name="subscriberAddress"/> as a subscriber of the given topic
    /// as a transient subscription.
    /// </summary>
    /// <remarks>Will be called multiple times, as long as the subscription is alive.</remarks>
    Task RegisterTransientSubscriber(string topic, string subscriberAddress, DateTime now);

    /// <summary>
    /// Gets all subscribers that have not been marked as alive after the specified timestamp.
    /// </summary>
    Task<string[]> GetInactiveSubscriberAddresses(DateTime olderThan);
}

public interface IBus
{
    /// <summary>
    /// Subscribes to the specified message type and keeps the subscription alive as long as the process is running.
    /// </summary>
    Task Subscribe<TEvent>(bool transient);
}
```
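A rough sketch of how a host could drive these interfaces (this is only an illustration of the proposal, not working Rebus code; the class name, the intervals, and the idea that every instance also performs the cleanup are made up):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class SubscriptionHeartbeat
{
    readonly ITransientSubscriptionStorage _storage;
    readonly ITransportCleanup _cleanup;
    readonly string _topic;
    readonly string _ownAddress;

    public SubscriptionHeartbeat(ITransientSubscriptionStorage storage, ITransportCleanup cleanup, string topic, string ownAddress)
    {
        _storage = storage;
        _cleanup = cleanup;
        _topic = topic;
        _ownAddress = ownAddress;
    }

    public async Task RunAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            // Re-register periodically; the storage treats the timestamp as "last seen alive".
            await _storage.RegisterTransientSubscriber(_topic, _ownAddress, DateTime.UtcNow);

            // Remove subscribers that have missed several heartbeats,
            // both from the subscription storage and from the transport.
            var cutoff = DateTime.UtcNow - TimeSpan.FromMinutes(5);

            foreach (var deadAddress in await _storage.GetInactiveSubscriberAddresses(cutoff))
            {
                _cleanup.DeleteQueue(deadAddress);
            }

            await Task.Delay(TimeSpan.FromMinutes(1), ct);
        }
    }
}
```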
I'm not following you quite yet here. And I don't know anything about your use cases and such, so I might make some assumptions that aren't quite right.
Why would you want to remove a subscription if a service crashes?
I see you want to implement the subscription as transient, so I guess you're not interested in events that happened before a certain point in time.
The risk I see here is that a message sent just before the service is up and running would be discarded (or whatever to call it, since there is no subscription yet), even though it might be valid. But that's up to you to decide, of course 🙂
You could implement a TTL. But if you don't want the sender to decide whether the message is old, you could just discard it in the handler (I guess that's what you're doing right now).
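For example, something like this (just a sketch: SomeEvent is a stand-in for your actual event type, and the exact format of the sent-time header may differ):

```csharp
using System;
using System.Threading.Tasks;
using Rebus.Handlers;
using Rebus.Messages;
using Rebus.Pipeline;

public record SomeEvent; // stand-in for your actual event type

public class SomeEventHandler : IHandleMessages<SomeEvent>
{
    static readonly TimeSpan MaxAge = TimeSpan.FromMinutes(5);

    public Task Handle(SomeEvent message)
    {
        var headers = MessageContext.Current.Headers;

        // Rebus stamps outgoing messages with a sent-time header;
        // if the message is older than we care about, just ignore it.
        if (headers.TryGetValue(Headers.SentTime, out var sentTime)
            && DateTimeOffset.TryParse(sentTime, out var sentAt)
            && DateTimeOffset.UtcNow - sentAt > MaxAge)
        {
            return Task.CompletedTask;
        }

        // ...otherwise do the actual work here...
        return Task.CompletedTask;
    }
}
```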
> So a solution could be to introduce a few new interfaces:

Where would these interfaces be implemented? In the service that crashes? How would they clean up the subscription if they crash?

> For me, one solution would be to implement something like a heartbeat where a subscriber confirms that it is still alive. If there is no activity for a while, the subscriber can be removed, and should be removed.

Or maybe a monitoring service that handles the deletion for you, if you really need deletions.
I am basically talking about pub/sub here.
If you use pub/sub without direct support from the transport, you need a unique address for each host. So, let's say you have 3 nodes; then you could just name them "pubsub-0", "pubsub-1", "pubsub-2".
But on many hosting platforms you do not get a stable host name, for example with Kubernetes deployments. So your address would be "pubsub-[RANDOM-HOSTNAME]".
With the previous setup of 3 nodes, your subscription count could grow with every crash or node eviction, which can happen. So after a year, your subscription list could easily grow to hundreds of subscribers. But these subscribers do not exist anymore, so you are basically sending messages to queues without any listeners.
Sooner or later the queues will be cleaned up automatically, but you're still causing unexpected costs, and the publish operation will be very slow.
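To make the scheme concrete (a trivial sketch; the prefix is arbitrary):

```csharp
// Each instance derives its input queue name from the host name, which on
// Kubernetes is the pod name, e.g. "myservice-7f9c4d8b6-x2k9p".
var inputQueueName = $"pubsub-{Environment.MachineName.ToLowerInvariant()}";

// Every restart or rescheduling produces a new name, and therefore a new
// queue and a new subscription that nobody ever removes.
```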
I'm no expert by any means, and @mookid8000 probably gets this right away, but I'm not quite sure about the real problem:
Is the problem rather about you creating tons of new sub-addresses instead of reusing a fixed one for every service?
Clarification: You got multiple instances of a certain service? All of those got their own unique sub-address? You get a dynamically unique sub-address on start of a service instance? You are using MongoDB as transport?
- You got multiple instances of a certain service? Yes.
- All of those got their own unique sub-address? Yes, to make pub/sub possible when the transport does not support it (not sure about other transports).
- You get a dynamically unique sub-address on start of a service instance? Yes, like a combination of a prefix and the host name. The host name is randomized (e.g. in Kubernetes).
- You are using MongoDB as transport? I am using a transport that does not support pub/sub out of the box.
I have used pub/sub with Redis, Orleans, and RabbitMQ so far, but not with Rebus. I have just played around, so my understanding could be wrong.
What I understand or believe to understand is...
- When multiple instances share the same address, one of them gets a message, like in Kafka with group IDs.
- When you want to send a message to multiple instances, each of them needs a unique address. It is very similar to temporary queues in RabbitMQ: https://www.rabbitmq.com/tutorials/tutorial-three-python.html
- These subscriptions never get deleted, or only get deleted if the transport has a way to detect inactive subscriptions.
Examples
RabbitMQ (Supports Pub/Sub)
In the RabbitMQ transport, the subscription is managed by the transport and creates a binding from a topic exchange to the input queue of the service instance. But when the instance crashes, the queue stays there forever. I guess queues are very lightweight in RabbitMQ, so inactive queues are not a big deal, but because of the binding, a queue from a crashed instance still receives messages.
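In terms of the raw RabbitMQ.Client API, what the transport sets up looks roughly like this (a simplified illustration with made-up names, not the actual transport code):

```csharp
using RabbitMQ.Client;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// The "topic": an exchange that publishers send events to.
channel.ExchangeDeclare("CacheInvalidated", ExchangeType.Topic, durable: true);

// The per-instance input queue; durable and NOT auto-delete,
// so it survives a crash of the instance that declared it.
channel.QueueDeclare("pubsub-7f9c2", durable: true, exclusive: false, autoDelete: false);

// The subscription: a binding from the exchange to the input queue.
// This binding also stays behind when the instance disappears, so the
// orphaned queue keeps accumulating messages.
channel.QueueBind("pubsub-7f9c2", "CacheInvalidated", routingKey: "#");
```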
Other Transports
When the transport does not support pub/sub, the subscriptions are stored in the database. So the old queues are not removed and still receive messages from the publisher instance: https://github.com/rebus-org/Rebus/blob/master/Rebus/Bus/RebusBus.cs#L261
Hi @SebastianStehle, sorry for being so slow to chime in here 🙂 Could you maybe say some more about your use case?
I believe you've understood correctly how Rebus works, specifically around the fact that subscriptions are meant to be persistent, just like the input queues.
The fact that RabbitMQ supports auto-delete queues (which can actually be used through Rebus) is to be seen as an odd, curious thing pertaining to RabbitMQ only (and Azure Service Bus, funnily enough), and I really don't recommend using it. I've mostly seen people use RabbitMQ's auto-delete queues in cases where clients come and go, e.g. between a WPF-based desktop app and a server, but that's a scenario where HTTP (possibly combined with websockets) would have been a much much better technology fit.
So I am curious to hear more about what you're trying to do here. 🙂
I have normal pub/sub requirements; for me it would be cache invalidation. I have a few items that are cached locally for better performance, and then I would broadcast invalidation messages to all nodes.
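Concretely, the setup I have in mind looks roughly like this (CacheItemInvalidated and ILocalCache are made-up names):

```csharp
using System.Threading.Tasks;
using Rebus.Bus;
using Rebus.Handlers;

public record CacheItemInvalidated(string Key); // made-up event type

public interface ILocalCache // hypothetical local cache abstraction
{
    void Remove(string key);
}

// Every node runs this handler and evicts its local copy of the item.
public class CacheItemInvalidatedHandler : IHandleMessages<CacheItemInvalidated>
{
    readonly ILocalCache _cache;

    public CacheItemInvalidatedHandler(ILocalCache cache) => _cache = cache;

    public Task Handle(CacheItemInvalidated message)
    {
        _cache.Remove(message.Key);
        return Task.CompletedTask;
    }
}

public static class CacheInvalidationSetup
{
    public static async Task WireUpAsync(IBus bus)
    {
        // Each node subscribes on startup using its per-instance input queue...
        await bus.Subscribe<CacheItemInvalidated>();

        // ...and any node broadcasts when it changes an item:
        await bus.Publish(new CacheItemInvalidated("user:42"));
    }
}
```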
But I guess the SignalR backplane would have the same issues. The problem in general is deployments with "unstable" host names, either because of auto-scaling or because the host names have a random part, like in Kubernetes.
Yeah ok, I can see how it would be kinda neat to use Rebus to invalidate in-mem caches in containers.
Unfortunately, as you've discovered, Rebus adds a little bit of friction there, because it's optimized for persistent queues and not the ephemeral type of communication you're after.
Could you maybe look at using another kind of notification technology with built-in TTL for that? E.g. Redis, etcd, etc.?
I just do not understand the general use case of pub/sub then. Which deployment today has stable identifiers?
For me it looks like pub/sub only makes sense when my deployment never changes, but then I can just hardcode the logic into my code.
But I think it can be solved relatively easily with the transient subscribers concept.
> I just do not understand the general use case of pub/sub then. Which deployment today has stable identifiers?
You should think of a queue like you think of a URL: it's an endpoint for some kind of thing that can do stuff for you, or it can do stuff by registering itself via pub/sub as a listener for events (similar to webhooks).
Like URLs you generally shouldn't care too much about whether there's one instance or multiple instances competing for messages from a queue – varying the number of instances is just something you do to adapt to the amount of work you have to do.
Durable messaging generally makes a lot of sense when messages are important to your domain, e.g. events that get published when orders are received, payments are processed, stuff like that.
Durable messaging makes much less sense when messages can lose their meaning. In your case, messages can be targeted at one specific pod, but that instance can go away, and in that case the entire queue is moot. That to me sounds like a use case for a more ephemeral type of messaging, since you don't need the durability anyway.
> For me it looks like pub/sub only makes sense when my deployment never changes, but then I can just hardcode the logic into my code.
This is spot on, actually 🙂 Again, queues (at least the way Rebus treats them) should be thought of as addresses that come and go as often as your URLs come and go: Not that often! That part of the deployment should be fairly stable, and removing a queue should be done with the same careful consideration as you would when removing a URL from your deployment.
I understand the concept of durable messages and I have used it a lot; we have a very complex architecture here, based on Kafka and well-defined topics. In this case the topology is more or less static and is defined as part of the architecture.
In a pub/sub model, any message published to a topic is immediately received by all of the subscribers to the topic. And you have no idea who "all" is. So the topics are usually durable and well-defined, but not the subscribers.
I still don't understand how your SignalR backplane works. You have two examples in the repository:
- RabbitMQ: here auto-delete is set to true and durable is set to false, I guess to solve the problem that subscribers come and go. But this is a little bit against the principle of "smart endpoints and dumb pipes" that this library is based on.
- SignalR: here the problem exists. After a year with daily deployments you have thousands of dead subscribers that receive messages for no reason.
I really think this feature should be fixed or removed.
> In a pub/sub model, any message published to a topic is immediately received by all of the subscribers to the topic (...)
But with Rebus, in this case "a subscriber" is a logical concept which may be deployed in multiple instances.
Rebus is optimized for logical subscribers that do NOT come and go as often.
With Kafka, "a subscriber" would map to a "consumer group", and I bet you also don't have those coming and going that often – if you did, it would quickly pollute whatever mechanism you're using to track offsets with consumer group keys.
Regarding the Rebus SignalR backplane, I have to admit that I haven't used it myself or contributed to developing it. If this is actually a problem that arises when using that, then maybe you could raise this as an issue here? 👉 https://github.com/rebus-org/Rebus.SignalR/issues
Yes, I understand, it is just so confusing. I think your model is very similar to Google Pub/Sub, where you create subscriptions manually. Then consumers subscribe to the subscriptions, and you can have either one or many consumers. Therefore you can model different topologies, e.g. a model where you distribute tasks to several worker groups, each of them responsible for one specific task, or a topology where each subscriber gets a copy, like for SignalR.
But you can define that a subscription should expire automatically if there is no activity.
But what do you think about extending the feature? I think Rebus is good; the interface for custom transports is lean and small. For example, implementing a custom transport is super difficult with MassTransit, but I really need this pub/sub thing.
I'm sorry, but I am not convinced that this would be a good thing to add to Rebus. I can definitely see how it would solve your problem, but I foresee that it would be hard to implement (maybe even impossible) across all transports.
Moreover – and this is actually the main reason why I am reluctant to add the feature – I think it signals that "transient subscriptions" is a thing in Rebus, which I don't believe it is, as everything in Rebus revolves around queues/topics/bindings (a.k.a. subscriptions) being persistent in nature.
Wouldn't it be possible for you to implement something simple that would solve your specific use case? E.g. something that periodically checks your subscriptions and somehow detects that they're no longer used?
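A rough sketch of what I mean, assuming your subscribers can write heartbeats to MongoDB (all names are made up, and removeSubscription stands in for whatever your subscription storage and transport need):

```csharp
using System;
using System.Threading.Tasks;
using MongoDB.Driver;

public class SubscriptionJanitor
{
    readonly IMongoCollection<Heartbeat> _heartbeats;
    readonly TimeSpan _maxSilence = TimeSpan.FromMinutes(5);

    public SubscriptionJanitor(IMongoDatabase database)
    {
        _heartbeats = database.GetCollection<Heartbeat>("subscriber-heartbeats");
    }

    // Called periodically by each live instance.
    public Task ReportAliveAsync(string subscriberAddress)
    {
        return _heartbeats.ReplaceOneAsync(
            h => h.Id == subscriberAddress,
            new Heartbeat { Id = subscriberAddress, LastSeen = DateTime.UtcNow },
            new ReplaceOptions { IsUpsert = true });
    }

    // Called periodically by any instance (or a dedicated monitoring service).
    public async Task PurgeDeadSubscribersAsync(Func<string, Task> removeSubscription)
    {
        var cutoff = DateTime.UtcNow - _maxSilence;
        var dead = await _heartbeats.Find(h => h.LastSeen < cutoff).ToListAsync();

        foreach (var heartbeat in dead)
        {
            // removeSubscription would unsubscribe the address and drop its queue.
            await removeSubscription(heartbeat.Id);
            await _heartbeats.DeleteOneAsync(h => h.Id == heartbeat.Id);
        }
    }

    public class Heartbeat
    {
        public string Id { get; set; }
        public DateTime LastSeen { get; set; }
    }
}
```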
> Moreover – and this is actually the main reason why I am reluctant to add the feature – I think it signals that "transient subscriptions" is a thing in Rebus, which I don't believe it is, as everything in Rebus revolves around queues/topics/bindings (a.k.a. subscriptions) being persistent in nature.
You can just call it "expiration time" on normal subscriptions ;)
> Wouldn't it be possible for you to implement something simple that would solve your specific use case? E.g. something that periodically checks your subscriptions and somehow detects that they're no longer used?
Probably, but I am building open source applications and would like to support multiple transports. So if I have to hack Rebus, I can also implement it myself. I have already done it for some basic scenarios: https://github.com/Squidex/messaging/tree/main/Squidex.Messaging/Implementation
But if I can just rely on another project and contribute to it, that would make more sense for me.
@SebastianStehle I think Redis client-side caching is a better fit here. It handles the exact type of ephemeral pub/sub you are trying to build (with a bunch of nice options specific to building and managing local caches): https://redis.io/docs/manual/client-side-caching/
PS. I found this post looking to do the exact same thing as you - local caching with invalidation messages over Rebus. But I now realize this is not what Rebus is designed for (or even most of the underlying transports - they would all have a problem cleaning up old subscribers, even when using their APIs directly).
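For illustration, here is a minimal sketch of the ephemeral alternative using plain Redis pub/sub with StackExchange.Redis (channel name and cache are made up; the client-side caching feature from the link adds smarter, server-assisted tracking on top of this idea):

```csharp
using System.Collections.Concurrent;
using StackExchange.Redis;

// A Redis pub/sub subscription lives only as long as the connection,
// so a crashed pod leaves nothing behind that needs cleaning up.
var localCache = new ConcurrentDictionary<string, object>();

var redis = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
var subscriber = redis.GetSubscriber();

// Each node evicts its local copy when a key is announced on the channel.
await subscriber.SubscribeAsync(
    RedisChannel.Literal("cache-invalidation"),
    (_, key) => localCache.TryRemove(key.ToString(), out _));

// Any node invalidates a key on all nodes like this:
await subscriber.PublishAsync(RedisChannel.Literal("cache-invalidation"), "user:42");
```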
Yes, I have already built my own abstraction: https://github.com/Squidex/libs/tree/main/messaging
It was not intended, but I have no other choice. I also use it for GraphQL subscriptions.
Closing, because it's not possible for Rebus to provide a subscription cleaning mechanism that would work consistently across transports.
Many things in Rebus are geared towards subscriptions coming and going about as often as tables in your database (i.e. not often - only as part of system evolution, never at random times while running), and because of this, subscriptions are usually not orphaned.
Please protest if you disagree. Closing for now 🙂