Spring Kafka does not allow multiple headers with same key

Open poznachowski opened this issue 1 year ago • 6 comments

I'm using Spring Kafka 3.1.2

Apache Kafka allows multiple headers with the same key, but Spring Kafka keeps only one value per key.

A sample project reproducing the issue is available here: https://github.com/poznachowski/sample-spring-kafka-headers

When I use a listener with ConsumerRecord directly as the payload, I can see all the headers:

ConsumerRecord payload: SomePayload[value=payload], headers: RecordHeaders(headers = [RecordHeader(key = testHeader, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = testHeader, value = [118, 97, 108, 117, 101, 50])], isReadOnly = false)

But I'd like to use the much more convenient Spring abstractions such as @Payload and @Headers. With this approach I get only a single testHeader value:

headers: {kafka_offset=11, testHeader=[B@67388d79, ...

The reason is that the KafkaHeaderMapper implementations use the Map.put method, which effectively replaces any earlier value with the latest one.
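To illustrate the overwrite, here is a minimal plain-Java sketch. It does not use the Spring Kafka or Kafka client classes: the header list is modeled as a list of key/value entries, and the class and method names (HeaderOverwriteDemo, sampleHeaders, toSingleValueMap) are hypothetical, chosen only for this example. The two byte arrays are the same "value1" and "value2" values shown in the ConsumerRecord output above.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeaderOverwriteDemo {

    // Stand-in for the record headers: an ordered list that may repeat a key.
    static List<Map.Entry<String, byte[]>> sampleHeaders() {
        return List.of(
                Map.entry("testHeader", "value1".getBytes(StandardCharsets.UTF_8)),
                Map.entry("testHeader", "value2".getBytes(StandardCharsets.UTF_8)));
    }

    // Copies the headers into a single-valued map.
    // put() replaces any value already stored under the same key.
    static Map<String, byte[]> toSingleValueMap(List<Map.Entry<String, byte[]>> headers) {
        Map<String, byte[]> target = new HashMap<>();
        for (Map.Entry<String, byte[]> header : headers) {
            target.put(header.getKey(), header.getValue());
        }
        return target;
    }

    public static void main(String[] args) {
        Map<String, byte[]> mapped = toSingleValueMap(sampleHeaders());
        System.out.println(mapped.size()); // prints 1
        System.out.println(new String(mapped.get("testHeader"), StandardCharsets.UTF_8)); // prints value2
    }
}
```

Only the last value written for testHeader survives, which matches the single value seen with @Headers.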

poznachowski avatar Feb 23 '24 13:02 poznachowski

@poznachowski If we change this outright so that multiple values are allowed, it may break existing consumer applications relying on a single value for the header. For example, suppose a producer application sends two values for a key. In that case, a consumer application built with a prior version of Spring Kafka may stop working or result in incorrect behavior. Even if we introduce some special headers on the producer side to opt-in for this, how do the existing consumer apps know about it? It feels like we need to think through the consequences before implementing this feature.

sobychacko avatar Feb 23 '24 19:02 sobychacko

Well, that is a DefaultKafkaHeaderMapper behavior. Feel free to implement any custom one which would populate a list of values into the target message header.
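As a rough sketch of the grouping step such a custom mapper might perform, the plain-Java example below collects repeated keys into a list instead of overwriting them. It is not a KafkaHeaderMapper implementation: the record headers are modeled as a list of key/value entries, and MultiValueHeaderSketch and toMultiValueMap are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MultiValueHeaderSketch {

    // Groups repeated keys into a list, preserving the order in which the values appeared.
    static Map<String, List<byte[]>> toMultiValueMap(List<Map.Entry<String, byte[]>> headers) {
        Map<String, List<byte[]>> target = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> header : headers) {
            target.computeIfAbsent(header.getKey(), key -> new ArrayList<>())
                    .add(header.getValue());
        }
        return target;
    }

    public static void main(String[] args) {
        // Stand-in for two record headers that share the same key.
        List<Map.Entry<String, byte[]>> headers = List.of(
                Map.entry("testHeader", "value1".getBytes(StandardCharsets.UTF_8)),
                Map.entry("testHeader", "value2".getBytes(StandardCharsets.UTF_8)));

        List<String> decoded = new ArrayList<>();
        for (byte[] value : toMultiValueMap(headers).get("testHeader")) {
            decoded.add(new String(value, StandardCharsets.UTF_8));
        }
        System.out.println(decoded); // prints [value1, value2]
    }
}
```

In a real mapper the same loop would iterate over the Kafka record headers and write the resulting list into the target message headers.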

I don't treat this as a bug since we never advertised such behavior from the DefaultKafkaHeaderMapper.

We may revise the default logic into something similar to what we have in the DefaultHttpHeaderMapper in Spring Integration.

A Kafka record has always had multi-entry capabilities...

artembilan avatar Feb 23 '24 19:02 artembilan

@poznachowski We had an internal discussion on this matter and this is something we can adjust in the 3.2.0 version. Any existing consumers that expect a single value for a certain header will need to migrate (adjust) their code. You are welcome to send a PR. Please let us know if you are willing (or have the time) to do it.
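One way a consumer could adjust is to normalize the header value before using it, so that both the old single-value shape and a multi-value shape are handled. The sketch below is plain Java and rests on an assumption: that a multi-value header would arrive as an Iterable of byte[] while a single value stays a byte[]. The thread does not specify the final shape, and HeaderValueReader and readValues are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class HeaderValueReader {

    // Normalizes a header value into a list of strings.
    // Assumption: the value is either a single byte[] or an Iterable of byte[].
    // Anything else (including null) yields an empty list.
    static List<String> readValues(Object headerValue) {
        List<String> values = new ArrayList<>();
        if (headerValue instanceof byte[]) {
            values.add(new String((byte[]) headerValue, StandardCharsets.UTF_8));
        } else if (headerValue instanceof Iterable) {
            for (Object item : (Iterable<?>) headerValue) {
                if (item instanceof byte[]) {
                    values.add(new String((byte[]) item, StandardCharsets.UTF_8));
                }
            }
        }
        return values;
    }

    public static void main(String[] args) {
        byte[] v1 = "value1".getBytes(StandardCharsets.UTF_8);
        byte[] v2 = "value2".getBytes(StandardCharsets.UTF_8);
        System.out.println(readValues(v1));              // single-value shape
        System.out.println(readValues(List.of(v1, v2))); // assumed multi-value shape
    }
}
```

Code written this way keeps working for producers that send one value and for producers that send several.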

sobychacko avatar Feb 23 '24 21:02 sobychacko

Thank you for the quick response. Everything makes sense. I'm willing to help with a PR. Any guidelines would be much appreciated.

poznachowski avatar Feb 24 '24 10:02 poznachowski

@poznachowski Here are the contribution guidelines: https://github.com/spring-projects/spring-kafka/blob/main/CONTRIBUTING.adoc. If you have any specific questions, please let us know.

sobychacko avatar Feb 26 '24 15:02 sobychacko

I added a very basic draft of SimpleKafkaHeaderMapper adapted for iterable headers. Please let me know if this is the direction I should follow. I tried to use MultiValueMap as seen in the Spring Integration HTTP module (DefaultHttpHeaderMapper) but failed, as here we are working with the external org.apache.kafka.common.header.Headers.

poznachowski avatar Mar 05 '24 15:03 poznachowski