spring-amqp icon indicating copy to clipboard operation
spring-amqp copied to clipboard

Consider adding dynamic scaling of consumers to DirectMessageListenerContainer [AMQP-707]

Open spring-operator opened this issue 8 years ago • 1 comments

Zahari Dichev opened AMQP-707 and commented

If we look at what the SimpleMessageListenerContainer does in order to determine whether we should scale consumers or not, we will quickly find out that it relies on the notion of consecutive successful polls (which is a function of the polling interval really so its quite subjective). If we try to implement the same notion in the DirectMessageListenerContainer we face a bit of a problem due to the event driven nature of the class.

It is possible however to gather some metrics that can allow us to intelligently (to an extend) determine whether there is a need to add additional consumers or not. We can think of the SimpleConsumer as processing messages in the following manner:

R1----E1++++++++EN1
                                     00000
                                               R2-----E2++++++++++EN2

In a sense R - E is the time between receiving the message and the start of the listener method execution, E - EN is the time spent executing the method and of course EN - R is the time that has passed between completing the processing of one message and receiving another. In effect this would be the "idle" time for the consumer. If we assume that this time is zero or some other value that implies a negligible unit of time, we can safely say that this consumer has a message available to process 100% of the time.

So in a sense, if we record a vector of such values and we assume that a value under a certain threshold indicates immediate availability of a message for a consumer, then we can almost translate that into the notion of consecutive successful polls (as in the Simple message listener container).

Scaling up From then on, if we have a consumer monitor task (which we already have in the DirectMessageListenerContainer), we can delegate to it to check every n milliseconds what is the count of "consecutive" messages for a consumer. If this count is more than 10 (or whatever other value has been set), we can spin up another consumer.

Scaling down In this case we can check when was the last time an event like the above mentioned before has been observed (aka receiving or finishing processing of a message). If we have assumed a value that signifies negligible delay between finishing up with one message and receiving the other, then I could imagine we can come up with a way to say. The time passed since the last received message is the equivalent of X unsuccessful polls or rather in the time that has passed, we could have processed 10 or so messages, therefore its reasonable to scale down.

Notes: This example is quite simplistic. In reality we would have to measure averages for all the consumers that are listening on a particular queue. We would probably also have to take into account (in the case of scaling down) the time it 'usually' takes to execute the listener function. Maybe we can track that with some time - waited average as well.


No further details from AMQP-707

spring-operator avatar Feb 02 '17 14:02 spring-operator