Enhancement: Manage back pressure on asynchronous message providers such as Kafka
Affects Version(s): <Spring Integration version>
As a message-producing application, I would like to monitor backpressure on the asynchronous message channel (e.g. Kafka producer via Spring Streams Kafka Binder) by knowing the asynchronous queue capacity and the current asynchronous queue water mark, and by registering a callback that fires at a chosen high water mark.
It looks as if spring-integration/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java is a suitable place to integrate these metrics and the callback.
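Sample pseudocode (registerHighWaterCallback, getAsyncQueueCapacity and getAsyncQueueWaterMark are the proposed methods and do not exist today):

    @Autowired
    Source kafkaOutput;

    public void init() {
        channel = kafkaOutput.output().getMessageChannel();
        // Proposed: invoke the callback once the async queue is 90% full
        channel.registerHighWaterCallback("90%", (event) -> throttleProducer(event));
    }

    public void send(.....) {
        // Proposed: total capacity of the producer's internal async queue
        int capacity = kafkaOutput.output().getMessageChannel().getAsyncQueueCapacity();
        // Proposed: latest estimate of the current fill level
        int current = kafkaOutput.output().getMessageChannel().getAsyncQueueWaterMark();
    }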
I need more information on this, e.g. some background on what it means in practice.
I don't think AbstractMessageChannel is the right place for this. AbstractPollableChannel might be, because that is exactly the one that deals with asynchronous hand-off and a queue that stores messages for later processing by a poller.
On the other hand, there is really nothing we can do for the Streams Kafka Binder: there are simply no channels in between at all. So the request should probably go to that project, or even directly to Apache Kafka Streams.
That said, I would guess we are open to improving Spring Integration channels with such a metrics hook.
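To make the idea of a queue-level hook concrete, here is a minimal plain-Java sketch of a bounded queue that reports its capacity and current size and fires a callback when the fill level crosses a high water mark. It is not Spring Integration code: HighWaterMarkQueue and all of its methods are hypothetical names invented for this illustration, and a real channel would need to decide on threading and on where the callback runs.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.IntConsumer;

// Hypothetical helper (not part of Spring Integration): a bounded queue that
// notifies a listener when its size reaches a configured high water mark.
final class HighWaterMarkQueue<T> {
    private final BlockingQueue<T> queue;
    private final int capacity;
    private final int highWaterMark;      // absolute size that triggers the callback
    private final IntConsumer onHighWater; // receives the queue size at the crossing
    private boolean above;                 // true while size >= highWaterMark

    HighWaterMarkQueue(int capacity, int highWaterPercent, IntConsumer onHighWater) {
        if (capacity <= 0 || highWaterPercent <= 0 || highWaterPercent > 100) {
            throw new IllegalArgumentException("invalid capacity or percentage");
        }
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.capacity = capacity;
        // Round up so that e.g. 90% of 10 is 9 elements.
        this.highWaterMark = (capacity * highWaterPercent + 99) / 100;
        this.onHighWater = onHighWater;
    }

    // Non-blocking enqueue; returns false when the queue is full.
    synchronized boolean offer(T item) {
        boolean accepted = queue.offer(item);
        int size = queue.size();
        // Fire only on the upward crossing, not on every send above the mark.
        if (accepted && !above && size >= highWaterMark) {
            above = true;
            onHighWater.accept(size);
        }
        return accepted;
    }

    // Non-blocking dequeue; re-arms the callback once the size drops below the mark.
    synchronized T poll() {
        T item = queue.poll();
        if (queue.size() < highWaterMark) {
            above = false;
        }
        return item;
    }

    int capacity() {
        return capacity;
    }

    synchronized int size() {
        return queue.size();
    }
}
```

A producer could construct this with something like new HighWaterMarkQueue<>(10, 90, size -> throttleProducer(size)), where throttleProducer is the application's own back-off logic. Firing only on the upward crossing is a design choice to avoid flooding the listener while the queue stays nearly full.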
More info about existing metrics is here: https://docs.spring.io/spring-integration/reference/html/#mgmt-channel-features
Thanks for understanding!
I believe that, because the binder builds on top of Spring Integration, this functionality has to be exposed here before the binder can use it. I am also referring to producing messages, which the Kafka Template may do asynchronously, not to consuming messages.
Note: The async message queue I am referring to is internal to the Kafka producer, not an application level queue.
@cydergoth Can you explain where we would get this information from the Kafka clients? I don't see any obvious APIs on the KafkaProducer.
Hmm, it seems they may also need to implement something here: they provide a number of properties to manage the size of the queue (buffer), but you are correct that they don't seem to have a direct call to get the high water mark.
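One possible workaround, offered as an assumption rather than a confirmed API for this purpose: the producer's client metrics appear to include buffer-total-bytes and buffer-available-bytes, from which a fill ratio could be derived. The sketch below takes those two values as a plain map so that it stays self-contained; reading them out of KafkaProducer.metrics() is left out, the metric names should be checked against the client version in use, and ProducerBufferGauge is a hypothetical name.

```java
import java.util.Map;

// Hypothetical helper: derives a buffer fill ratio from two metric values.
// The metric names are an assumption to verify against the Kafka client docs.
final class ProducerBufferGauge {
    static final String TOTAL = "buffer-total-bytes";
    static final String AVAILABLE = "buffer-available-bytes";

    private ProducerBufferGauge() {
    }

    // Returns the used fraction of the buffer in [0, 1], or NaN if unknown.
    static double utilization(Map<String, Double> metrics) {
        Double total = metrics.get(TOTAL);
        Double available = metrics.get(AVAILABLE);
        if (total == null || available == null || total <= 0) {
            return Double.NaN;
        }
        return (total - available) / total;
    }

    // True when the used fraction is at or above the given ratio (e.g. 0.9).
    static boolean aboveHighWater(Map<String, Double> metrics, double ratio) {
        double used = utilization(metrics);
        return !Double.isNaN(used) && used >= ratio;
    }
}
```

Because this polls a metric instead of receiving a callback, the application would have to sample it periodically (or before each send), so the value is only ever a recent estimate.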
@cydergoth, any update on this?
We plan to move the Spring Integration Kafka project into this one, so it would make sense to address this issue after that move: https://github.com/spring-projects/spring-integration-kafka/issues/304
Thank you!
Hey. Sorry, I missed this update. I've been moved onto a different project.