SlimMessageBus
MessageType filtering
I'm implementing SlimMessageBus with EventHubs and am targeting a single EventHubs instance (topic) for all messages. Using pub/sub so my registrations more or less look something like:
MessageBusBuilder.Create()
    .Consume<FooBar>(config => config
        .Topic(configuration["SharedTopic"])
        .Group(configuration["ServiceConsumerGroup"])
        .WithConsumer<FooBarConsumer>())
I'm noticing that each of my consumer groups picks up messages for all message types, i.e. the message processing does not filter on message type. Since the messages are not filtered by type, I'm wondering if your intention was that each message type would get its own topic.
Hello @apitzele, the consumer side takes the declared type and tries to deserialize arriving messages into that type.
No filtering is implemented, nor was it intended, as SMB is simple in nature.
Such filtering would have to be done in the app's consumer (FooBarConsumer, where it decides what messages to skip) or in the infrastructure (Azure Event Hubs doesn't support that as far as I know, but Azure Service Bus does support filtering). The former would make sense if you have polymorphic message types (see my comment further down).
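A minimal sketch of the consumer-side option described above, where the consumer itself decides which messages to skip. The `IConsumer<TMessage>` interface here is a simplified stand-in declared locally so the snippet compiles on its own (the library's real interface may have a different signature depending on the version), and the `Kind` discriminator and `Handled` list are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Simplified stand-in for the library's consumer interface (assumption:
// the real SlimMessageBus signature may differ between versions).
public interface IConsumer<in TMessage>
{
    Task OnHandle(TMessage message);
}

public class FooBar
{
    // Hypothetical discriminator written by the producer.
    public string Kind { get; set; }
    public string Payload { get; set; }
}

public class FooBarConsumer : IConsumer<FooBar>
{
    // Records what was actually processed, for demonstration only.
    public List<string> Handled { get; } = new List<string>();

    public Task OnHandle(FooBar message)
    {
        // Skip anything that was deserialized into FooBar but was not
        // published as a FooBar (missing or different discriminator).
        if (message == null || message.Kind != nameof(FooBar))
        {
            return Task.CompletedTask;
        }

        Handled.Add(message.Payload);
        return Task.CompletedTask;
    }
}
```

The trade-off is that every consumer on the shared hub has to repeat a check like this, and the message needs to carry something to check against.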
Furthermore, the message type you publish is expected to be deserializable into the type you declare that you consume. If you have some base message type and messages that inherit from it, you could send any message in that class hierarchy on the same hub (topic). In the case of JSON serialization, the message type would be added to the message so that the consumer knows what C# type to deserialize to (more here). Otherwise, if you don't have a message type hierarchy or the chosen serializer does not support polymorphic types, you should use one hub (topic) per message type (otherwise message type A is sent but the consumer expects message type B).
I hope this helps.
@zarusz, thanks for the explanation. I think I'd like to avoid leaving the responsibility for verifying message types to the consumers. Since the consumer is already bound to a type, it seems redundant to require each one to verify the message after it's been deserialized. Also, for better or worse, our entities that get put onto the message bus happen to be our DTOs so I'd like to avoid hanging any additional properties off of them.
For now, I have a private fork that passes the type as a header so the PartitionConsumer can short circuit the deserialization of both the byte array and json string. Obviously, I'd prefer to avoid maintaining a distinct repo. Since it feels like you are wanting to keep this library lean, I'm going to bet adding a message filtering abstraction to the EventHubs plugin is not something you'd like to bring into the project. Completely understand if that's not the direction you want to go. If it does sound like a fit, I'd consider contributing. Either way, I really appreciate what you've done with this library. It's been a pleasure working w/ it.
On a related note, I have plans to pass the message type as part of the message headers (for transports that support message headers). This is to avoid relying on the particular serializer implementation, but mostly to satisfy the need to "not pollute" the message payload (there are pros and cons with each approach).
Now, after this is added, filtering (and even further routing to a consumer/handler based on the specific type) could be added to master. One thing I worry about is that this might overcomplicate the configuration and be confusing for users, unless we add it as a specific opt-in feature.
What do you think?
How do you configure that in your fork?
I didn't need to consider configuration since I'm the only 'customer'. That made it pretty easy. I was thinking along the lines of a filter that could be configured per consumer or even globally. Once you update the library to include the message type, it seems like a viable solution, but admittedly I've only dug in superficially. I haven't spent much time w/ the other transports so I'm not seeing the complete picture. This is pretty much how I thought the filtering could be structured:
interface IMessageFilter
{
    bool CanProcess(ConsumerSettings settings, MessageWithHeaders message);
}
MessageBusBuilder.Create()
    .Consume<FooBar>(config => config
        .Topic(configuration["SharedTopic"])
        .Group(configuration["ServiceConsumerGroup"])
        .WithMessageFilter<MessageTypeFilter>()
        // or
        .WithMessageFilter((settings, message) => true)
        .WithConsumer<FooBarConsumer>())
If generically exposing MessageWithHeaders is problematic, a simpler approach would be transport-specific extensions. Something like:
.WithEventHubsMessageFilter((settings, eventData) => true) //where we're working directly against the payload instead of its abstraction
I like the generic solution better but not sure if it's as viable. Thoughts?
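For illustration, the `MessageTypeFilter` named in the proposal above could look roughly like the sketch below. Everything here is an assumption: `ConsumerSettings` and `MessageWithHeaders` are reduced to local stand-ins with only the members the filter needs, the `"MessageType"` header name is hypothetical, and none of it reflects an API that exists in the library.

```csharp
using System;
using System.Collections.Generic;

// Local stand-ins (assumptions) so the sketch compiles on its own.
public class ConsumerSettings
{
    public Type MessageType { get; set; }
}

public class MessageWithHeaders
{
    public IDictionary<string, string> Headers { get; } = new Dictionary<string, string>();
    public byte[] Payload { get; set; }
}

// The interface as proposed in this thread.
public interface IMessageFilter
{
    bool CanProcess(ConsumerSettings settings, MessageWithHeaders message);
}

// Sample DTO; note that it carries no extra properties.
public class FooBar
{
    public string Payload { get; set; }
}

public class MessageTypeFilter : IMessageFilter
{
    // Hypothetical header name that a producer would have to set.
    public const string MessageTypeHeader = "MessageType";

    public bool CanProcess(ConsumerSettings settings, MessageWithHeaders message)
    {
        // Accept only messages whose header names the consumer's declared
        // type; a missing header is rejected before any deserialization.
        return message.Headers.TryGetValue(MessageTypeHeader, out var typeName)
            && typeName == settings.MessageType.FullName;
    }
}
```

Because the check reads only the headers, the payload never has to be deserialized for messages that are skipped, which is the short circuit described for the private fork.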
I created an issue to add the message type to the native message headers for supported transports. Besides the filtering scenario, I have some other use cases for that. Feel free to comment on the ideas over there.
That issue is a prerequisite for implementing filtering. I am still thinking about whether to keep the filtering so simple that it relies only on the consumer type definition Consume<TMessage>, or whether to also offer additional filtering customization as in your .WithMessageFilter(Func<ConsumerSettings, TMessage>) example. Potentially it will end up being both...
With the recent change #130, it should be possible to push multiple message types via one event hub. See the docs.