confluent-kafka-dotnet

How to filter messages by using headers?

Open SierraNL opened this issue 4 years ago • 12 comments

Description

To avoid CPU overhead of deserializing messages that are not meant for the specific consumer, I would like to filter messages based on headers.

But how do you implement this? It doesn't seem to be possible to access the headers before `IDeserializer.Deserialize` is called.

I would have expected this to be a fairly common use case, but I can't seem to find an example.

SierraNL avatar Sep 25 '20 10:09 SierraNL

It's not possible. Kafka brokers may be extended in the future to have this capability, but currently they can't. What they do, they do hugely efficiently and very reliably (on account of being simple). That is the tradeoff.

mhowlett avatar Sep 25 '20 16:09 mhowlett

Just to clarify: there is no built-in functionality in brokers to do that. But you don't have to deserialize the message to access its headers; they are available directly from the `ConsumeResult`:

```csharp
if (consumeResult.Message.Headers.TryGetLastBytes("headerKey", out byte[] header))
{
    var headerAsString = Encoding.UTF8.GetString(header);
}
```

Vfialkin avatar Jun 22 '21 09:06 Vfialkin

> Just to clarify: there is no built-in functionality in brokers to do that. But you don't have to deserialize the message to access its headers; they are available directly from the `ConsumeResult`:
>
> ```csharp
> if (consumeResult.Message.Headers.TryGetLastBytes("headerKey", out byte[] header))
> {
>     var headerAsString = Encoding.UTF8.GetString(header);
> }
> ```

At this stage, the value deserializer has already run, so the cost has already been incurred. I feel like you would need access to those headers before the deserializer runs; sort of a "before deserialize" delegate that would somehow allow you to abort the deserialization in a useful way.

@mhowlett - I think what is being asked for isn't broker-side filtering, which we know isn't possible right now. So we're left with filtering on the client, which is also inefficient because we would have to deserialize the message before inspecting it. I've seen it recommended that we could use headers for that (assuming the producer adds them), since they're separate from the payload and use flatbuffers, which would be a much more lightweight process.

Maybe something that looked like this:

```csharp
if (_consumer.TryConsume(stoppingToken, out MyMessage incomingMessage))
{
    // process the message
}
```

But we would need a way to insert middleware into it, maybe just a `Func<Headers, bool>` to pass in:

```csharp
// where headers is a map/dict. Example: only deserialize if the incoming
// message does not contain the key
if (_consumer.TryConsume(headers => !headers.ContainsKey("some-filter-key"), out MyMessage incomingMessage))
{
    // process the message
}
```

Or, more appropriately, add it to the builder as a filter specification:

```csharp
// example interface
interface IMessageFilter
{
    bool ShouldDeserialize(Headers headers);
}

// implement as many as you need
var fooFilter = new FooFilter();
var barFilter = new BarFilter();

var consumer = new ConsumerBuilder<string, Foo>(config)
    .SetValueDeserializer(avroDeserializer)
    .SetClientFilters(fooFilter, barFilter)
    .Build();

// consumer only returns messages that pass the filter
_consumer.ConsumeFiltered(cancellationToken);
```

~~I realize that would take a bunch of new piping to pull off, but I think it could be done in a non-breaking way.~~ Just now digging through the code and I think it could be done in a non-breaking fashion without requiring any updates to serializer packages, only the core package... 🤔
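In the meantime, a workaround that works with the library as-is today is to consume the value as raw `byte[]` (so no value deserialization happens up front) and invoke a deserializer by hand only when the headers pass the filter. A rough sketch, not the library's endorsed pattern: the broker address, group id, topic, and header key are placeholders, `Deserializers.Utf8` stands in for whatever value deserializer you actually use, and the exact `SerializationContext` constructor may vary by package version.

```csharp
using System;
using System.Text;
using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder
    GroupId = "filtering-consumer"       // placeholder
};

// Consume the value as raw bytes: no value deserialization up front.
using var consumer = new ConsumerBuilder<Ignore, byte[]>(config).Build();
consumer.Subscribe("my-topic"); // placeholder topic

while (true)
{
    var result = consumer.Consume(CancellationToken.None);

    // Cheap header check on the raw message; skip anything not meant for us.
    if (!result.Message.Headers.TryGetLastBytes("message-type", out var type)
        || Encoding.UTF8.GetString(type) != "foo")
        continue;

    // Only now pay the deserialization cost. Deserializers.Utf8 is a
    // stand-in for e.g. an Avro deserializer (IDeserializer<Foo>).
    var value = Deserializers.Utf8.Deserialize(
        result.Message.Value,
        isNull: result.Message.Value == null,
        new SerializationContext(MessageComponentType.Value, result.Topic, result.Message.Headers));

    // ... process value ...
}
```

The key type `Ignore` and value type `byte[]` both use built-in pass-through deserializers, so `Consume` returns without touching the payload.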

jstafford5380 avatar Mar 25 '22 19:03 jstafford5380

I also see some level of implementation here, but it seems to be just for Kafka Connect: https://docs.confluent.io/platform/current/connect/transforms/filter-ak.html

jstafford5380 avatar Mar 25 '22 20:03 jstafford5380

Gave it a shot. Non-breaking: https://github.com/jstafford5380/confluent-kafka-dotnet/tree/feature/client-side-filtering

Changes are primarily in Consumer.cs and ConsumerBuilder.cs, and there is a usage example in a new example project. Pretty straightforward: if you define any filters, messages that do not pass the criteria will not be returned from the Consume method. More specifically, it returns null, so Consume(CancellationToken) will just discard it and move to the next one as per usual.

jstafford5380 avatar Mar 25 '22 22:03 jstafford5380

I am really interested in this feature. In fact, I was looking for something similar to what Spring does.
Do you know if it will be possible in upcoming versions?

fparareda avatar Jun 07 '23 15:06 fparareda

Spring is doing it

They add an annotation, which intercepts the consumer loop. It still consumes the events as raw bytes, and then deserializes the headers. As mentioned above:

> you would need access to those headers before the deserializer runs; sort of a "before deserialize" delegate that would somehow allow you to abort the deserialization in a useful way.

OneCricketeer avatar Jun 21 '23 18:06 OneCricketeer

@edenhill Do we have any plan in the near future to bring message filtering to Confluent Kafka?

This request has been open for the last 3 years.

ksdvishnukumar avatar Jul 11 '23 03:07 ksdvishnukumar

Hi All,

Any update on this feature?

ksdvishnukumar avatar Jul 31 '23 07:07 ksdvishnukumar

Hi, is this feature being looked into?

murari-goswami avatar Oct 09 '23 12:10 murari-goswami

Considering there's an SMT predicate, 'HasHeaderKey', and its source code is only 123 lines long, would it be much effort to make a predicate that checks the value of a header? Currently the predicate just checks that the header is present...
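For reference, the existing Connect-side building blocks already support dropping records by header *presence*: the `Filter` transform combined with the `HasHeaderKey` predicate. A minimal connector config sketch (the transform/predicate aliases and the header name are placeholders); a value-aware check would indeed need a custom predicate:

```properties
transforms=dropFlagged
transforms.dropFlagged.type=org.apache.kafka.connect.transforms.Filter
transforms.dropFlagged.predicate=hasMyHeader
predicates=hasMyHeader
predicates.hasMyHeader.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasMyHeader.name=some-filter-key
```

Note this filtering happens in the Connect worker, not in the broker or in a plain consumer, so it doesn't address the original client-side request.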

carlo-quinonez avatar Jan 25 '24 23:01 carlo-quinonez