spring-kafka
Spring Kafka does not allow multiple headers with the same key
I'm using Spring Kafka 3.1.2.
Apache Kafka allows multiple headers with the same key, but Spring Kafka limits that.
A sample project reproducing the issue is available here: https://github.com/poznachowski/sample-spring-kafka-headers
When I use a listener with ConsumerRecord directly as the payload, I can see all the headers:
ConsumerRecord payload: SomePayload[value=payload], headers: RecordHeaders(headers = [RecordHeader(key = testHeader, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = testHeader, value = [118, 97, 108, 117, 101, 50])], isReadOnly = false)
But I'd like to use the much more convenient Spring abstractions such as @Payload and @Headers. With this approach, however, I get only a single testHeader value:
headers: {kafka_offset=11, testHeader=[B@67388d79, ...
The reason is that the KafkaHeaderMapper implementations use the Map.put method, which effectively replaces the value with the latest one.
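To illustrate the overwrite in isolation, here is a minimal sketch using only the JDK. The list of Map.Entry pairs is a hypothetical stand-in for the record's headers, and HeaderOverwriteDemo and toSingleValueMap are names made up for this example; it is not the actual mapper code, only the same Map.put pattern applied to the two testHeader values from the log above.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeaderOverwriteDemo {

    // Hypothetical stand-in for the record's headers: an ordered list of
    // key/value pairs in which the same key may appear more than once.
    static List<Map.Entry<String, byte[]>> sampleHeaders() {
        return List.of(
                Map.entry("testHeader", "value1".getBytes(StandardCharsets.UTF_8)),
                Map.entry("testHeader", "value2".getBytes(StandardCharsets.UTF_8)));
    }

    // Copies the headers into a plain map with Map.put, so a later entry
    // with the same key replaces the earlier one.
    static Map<String, byte[]> toSingleValueMap(List<Map.Entry<String, byte[]>> headers) {
        Map<String, byte[]> target = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> header : headers) {
            target.put(header.getKey(), header.getValue());
        }
        return target;
    }

    public static void main(String[] args) {
        Map<String, byte[]> mapped = toSingleValueMap(sampleHeaders());
        // Only one entry survives, and it holds the last value that was put.
        System.out.println(mapped.size());
        System.out.println(new String(mapped.get("testHeader"), StandardCharsets.UTF_8));
    }
}
```

Running main prints 1 and then value2: the first header value is lost as soon as the second one is put under the same key.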
@poznachowski If we change this outright so that multiple values are allowed, it may break existing consumer applications relying on a single value for the header. For example, suppose a producer application sends two values for a key. In that case, a consumer application built with a prior version of Spring Kafka may stop working or result in incorrect behavior. Even if we introduce some special headers on the producer side to opt-in for this, how do the existing consumer apps know about it? It feels like we need to think through the consequences before implementing this feature.
Well, that is the DefaultKafkaHeaderMapper behavior.
Feel free to implement a custom mapper that populates a list of values into the target message header.
I don't treat this as a bug, since we never advertised such behavior from the DefaultKafkaHeaderMapper.
We may revise the default logic into something similar to what we have in the DefaultHttpHeaderMapper in Spring Integration.
A Kafka record has always had multi-entry capabilities...
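As a rough sketch of what "populate a list of values" could look like, the JDK-only example below groups repeated keys into a list instead of overwriting them. The list of Map.Entry pairs is again a hypothetical stand-in for the record's headers, and MultiValueHeaderDemo and toMultiValueMap are illustrative names; a real custom mapper would have to implement the KafkaHeaderMapper contract, which is not shown here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MultiValueHeaderDemo {

    // Groups header values by key, keeping every value in arrival order
    // instead of letting a later value replace an earlier one.
    static Map<String, List<byte[]>> toMultiValueMap(List<Map.Entry<String, byte[]>> headers) {
        Map<String, List<byte[]>> target = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> header : headers) {
            target.computeIfAbsent(header.getKey(), key -> new ArrayList<>())
                  .add(header.getValue());
        }
        return target;
    }

    public static void main(String[] args) {
        // Hypothetical input mirroring the two testHeader entries from the report.
        List<Map.Entry<String, byte[]>> headers = List.of(
                Map.entry("testHeader", "value1".getBytes(StandardCharsets.UTF_8)),
                Map.entry("testHeader", "value2".getBytes(StandardCharsets.UTF_8)));

        for (byte[] value : toMultiValueMap(headers).get("testHeader")) {
            System.out.println(new String(value, StandardCharsets.UTF_8));
        }
    }
}
```

With this shape, a consumer that expects a single value would have to pick one element from the list explicitly, which is the kind of migration cost discussed in this thread.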
@poznachowski We had an internal discussion on this matter, and this is something we can adjust in version 3.2.0. Any existing consumers that expect a single value for a given header will need to migrate (adjust) their code. You are welcome to send a PR. Please let us know if you are willing (and have the time) to do it.
Thank you for the quick response. Everything makes sense. I'm willing to help with a PR. Any guidelines would be much appreciated.
@poznachowski Here are the contribution guidelines: https://github.com/spring-projects/spring-kafka/blob/main/CONTRIBUTING.adoc. If you have any specific questions, please let us know.
I added a very basic draft of SimpleKafkaHeaderMapper adapted for iterable headers. Please let me know if this is the direction I should follow. I tried to use MultiValueMap, as seen in the Spring Integration HTTP module (DefaultHttpHeaderMapper), but failed, because here we are working with the external org.apache.kafka.common.header.Headers