spring-kafka
ConcurrentMessageListenerContainer isChildRunning API is returning false even though active MessageListenerContainer instances are processing messages.
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.1.3
Application
A Spring Boot application uses the spring-kafka module to consume and process messages. The MessageListenerContainer concurrency is 4. CommonContainerStoppingErrorHandler is configured to stop the container on any failure. Assume each message takes 60 seconds to process.
Describe the bug
One of the MessageListenerContainer instances throws a fatal exception, and CommonContainerStoppingErrorHandler stops the container. If we then immediately check the state of the MessageListenerContainer instances using the following code snippet, it reports that no instances are running, because the state is prematurely updated to not running.
code: org.springframework.kafka.listener.ListenerContainerRegistry listenerContainerRegistry;
listenerContainerRegistry.getListenerContainers().stream().filter(messageListenerContainer -> messageListenerContainer.isChildRunning()).count() -- This returns 0 (zero). It should return zero only after all the instances have stopped processing messages.
In practice, the other MessageListenerContainer instances need some time to finish processing their in-flight messages.
To Reproduce
Any spring-kafka consumer application
Expected behavior
org.springframework.kafka.listener.ListenerContainerRegistry listenerContainerRegistry;
listenerContainerRegistry.getListenerContainers().stream().filter(messageListenerContainer -> messageListenerContainer.isChildRunning()).count() -- This should not return 0 (zero) while any instance is still processing messages.
The above code should return zero only after a closed notification has been received from all the MessageListenerContainer instances.
Problem
From the application's perspective, the state of the Kafka consumer component is reported incorrectly. This is a serious problem if we shut down the application based on the reported state: because of this bug, the application would be stopped prematurely.
Currently, the MessageListenerContainer APIs do not accurately report when all the MessageListenerContainer instances have finished processing.
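Until the reported state can be trusted, one possible application-side workaround is to count per-consumer stop notifications and delay shutdown until all of them have arrived. The sketch below is plain Java with no spring-kafka dependency. ConsumerShutdownGate and its methods are hypothetical names, and it assumes the application calls consumerStopped() once for each consumer that stops (for example from a listener for per-consumer stopped events).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of spring-kafka: lets shutdown proceed
// only after every expected consumer has reported that it stopped.
class ConsumerShutdownGate {

    private final CountDownLatch latch;

    ConsumerShutdownGate(int expectedConsumers) {
        this.latch = new CountDownLatch(expectedConsumers);
    }

    // Assumed to be called once per consumer when it has actually stopped.
    void consumerStopped() {
        latch.countDown();
    }

    // Number of consumers that have not yet reported a stop.
    long remaining() {
        return latch.getCount();
    }

    // Waits up to the timeout; returns true only if all consumers stopped.
    boolean awaitAllStopped(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With a concurrency of 4, the application would create the gate with 4 and call awaitAllStopped with a timeout longer than the 60-second processing time before exiting.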
Sample
https://github.com/LokeshAlamuri/SpringBootKafkaConsumerDemo/blob/main/src/main/java/com/example/springboot/ConsumerStoppedEventListener.java
Root Cause
From my analysis, the 'isChildRunning' API returns false based on the 'running' variable of org.springframework.kafka.listener.ConcurrentMessageListenerContainer, not on the active MessageListenerContainer instances.
Solution
The 'isChildRunning' API should return false only after all MessageListenerContainer instances have stopped, by counting the stop notifications it receives through the 'childStopped' method.
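To illustrate the idea, here is a minimal, simplified model of the proposed behaviour. ParentContainerModel is a hypothetical class written for this issue, not the actual ConcurrentMessageListenerContainer code. It only shows how a child count could be kept separate from the 'running' flag.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a parent container with N child containers.
// This is an illustration only, not the spring-kafka implementation.
class ParentContainerModel {

    private final int concurrency;
    private final AtomicInteger activeChildren = new AtomicInteger();
    private volatile boolean running;

    ParentContainerModel(int concurrency) {
        this.concurrency = concurrency;
    }

    void start() {
        activeChildren.set(concurrency);
        running = true;
    }

    // A stop request flips the flag at once, while children may
    // still be finishing the records they are processing.
    void stop() {
        running = false;
    }

    // Assumed to be invoked once per child after it has fully stopped.
    void childStopped() {
        activeChildren.decrementAndGet();
    }

    boolean isRunning() {
        return running;
    }

    // Derived from the child count rather than from the 'running' flag.
    boolean isChildRunning() {
        return activeChildren.get() > 0;
    }
}
```

In this model, after stop() is called with a concurrency of 4, isRunning() is false straight away, but isChildRunning() stays true until childStopped() has been called four times.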