spring-integration icon indicating copy to clipboard operation
spring-integration copied to clipboard

Enhancement: Manage back pressure on asynchronous message providers such as Kafka

Open cydergoth opened this issue 6 years ago • 7 comments

Affects Version(s): <Spring Integration version>


As a message producing application, I should like to monitor backpressure on the asynchronous message channel (e.g. Kafka producer via Spring Streams Kafka Binder) by being aware of the asynchronous queue capacity, current asynchronous queue water mark, and registering a callback at a certain high water mark.

It looks as if spring-integration/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java is a suitable place to integrate these metrics and callback.

Sample pseudocode -

@Autowired
Source kafkaOutput;

public void init() {
   channel = kafkaOutput.output().getMessageChannel();
   channel.registerHighWaterCallback("90%", (event) -> throttleProducer(event));
}

public void send (..... ) {
    int capacity =  kafkaOutput.output().getMessageChannel().getAsyncQueueCapacity();
    int current =  kafkaOutput.output().getMessageChannel().getAsyncQueueWaterMark(); // Latest estimate
}

cydergoth avatar Mar 23 '19 19:03 cydergoth

Need more info on the matter, e.g. some theoretical instructions what does it mean.

I don't think the AbstractMessageChannel is a right thing to go. An AbstractPollableChannel might be, because exactly this one is about async hand off and some queue to store messages for future processing by the poller.

On the other hand there is really nothing we can do with the Streams Kafka Binder - there is just no channels in between at all. So, the request might be really should go to the mentioned project or even directly to Apache Kafka Streams.

Although I may guess that we are open to improve Spring integration channels for such a metrics hook.

More info about existing metrics is here: https://docs.spring.io/spring-integration/reference/html/#mgmt-channel-features

Thanks for understanding!

artembilan avatar Mar 23 '19 19:03 artembilan

I believe that because the binder builds on top of the integration, it is necessary for this functionality to be exposed here before the binder can use it. I am also referring to producing messages, which in the Kafka Template may be done asynchronously, not consuming messages

cydergoth avatar Mar 25 '19 14:03 cydergoth

Note: The async message queue I am referring to is internal to the Kafka producer, not an application level queue.

cydergoth avatar Mar 25 '19 14:03 cydergoth

@cydergoth Can you explain where we get this information from the kafka clients? I don't see any obvious APIs on the KafkaProducer.

garyrussell avatar Mar 25 '19 15:03 garyrussell

Hmm, it seems like they may also need to implement something here, as they provide a number of properties to manage the size of the queue (buffer) but you are correct that they don't seem to have a direct call to get the high water mark.

cydergoth avatar Mar 25 '19 16:03 cydergoth

@cydergoth ,

any update on this?

We have plans to move Spring Integration Kafka project to this one, so fixing this issue respectively afterwards would make sense: https://github.com/spring-projects/spring-integration-kafka/issues/304

Thank you!

artembilan avatar Apr 30 '20 15:04 artembilan

Hey. Sorry, missed this update. I've been moved onto a different project.

cydergoth avatar Sep 30 '21 22:09 cydergoth