
Add support for Reactor Kafka


As said in https://github.com/reactor/reactor-kafka/issues/100#issuecomment-995178795, native support for Reactor Kafka would be nice for lots of users. Spring Boot currently supports WebFlux, but auto-configuration and related capabilities are not yet provided for Reactor Kafka. As others said in different issues, I also think the connection between Reactor Kafka and Spring Boot should be better documented and supported, since it's the most basic usage of Reactor Kafka. There is currently no official way of using Reactor Kafka with Spring Boot, which is pretty odd.

thepaep avatar Dec 16 '21 08:12 thepaep

+1

lynch19 avatar Dec 18 '21 16:12 lynch19

@snicoll @simonbasle @garyrussell any help with this?

Can someone describe what features are currently missing on this subject and how they should be implemented (what functionality should be added and where), so it'll be easier for contributors to help with this?

lynch19 avatar Dec 18 '21 17:12 lynch19

+1

roger751 avatar Dec 18 '21 17:12 roger751

@lynch19 @roger751 please use the reaction on the original description rather than +1 comments like this.

snicoll avatar Dec 18 '21 17:12 snicoll

@lynch19 reactor-kafka has two fundamental properties objects

https://github.com/reactor/reactor-kafka/blob/main/src/main/java/reactor/kafka/receiver/ReceiverOptions.java

https://github.com/reactor/reactor-kafka/blob/main/src/main/java/reactor/kafka/sender/SenderOptions.java

Each with a number of properties.

In addition, creation of these takes a Map<String, Object> of regular Kafka Properties (similar to the ...properties extension for the spring-kafka auto configuration).

There are several levels of Boot auto configuration that would be useful, with the MVP being the auto-configuration of these two beans using application properties.
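For illustration, here is a minimal sketch of what wiring this up by hand looks like today with the plain reactor-kafka API (the bootstrap servers, group id, and topic name are placeholder values); the MVP auto-configuration would essentially build this property map and the two options beans from application properties:

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class ManualReceiverSketch {

    public static void main(String[] args) {
        // The Map<String, Object> of regular kafka-clients consumer properties mentioned above
        Map<String, Object> consumerProps = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "sample-group",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // ReceiverOptions wraps the consumer properties plus reactor-kafka specific settings
        ReceiverOptions<String, String> receiverOptions = ReceiverOptions
                .<String, String>create(consumerProps)
                .subscription(List.of("sample-topic"));

        // SenderOptions.create(producerProps) is the analogous producer-side call
        KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext(record -> record.receiverOffset().acknowledge())
                .subscribe();
    }
}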

garyrussell avatar Dec 20 '21 15:12 garyrussell

I'm using reactive Kafka in my applications. Reactor Kafka provides everything you need to work with it. Or does this mean support in the form of annotations? For example:

@ReactiveKafkaListener(topic = "smth", autoCommit=true)
public Mono<Void> consumer(DTO dto) {
    log.info("received dto: " + dto);
    return service.call(dto).then();
}

AntonLGVS avatar Dec 20 '21 19:12 AntonLGVS

No; not at all; this is just about configuring the sender and receiver options via Boot properties.

Such a mechanism would belong in Spring for Apache Kafka (spring-kafka), but there are currently no plans to do so.

garyrussell avatar Dec 20 '21 19:12 garyrussell

I would suggest something like package org.springframework.boot.autoconfigure.reactor.kafka in the spring-boot-autoconfigure project.

We can leverage the existing KafkaProperties for the consumer and producer properties passed into the receiver and sender options.

garyrussell avatar Jan 03 '22 19:01 garyrussell

This has been requested previously (https://github.com/spring-projects/spring-boot/issues/18751) but was declined as the Reactor team advised us against adding support. IIRC, this was due to the status of Reactor's Kafka support at the time and things may well have moved on since then.

@simonbasle, what's your take on this now, please? Has this moved on sufficiently in the last couple of years that it is now worth considering again?

wilkinsona avatar Jan 04 '22 14:01 wilkinsona

@wilkinsona With the broad changes in 1.3.x in late 2020, reactor-kafka has been made more stable and maintainable. Thanks to @garyrussell and @artembilan, the project is more actively maintained. Thus I would defer to Gary and Artem regarding that decision, but it definitely looks better than back in 2019.

simonbasle avatar Jan 04 '22 16:01 simonbasle

I concur; it is in much better shape now, thanks to some significant work by Sergie back then. Also, due to community requests, the Spring Cloud Stream team is likely to incorporate it in the next major release, and basic auto-configuration of the sender and receiver properties will make things easier there too.

garyrussell avatar Jan 04 '22 16:01 garyrussell

@garyrussell What exactly would it require? Only ReactiveKafkaAutoConfiguration, ReactiveKafkaAnnotationDrivenConfiguration (which would use KafkaProperties / ReactiveKafkaProperties), ReceiverOptionsCustomizer and SenderOptionsCustomizer classes? Or is there anything else that needs to be noted?

We can leverage the existing KafkaProperties for the consumer and producer properties passed into the receiver and sender options.

You mean that we should use the existing KafkaProperties class, right?

almogtavor avatar Jan 04 '22 20:01 almogtavor

@almogtavor There would be no ReactiveKafkaAnnotationDrivenConfiguration - there are no annotations in that project.

As I said above, I'd say the MVP would be a ReactiveKafkaAutoConfiguration creating two beans, SenderOptions and ReceiverOptions.

For example

@Bean
ReceiverOptions<Object, Object> receiverOptions(KafkaProperties kp, ReactiveKafkaProperties rkp) {
    ReceiverOptions<Object, Object> opts = ReceiverOptions.create(kp.buildConsumerProperties());
    // apply the rkp.getReceiver() properties here
    return opts;
}
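The producer side would be the mirror image; a sketch, assuming the same (hypothetical) ReactiveKafkaProperties holder:

@Bean
SenderOptions<Object, Object> senderOptions(KafkaProperties kp, ReactiveKafkaProperties rkp) {
    SenderOptions<Object, Object> opts = SenderOptions.create(kp.buildProducerProperties());
    // apply the rkp.getSender() properties here (hypothetical accessor; e.g. closeTimeout, maxInFlight, stopOnError)
    return opts;
}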

Calling options customizers (if configured) before returning might be a nice addition, but it is not really a requirement, because the user can further customize the options wherever they are used to create a receiver/sender. Since the options are immutable, the base options can be altered in different ways for each usage; a new object is created each time a property is added.

garyrussell avatar Jan 04 '22 20:01 garyrussell

@garyrussell Seems great. Is there any need to use KafkaProperties? It seems that we could easily auto-configure all of the ReceiverOptions and SenderOptions parameters.

almogtavor avatar Jan 08 '22 19:01 almogtavor

KafkaProperties.Consumer and ...Producer encompass common Kafka consumer and producer properties, as well as a general .properties node (passed into the create() methods as in my example above). We don't want to duplicate all of those properties here.

garyrussell avatar Jan 10 '22 16:01 garyrussell

Notice that there are not a lot of common parameters between the two (KafkaProperties.Consumer and KafkaProperties.Receiver have only 3 parameters in common). There are also some caveats I'd like you to take a look at:

  • https://github.com/spring-projects/spring-boot/issues/18751 won't get solved (since we still need to use KafkaProperties).
  • If some of the properties belong to KafkaProperties and some to ReactiveKafkaProperties, users will have to configure some parameters with spring.kafka.whatever and some with spring.reactor.kafka.whatever. I don't see any other option here.

almogtavor avatar Jan 10 '22 18:01 almogtavor

Notice that there are not a lot of common parameters between the two (KafkaProperties.Consumer and KafkaProperties.Receiver have only 3 parameters in common). There are also some caveats I'd like you to take a look at:

I am not talking about those properties; I am specifically talking about the ConsumerConfig and ProducerConfig kafka-clients properties (which are handled within KafkaProperties - some being first-class properties that Boot knows about, others being generic properties; see https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties).

Ideally, that part of KafkaProperties will be pulled out into a common super class.

garyrussell avatar Mar 12 '22 18:03 garyrussell

Agree about the common super class. Is this a thing you'd want later on or at the very first PR?

almogtavor avatar Mar 12 '22 21:03 almogtavor

@garyrussell I've raised a PR.

almogtavor avatar Apr 07 '22 20:04 almogtavor

As a continuation of the discussion in the PR...

For the others, I think it's too soon to say. I think we need to know what we want the auto-configuration to offer before we spend any more time thinking about the precise details. What's offered should be defined as the beans that will be auto-configured and the properties that will be available to control their configuration.

I believe we want to auto-configure only the ReceiverOptions and SenderOptions in the initial offering.

@garyrussell @artembilan @almogtavor is this your understanding as well?

and the properties that will be available to control their configuration

The properties would be the first-class properties offered by the Receiver/Sender options, plus the ability to specify the normal consumer/producer properties currently available in KafkaProperties.Consumer/Producer. It does sound like we may only want to offer a subset of those. And there is also the question of "overlapping" properties to figure out.

onobc avatar May 10 '22 14:05 onobc

As a continuation from this comment on the PR...

The number of Kafka properties is overwhelming; when we first added auto-configuration, we picked a subset of properties to be first class, as discussed here: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties

Specifically,

Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value.

We have added others over time at user request (such as isolation level on that linked PR).

@garyrussell Are you suggesting we should drop some of the properties in the current PR? I think the number of primitive properties on the ReceiverOptions (the ones chosen in the associated PR) is not too overwhelming. The more complicated props can be adjusted as needed in the options customizer that will be added shortly after.

Some kind of code generation of the properties from the kafka-clients *Config classes would be ideal.

Yes, this would be nice.

onobc avatar May 10 '22 14:05 onobc

I believe we want to auto-configure only the ReceiverOptions and SenderOptions in the initial offering.

That was my proposal for the MVP above, yes:

https://github.com/spring-projects/spring-boot/issues/29080#issuecomment-1005165885

For the "overlapping" properties (for spring-kafka), the hierarchy is as follows (e.g. for the consumer properties).

  1. Top level generic spring.kafka.properties
  2. First class boot-recognized properties
  3. Consumer generic ...consumer.properties

i.e. with:

spring.kafka.properties.auto.offset.reset=...
spring.kafka.consumer.auto-offset-reset=...
spring.kafka.consumer.properties.auto.offset.reset=...

the last one would win.

drop some of the properties in the current PR?

I am not sure what you mean; I am suggesting using the Producer and Consumer (from KafkaProperties) to build the map for ReceiverOptions.create() and expose the other methods that take simple values as Boot properties.

For complex types, for example, users can grab the auto configured ReceiverOptions and add listeners to it.

Given that the default implementation of ReceiverOptions is immutable, each use can "modify" the auto wired bean without affecting other uses.

@Autowired
ReceiverOptions ro;

...

ReceiverOptions one = ro.addAssignListener(...).subscription(List.of("topic1"));
ReceiverOptions two = ro.addAssignListener(...).subscription(List.of("topic2"));

one and two will be different objects.

garyrussell avatar May 10 '22 14:05 garyrussell

For the "overlapping" properties (for spring-kafka), the hierarchy is as follows (e.g. for the consumer properties)

@garyrussell

I agree w/ the property hierarchy and precedence as you outlined above. The "overlapping" ones I was referring to are the ones that are named similarly but may or may not be the same thing (e.g. consumer.auto-commit-interval and the receiver option's commit-interval, which is not passed into the consumer).

I am not sure what you mean; I am suggesting using the Producer and Consumer (from KafkaProperties) to build the map for ReceiverOptions.create() and expose the other methods that take simple values as Boot properties.

I am suggesting the same thing. With my comment about "drop some of the properties" I was referring to the first-class Boot properties in the current proposal. I was not sure if you were suggesting there were currently too many of them and we should pick the high-priority ones. It sounds like that is not what you are saying, though. Sorry for the confusion.

For complex types, for example, users can grab the auto configured ReceiverOptions and add listeners to it.

Given that the default implementation of ReceiverOptions is immutable, each use can "modify" the auto wired bean without affecting other uses.

Are you suggesting we do not add the Sender/ReceiverOptionCustomizers or that we can use this technique until we do add the customizers?

onobc avatar May 10 '22 15:05 onobc

On second thought, I suppose customizers would still make sense - e.g. for an app that creates multiple receivers but wants to add the same assignment listener to them all.
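A sketch of what such a customizer contract could look like; the ReceiverOptionsCustomizer interface below is hypothetical (it does not exist in Boot or reactor-kafka today):

import reactor.kafka.receiver.ReceiverOptions;

// Hypothetical callback the auto-configuration could apply before exposing the options bean
@FunctionalInterface
public interface ReceiverOptionsCustomizer<K, V> {

    ReceiverOptions<K, V> customize(ReceiverOptions<K, V> options);
}

A user could then register a single bean to apply the same assignment listener to every receiver built from the auto-configured options:

@Bean
ReceiverOptionsCustomizer<Object, Object> assignListenerCustomizer() {
    // addAssignListener returns a new immutable ReceiverOptions instance
    return options -> options.addAssignListener(partitions ->
            System.out.println("Assigned: " + partitions));
}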

garyrussell avatar May 10 '22 15:05 garyrussell

  • RKP = ReactorKafkaProperties
  • KP = KafkaProperties

So I dug into each ReactorKafka ReceiverOptions property. Here is what I found:

TL;DR

The only consumer properties that need to be considered as duplicate/overlapping at this point are:

  • ReceiverOptions.commitInterval
  • KafkaProperties.autoCommitInterval
  • and possibly KafkaProperties.enableAutoCommit

They are not the same exact property but could be used to control the same underlying concept. More details below.

Details

ReceiverOptions properties

ignore (we are not surfacing these - customizer only)

  • assignTopicPartitions
  • schedulerSupplier
  • assignListeners
  • revokeListeners
  • Deserializers
    • keyDeserializer
    • valueDeserializer

These are passed to the KafkaConsumer but are expected to already be configured; configure() will not be called on them. So they are additive to the KP ones and not a problem here. We are not surfacing these in the RKP anyway, but I wanted to detail what I found.

Unique to RK only

  • subscribeTopics
  • subscribePattern
  • pollTimeout - the timeout for each KafkaConsumer#poll(long) operation. The closest setting is spring.kafka.listener.pollTimeout
  • closeTimeout - timeout for graceful shutdown of the KafkaConsumer. No equivalent in KP

Auto-commit related

So these were the ones that seemed likely overlapping

  • commitInterval - commit interval for automatic commits
  • commitBatchSize - commit batch size for automatic commits

ℹ️ If commit interval and commit batch size are configured, a commit operation is scheduled when either the interval or batch size is reached

ℹ️ The KP has the following similar properties but always sets enableAutoCommit to false and controls this in the Reactive API

  • enableAutoCommit
  • autoCommitInterval - frequency to commit if the above is set to true

Unique to RK only

While these are commit related, I believe these have no equivalent in KP

  • maxCommitAttempts - max num consecutive non-fatal commit failures tolerated
  • commitRetryInterval - how long to wait before retry
  • maxDeferredCommits - max out-of-order commits

Needs more digging

I think this is RKP-only, but I need to dig a bit more to see what KP options are available for out-of-order commits.

  • atmostOnceCommitAheadSize - commit ahead size per partition for at-most-once delivery
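For reference, this is roughly how those commit-related settings map onto the reactor-kafka builder calls; the values below are arbitrary examples and receiverOptions is an assumed existing ReceiverOptions instance (each call returns a new immutable copy):

import java.time.Duration;

ReceiverOptions<String, String> tuned = receiverOptions
        .commitInterval(Duration.ofSeconds(5))        // automatic commit interval
        .commitBatchSize(100)                         // or commit after this many acknowledged records
        .maxCommitAttempts(5)                         // consecutive non-fatal commit failures tolerated
        .commitRetryInterval(Duration.ofMillis(500))  // wait between commit retries
        .maxDeferredCommits(200)                      // bound for out-of-order commits
        .pollTimeout(Duration.ofMillis(250))
        .closeTimeout(Duration.ofSeconds(30));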

onobc avatar May 12 '22 03:05 onobc

The only consumer properties that need to be considered as duplicate/overlapping at this point are:

  • ReceiverOptions.commitInterval
  • KafkaProperties.autoCommitInterval
  • and possibly KafkaProperties.enableAutoCommit

This "overlap" is by name only; unfortunate, but true.

The latter two control whether the kafka-clients automatically commits the offsets on a schedule.

The first one is the interval used by reactor kafka to automatically commit offsets.

Users should not enable both mechanisms; in spring-kafka, we disable enable.auto.commit by default, because it takes care of the commits in a more deterministic fashion.

reactor-kafka also disables it - see ImmutableReceiverOptions (ctor).

this.properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

garyrussell avatar May 12 '22 14:05 garyrussell

We should consider removing those properties from KafkaProperties - spring-kafka prefers them not to be set and reactor-kafka ignores them.

garyrussell avatar May 12 '22 14:05 garyrussell

Users should not enable both mechanisms; in spring-kafka, we disable enable.auto.commit by default, because it takes care of the commits in a more deterministic fashion.

reactor-kafka also disables it - see ImmutableReceiverOptions (ctor).

this.properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

Agreed @garyrussell - that is what I was alluding to here as well with these comments:

They are not the same exact property but could be used to control the same underlying concept. More details below.

ℹ️ The KP has the following similar properties but always sets enableAutoCommit to false and controls this in the Reactive API

If they are not desired in spring-kafka either, it seems like a great thing to remove (as you suggested).

onobc avatar May 12 '22 15:05 onobc

  • RKP = ReactorKafkaProperties
  • KP = KafkaProperties

So I dug into each ReactorKafka SenderOptions property. Here is what I found:

TL;DR

These are much more straightforward than the consumer/receiver properties.

Properties unique to RKP

  • closeTimeout - timeout for graceful shutdown of sender
  • maxInFlight - max num in-flight records fetched from the outbound record publisher while acks are pending
  • stopOnError - indicates if a send operation should be terminated when an error is encountered

ignore (not surfacing via props - customizer only)

  • scheduler - the scheduler used for publishing send results
  • Serializers
    • keySerializer
    • valueSerializer

These are passed to the KafkaProducer directly and are expected to already be configured; configure() will not be called on them. So they are additive to the KP ones and not a problem here. We are not surfacing these in the RKP anyway, but I wanted to detail what I found.
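For reference, a sketch of how those sender options are set on the reactor-kafka API; the values are arbitrary examples and producerProps is an assumed Map<String, Object> of kafka-clients producer properties:

import java.time.Duration;
import java.util.Map;

import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

SenderOptions<String, String> senderOptions = SenderOptions
        .<String, String>create(producerProps)    // Map<String, Object> of producer properties
        .closeTimeout(Duration.ofSeconds(10))     // graceful shutdown timeout
        .maxInFlight(256)                         // in-flight records while acks are pending
        .stopOnError(false);                      // keep sending remaining records on failure

KafkaSender<String, String> sender = KafkaSender.create(senderOptions);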

onobc avatar May 13 '22 03:05 onobc

Based on the above analysis of the KafkaProperties and the Sender/ReceiverOptions of ReactorKafka, here is a suggested list of properties as well as a suggested "layout" for them. I think we can use this as a starting point for how to map them out.

spring:
  kafka:
    reactor:

      # common properties (just like KafkaProperties)
      bootstrap-servers: localhost:9093
      ssl: ...
      security: ...
      properties: ...
      client.id: fooDemoApp

      # Producer class - pass to Kafka producer props (aka SenderOptions.properties)
      producer:
        key-serializer: org.apache.kafka.common.serialization.LongSerializer
        value-serializer: com.example.FooSerializer
        buffer-memory: 32MB # used in tandem w/ max-in-flight
        ...

      # SenderOptions props
      sender:
        close-timeout: 5m
        max-in-flight: 256
        stop-on-error: false

      # Consumer class - pass to Kafka consumer props (aka ReceiverOptions.properties)
      consumer:
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
        value-deserializer: com.example.FooDeserializer
        ...
        # DELETE enable-auto-commit:
        # DELETE auto-commit-interval:

      # ReceiverOptions props
      receiver:
        subscribe-topics:
        subscribe-pattern:
        poll-timeout:
        close-timeout:
        commit-interval:
        commit-batch-size:
        max-commit-attempts:
        commit-retry-interval:
        max-deferred-commits:
        atmost-once-commit-ahead-size:

This assumes that no properties for ReactorKafka will live directly under spring.kafka.*, but rather that the RK properties start under spring.kafka.reactor.*. Do we want to allow regular and reactive spring-kafka to share properties under spring.kafka.*? I could see possibly wanting to do that for the current list of common properties:

  • bootstrap-servers: localhost:9093
  • ssl: ...
  • security: ...
  • properties: ...
  • client.id: fooDemoApp

A benefit of not sharing the KP producer/consumer props (the ones under spring.kafka.producer|consumer) and instead mapping them separately under spring.kafka.reactor.producer|consumer is that the regular and reactive variants can co-exist and be configured independently, without worrying about a change to one affecting the other.
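To make the proposed layout concrete, here is a sketch of a hypothetical @ConfigurationProperties holder the spring.kafka.reactor.* keys above could bind to; the class and field names are illustrative only and nothing like this exists in Boot yet:

import java.time.Duration;
import java.util.List;
import java.util.regex.Pattern;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "spring.kafka.reactor")
public class ReactorKafkaProperties {

    private final Receiver receiver = new Receiver();

    private final Sender sender = new Sender();

    public Receiver getReceiver() {
        return this.receiver;
    }

    public Sender getSender() {
        return this.sender;
    }

    // Binds the spring.kafka.reactor.receiver.* keys (ReceiverOptions settings)
    public static class Receiver {

        private List<String> subscribeTopics;

        private Pattern subscribePattern;

        private Duration pollTimeout;

        private Duration closeTimeout;

        private Duration commitInterval;

        private Integer commitBatchSize;

        private Integer maxCommitAttempts;

        private Duration commitRetryInterval;

        private Integer maxDeferredCommits;

        // getters and setters omitted for brevity
    }

    // Binds the spring.kafka.reactor.sender.* keys (SenderOptions settings)
    public static class Sender {

        private Duration closeTimeout;

        private Integer maxInFlight;

        private Boolean stopOnError;

        // getters and setters omitted for brevity
    }
}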

onobc avatar May 13 '22 03:05 onobc