confluent-kafka-dotnet
How to filter messages by using headers?
Description
To avoid CPU overhead of deserializing messages that are not meant for the specific consumer, I would like to filter messages based on headers.
But how do you implement this? It does not seem possible to access the headers before `IDeserializer.Deserialize` is called.
I would have expected this to be quite a normal use case, but I can't seem to find an example.
It's not possible. Kafka brokers may be extended in the future to have this capability, but currently they can't. What they do do is hugely efficient and very reliable (on account of being simple). That is the tradeoff.
Just to clarify: there is no built-in functionality in brokers to do that. But you don't have to deserialize the message to access headers; they are available directly from the `ConsumeResult`:

```csharp
if (consumeResult.Message.Headers.TryGetLastBytes("headerKey", out byte[] header))
{
    var headerAsString = Encoding.UTF8.GetString(header);
}
```
At this stage, the message deserializer has already run, so the cost has already been incurred. I feel like you would need access to those headers before the deserializer runs. Sort of a "before deserialize delegate" that would somehow allow you to abort the deserialization in a useful way.
@mhowlett - I think what is being asked for isn't broker-side filtering, which we know isn't possible right now. So we're left with filtering on the client, which is also inefficient because we would have to deserialize the message before inspecting it. I've seen it recommended that we could use headers for this (assuming the producer adds them), since they're separate from the payload and use flatbuffers, which would be a much more lightweight process.
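As a stopgap, this kind of header-based filtering can be done today by consuming the value as raw bytes and only deserializing manually when the headers match. A minimal sketch (the `"message-type"` header name, the topic, and `DeserializeFoo` are placeholders, not part of the library):

```csharp
using System.Text;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "filtering-example"
};

// Consume values as byte[]: the payload is handed over untouched, so nothing
// expensive runs until we decide the message is relevant.
using var consumer = new ConsumerBuilder<Ignore, byte[]>(config).Build();
consumer.Subscribe("my-topic");

while (!cancellationToken.IsCancellationRequested)
{
    var result = consumer.Consume(cancellationToken);

    // Inspect headers first; skip the payload entirely if they don't match.
    if (result.Message.Headers.TryGetLastBytes("message-type", out var type)
        && Encoding.UTF8.GetString(type) == "foo")
    {
        var foo = DeserializeFoo(result.Message.Value); // hypothetical deserializer
        // process foo...
    }
}
```

This still pays the network and consume cost for every message, but the CPU cost of deserialization is only incurred for messages that pass the header check.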
Maybe something that looked like:

```csharp
if (_consumer.TryConsume(stoppingToken, out MyMessage incomingMessage))
{
    // process the message
}
```

But we would need a way to insert middleware into it, maybe just a `Func<Headers, bool>` to pass in:

```csharp
// where headers is a map/dict; e.g. only deserialize if the
// incoming message does not contain the key
if (_consumer.TryConsume(headers => !headers.ContainsKey("some-filter-key"), out MyMessage incomingMessage))
{
    // process the message
}
```
Or, more appropriately, add it to the builder as a filter specification:

```csharp
// example interface
interface IMessageFilter
{
    bool ShouldDeserialize(Headers headers);
}

// implement as many as you need
var fooFilter = new FooFilter();
var barFilter = new BarFilter();

var consumer = new ConsumerBuilder<string, Foo>(config)
    .SetValueDeserializer(avroDeserializer)
    .SetClientFilters(fooFilter, barFilter)
    .Build();

// consumer only returns messages that pass the filters
consumer.ConsumeFiltered(cancellationToken);
```
~~I realize that would take a bunch of new piping to pull off, but I think it could be done in a non-breaking way.~~ Just now digging through the code and I think it could be done in a non-breaking fashion without requiring any updates to serializer packages, only the core package... 🤔
I also see some level of implementation of this, but it seems to be just for Kafka Connect: https://docs.confluent.io/platform/current/connect/transforms/filter-ak.html
Gave it a shot. Non-breaking: https://github.com/jstafford5380/confluent-kafka-dotnet/tree/feature/client-side-filtering
Changes are primarily in Consumer.cs and ConsumerBuilder.cs, and there is a usage example in a new example project. Pretty straightforward. In short, if you define any filters, messages that do not pass the criteria will not be returned from the Consume method. More specifically, it returns null, so `Consume(CancellationToken)` will just discard it and move to the next one as usual.
I am really interested in this feature. In fact, I was looking for something similar to what Spring does.
Do you know if it will be possible in upcoming versions?
Spring does it with an annotation that intercepts the consumer loop. It still consumes the events as raw bytes, and then deserializes the headers. As mentioned above:

> you would need access to those headers before the deserializer runs. Sort of a "before deserialize delegate" that would somehow allow you to abort the deserialization in a useful way.
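One workable approximation of that "before deserialize delegate" today: Confluent.Kafka (since 1.4) passes the message `Headers` to deserializers via `SerializationContext`, so a wrapping deserializer can short-circuit before doing any real work. A sketch (`HeaderFilteringDeserializer` is a made-up name, not part of the library):

```csharp
using System;
using Confluent.Kafka;

// Wraps a real deserializer and returns default(T) without invoking it
// when the header predicate rejects the message.
public class HeaderFilteringDeserializer<T> : IDeserializer<T>
{
    private readonly IDeserializer<T> _inner;
    private readonly Func<Headers, bool> _shouldDeserialize;

    public HeaderFilteringDeserializer(
        IDeserializer<T> inner, Func<Headers, bool> shouldDeserialize)
    {
        _inner = inner;
        _shouldDeserialize = shouldDeserialize;
    }

    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        // SerializationContext.Headers is populated by the consumer.
        if (context.Headers != null && !_shouldDeserialize(context.Headers))
            return default; // skipped: the expensive inner deserializer never runs

        return _inner.Deserialize(data, isNull, context);
    }
}
```

The consuming code then has to treat default values as filtered-out, which is clumsy, but it avoids the expensive deserialization without any library changes.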
@edenhill Do we have any plan in the near future to bring message filtering to Confluent Kafka?
This request has been open for three years now.
Hi All,
Any update on this feature?
Hi, is this feature being looked into?
Considering there's an SMT predicate function 'hasHeader' and its source code is only 123 lines long, would it be much effort to make a predicate that checks the value of a header? Currently the predicate just checks for not-null...