spring-kafka
JsonSerializer implementation prevents extension
The implementation of org.springframework.kafka.support.serializer.JsonSerializer prevents extensibility that was previously supported. See the example below:
Custom JsonSerializer
import java.util.Map;

import com.fasterxml.jackson.databind.SerializationFeature;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class ExtendedKafkaJsonSerializer<T> extends JsonSerializer<T> {

    public static final String USE_DATE_FORMAT = "use.date.format";

    public ExtendedKafkaJsonSerializer() {
        super();
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        useDateFormat(configs);
        super.configure(configs, isKey);
    }

    // Disables timestamp output for dates on the ObjectMapper when "use.date.format" is true.
    private void useDateFormat(Map<String, ?> configs) {
        if (configs.containsKey(USE_DATE_FORMAT)
                && Boolean.parseBoolean(configs.get(USE_DATE_FORMAT).toString())) {
            this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        }
    }
}
Producer configuration
new DefaultKafkaProducerFactory<>(ImmutableMap.<String, Object>builder()
        .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
        .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
        .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ExtendedKafkaJsonSerializer.class)
        .put(ExtendedKafkaJsonSerializer.USE_DATE_FORMAT, true)
        .build());
Result
Date-time types are expected to serialize to ISO format. Instead, they serialize to timestamps.
Cause
Serialization is performed by
private ObjectWriter writer;
which is instantiated only in the constructor
public JsonSerializer(JavaType targetType, ObjectMapper objectMapper) {
    Assert.notNull(objectMapper, "'objectMapper' must not be null.");
    this.objectMapper = objectMapper;
    this.writer = objectMapper.writerFor(targetType);
}
Since the ObjectWriter field:
- is private
- is not re-instantiated in the org.springframework.kafka.support.serializer.JsonSerializer#serialize(java.lang.String, T) method
- is not re-instantiated in the org.springframework.kafka.support.serializer.JsonSerializer#configure method
it is not possible to reconfigure it without reflection.
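The effect can be illustrated without Kafka or Jackson on the classpath. The following is a minimal sketch using only the JDK. Mapper and Writer are hypothetical stand-ins for ObjectMapper and ObjectWriter, and the sketch assumes, as described above, that a writer keeps the settings that were in force when it was created.

```java
import java.time.Instant;

public class StaleWriterDemo {

    /** Hypothetical stand-in for ObjectMapper: holds a mutable setting. */
    static final class Mapper {
        private boolean datesAsTimestamps = true;

        void disableDatesAsTimestamps() {
            this.datesAsTimestamps = false;
        }

        /** Stand-in for writerFor(...): the new writer copies the current setting. */
        Writer newWriter() {
            return new Writer(this.datesAsTimestamps);
        }
    }

    /** Hypothetical stand-in for ObjectWriter: its setting is fixed at creation. */
    static final class Writer {
        private final boolean datesAsTimestamps;

        Writer(boolean datesAsTimestamps) {
            this.datesAsTimestamps = datesAsTimestamps;
        }

        String write(Instant value) {
            return this.datesAsTimestamps
                    ? Long.toString(value.toEpochMilli())
                    : "\"" + value + "\"";
        }
    }

    /** Creates the writer first and changes the mapper afterwards, like the subclass in this report. */
    public static String writeWithStaleWriter(Instant value) {
        Mapper mapper = new Mapper();
        Writer writer = mapper.newWriter();   // corresponds to the constructor
        mapper.disableDatesAsTimestamps();    // corresponds to configure()
        return writer.write(value);           // the writer still holds the old setting
    }

    /** Changes the mapper first and creates the writer afterwards. */
    public static String writeWithFreshWriter(Instant value) {
        Mapper mapper = new Mapper();
        mapper.disableDatesAsTimestamps();
        return mapper.newWriter().write(value);
    }

    public static void main(String[] args) {
        System.out.println(writeWithStaleWriter(Instant.EPOCH));
        System.out.println(writeWithFreshWriter(Instant.EPOCH));
    }
}
```

In this model the first call prints 0 and the second prints "1970-01-01T00:00:00Z", which mirrors the timestamp-instead-of-ISO result reported here.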
Solution
There are a couple of solutions:
- make ObjectWriter protected
- always instantiate ObjectWriter in the org.springframework.kafka.support.serializer.JsonSerializer#serialize(java.lang.String, T) method
- instantiate ObjectWriter in the org.springframework.kafka.support.serializer.JsonSerializer#configure method instead of the constructor
My suggestion would be to do both the first and the third.
Version
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.1</version>
Does it really need to be configurable via properties?
As a workaround, you could use the alternative constructor...
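public ExtendedKafkaJsonSerializer() {
    // super(...) must be the first statement, so the mapper is prepared in a static helper
    super((JavaType) null, configuredMapper());
}
private static ObjectMapper configuredMapper() {
    ObjectMapper mapper = JacksonUtils.enhancedObjectMapper();
    // configure it
    return mapper;
}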
It was never intended to be used that way; rather, a direct instance was meant to be provided to the DefaultKafkaProducerFactory, where you already have a configured ObjectMapper injected.
But I still see your point, and since we provide this property-based contract for configuring this kind of component, it must be as flexible as expected.
So, my suggestion is to call objectMapper.writerFor(targetType) one more time in JsonSerializer.configure().
We can't remove it from the constructor, since configure() might not be called at all. And it is not called in the case of direct instance injection into the DefaultKafkaProducerFactory.
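The shape of this proposal can be sketched as follows. This is not the spring-kafka implementation. SketchSerializer, ExtendedSketchSerializer, Mapper and Writer are hypothetical JDK-only stand-ins, assuming only what is stated in this thread: the writer is created in the constructor, so instances that never see configure() still work, and it is created again in configure(), so property-driven changes to the mapper take effect.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.Map;

public class ConfigureRebuildSketch {

    /** Hypothetical stand-in for ObjectMapper. */
    static final class Mapper {
        boolean datesAsTimestamps = true;

        Writer newWriter() {
            return new Writer(this.datesAsTimestamps);
        }
    }

    /** Hypothetical stand-in for ObjectWriter: its setting is fixed at creation. */
    static final class Writer {
        private final boolean datesAsTimestamps;

        Writer(boolean datesAsTimestamps) {
            this.datesAsTimestamps = datesAsTimestamps;
        }

        String write(Instant value) {
            return this.datesAsTimestamps
                    ? Long.toString(value.toEpochMilli())
                    : "\"" + value + "\"";
        }
    }

    /** Hypothetical base serializer following the proposal. */
    static class SketchSerializer {
        protected final Mapper mapper = new Mapper();
        private Writer writer;

        SketchSerializer() {
            // Kept in the constructor because configure() may never be called.
            this.writer = this.mapper.newWriter();
        }

        void configure(Map<String, ?> configs) {
            // Proposed addition: rebuild the writer so mapper changes are picked up.
            this.writer = this.mapper.newWriter();
        }

        String serialize(Instant value) {
            return this.writer.write(value);
        }
    }

    /** Subclass that adjusts the mapper from a property before delegating. */
    static class ExtendedSketchSerializer extends SketchSerializer {
        static final String USE_DATE_FORMAT = "use.date.format";

        @Override
        void configure(Map<String, ?> configs) {
            Object flag = configs.get(USE_DATE_FORMAT);
            if (flag != null && Boolean.parseBoolean(flag.toString())) {
                this.mapper.datesAsTimestamps = false;
            }
            super.configure(configs);
        }
    }

    /** Property-based path: configure() is called with the flag. */
    public static String serializeConfigured(boolean useDateFormat) {
        ExtendedSketchSerializer serializer = new ExtendedSketchSerializer();
        serializer.configure(Collections.singletonMap(
                ExtendedSketchSerializer.USE_DATE_FORMAT, useDateFormat));
        return serializer.serialize(Instant.EPOCH);
    }

    /** Direct-instance path: configure() is never called. */
    public static String serializeUnconfigured() {
        return new ExtendedSketchSerializer().serialize(Instant.EPOCH);
    }

    public static void main(String[] args) {
        System.out.println(serializeConfigured(true));
        System.out.println(serializeConfigured(false));
        System.out.println(serializeUnconfigured());
    }
}
```

Keeping both creation points costs one extra writer per configured instance, which seems a small price for supporting both the property-based and the direct-instance style.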
Yes, @garyrussell, I also find this kind of contract from Apache Kafka a bit awkward by definition, but it looks like the community prefers not to use Java instances, but rather to let the client create and configure them.
True; I was just offering the above as a workaround until we fix this.
And it does not in case of direct instance injection into the DefaultKafkaProducerFactory.
It is now, thanks to https://github.com/spring-projects/spring-kafka/commit/852c447442c9ac3a4eb41e2ddbcfc83ebe8c97da