spring-kafka icon indicating copy to clipboard operation
spring-kafka copied to clipboard

JsonSerializer implementation prevents extension

Open arabczuk-equinix opened this issue 4 years ago • 3 comments

Implementation of org.springframework.kafka.support.serializer.JsonSerializer prevents previously supported extensibility. See the example below:

Custom JsonSerializer

public class ExtendedKafkaJsonSerializer<T> extends JsonSerializer<T> {

    public static final String USE_DATE_FORMAT = "use.date.format";

    public ExtendedKafkaJsonSerializer() {
        super();
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        useDateFormat(configs);
        super.configure(configs, isKey);
    }

    private void useDateFormat(Map<String, ?> configs) {
        if (configs.containsKey(USE_DATE_FORMAT)
                && Boolean.parseBoolean(configs.get(USE_DATE_FORMAT).toString())) {
            this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        }
    }
}

Producer configuration

new DefaultKafkaProducerFactory<>(ImmutableMap.<String, Object>builder()
                .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
                .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ExtendedKafkaJsonSerializer.class)
                .put(ExtendedKafkaJsonSerializer.USE_DATE_FORMAT, true)
                .build());

Result

The serialization of date-time types will serialize to ISO format. Instead it serializes to timestamp.

Cause

Serialization is performed by

private ObjectWriter writer;

which is instantiated only in constructor

public JsonSerializer(JavaType targetType, ObjectMapper objectMapper) {
    Assert.notNull(objectMapper, "'objectMapper' must not be null.");
    this.objectMapper = objectMapper;
    this.writer = objectMapper.writerFor(targetType);
}

Since the ObjectWriter field:

  1. is private
  2. is not re-instantiated in org.springframework.kafka.support.serializer.JsonSerializer#serialize(java.lang.String, T) method
  3. is not re-instantiated in org.springframework.kafka.support.serializer.JsonSerializer#configure method

it is not possible to reconfigure it, not without reflection.

Solution

There are couple of solutions:

  1. make ObjectWriter protected
  2. always instantiate ObjectWriter in org.springframework.kafka.support.serializer.JsonSerializer#serialize(java.lang.String, T) method
  3. instantiate ObjectWriter in org.springframework.kafka.support.serializer.JsonSerializer#configure method instead of constructor

My suggestion would to do both 1. and 3.

Version

  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.7.1</version>

arabczuk-equinix avatar Sep 27 '21 12:09 arabczuk-equinix

Does it really need to be configurable via properties?

As a work around, you could use the alternative constructor...

    public ExtendedKafkaJsonSerializer() {
        ObjectMapper mapper = JacksonUtils.enhancedObjectMapper();
        // configure it
        super((JavaType) null, mapper);
    }

garyrussell avatar Sep 27 '21 18:09 garyrussell

It was never intended to be used that way, but rather direct instance provided for that DefaultKafkaProducerFactory where you already have a configured ObjectMapper injected. But I still see your point your point and since we provide this property-based contract for this kind of components configuration, it must be as flexible as expected. So, my suggestion is to call objectMapper.writerFor(targetType) one more time in the JsonSerializer.configure(). We can't remove it in the ctor, since the configure() might not be called at all. And it does not in case of direct instance injection into the DefaultKafkaProducerFactory.

Yes, @garyrussell , I also find this kind of contract from Apache Kafka a bit awkward by definition, but looks like community prefer do not use Java instances but rather let the client to create them and configure.

artembilan avatar Sep 27 '21 18:09 artembilan

True; I was just offering the above as a work around until we fix this.

And it does not in case of direct instance injection into the DefaultKafkaProducerFactory.

It is now, thanks to https://github.com/spring-projects/spring-kafka/commit/852c447442c9ac3a4eb41e2ddbcfc83ebe8c97da

garyrussell avatar Sep 27 '21 19:09 garyrussell