
ConcurrentMessageListenerContainer isChildRunning API is returning false even though active MessageListenerContainer instances are processing messages.

Open LokeshAlamuri opened this issue 1 year ago • 10 comments

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.1.3

Application

A Spring Boot application uses the spring-kafka module to consume and process messages. The MessageListenerContainer concurrency is 4. CommonContainerStoppingErrorHandler is configured to stop the container on any failure. Assume each message takes 60 seconds to process.

Describe the bug

One of the MessageListenerContainer instances throws a fatal exception, and CommonContainerStoppingErrorHandler stops the MessageListenerContainer. If we then immediately check the state of the MessageListenerContainer instances using the following code snippet, it reports that no MessageListenerContainer instances are running, because the state is prematurely updated to not running.

code: org.springframework.kafka.listener.ListenerContainerRegistry listenerContainerRegistry;

listenerContainerRegistry.getListenerContainers().stream().filter(messageListenerContainer -> messageListenerContainer.isChildRunning()).count() -- This returns 0 (zero). It should return zero only after message processing has stopped in all the instances.

In practice, the other MessageListenerContainer instances need some time to finish processing their current messages.

To Reproduce

Any spring-kafka consumer application

Expected behavior

org.springframework.kafka.listener.ListenerContainerRegistry listenerContainerRegistry;

listenerContainerRegistry.getListenerContainers().stream().filter(messageListenerContainer -> messageListenerContainer.isChildRunning()).count() -- This should not return 0 (zero).

The above code should return zero only after a closed notification has been received from all the MessageListenerContainer instances.

Problem

From the application's perspective, the state of the Kafka consumer component is reported incorrectly. This is a serious problem if we plan to shut down the application based on the reported state: because of this bug, the application would be stopped prematurely.

As of now, the MessageListenerContainer APIs do not report correctly when all the MessageListenerContainer instances have finished processing.

Sample

https://github.com/LokeshAlamuri/SpringBootKafkaConsumerDemo/blob/main/src/main/java/com/example/springboot/ConsumerStoppedEventListener.java

Root Cause

As per my analysis, the 'isChildRunning' API returns false based on the 'running' variable of org.springframework.kafka.listener.ConcurrentMessageListenerContainer, and not based on the active MessageListenerContainer instances.
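
To illustrate the root cause described above, here is a minimal toy model in plain Java. It is not spring-kafka code: FlagBasedParent, Child and activeChildren are hypothetical names invented for this sketch, and the model only assumes that the parent answers isChildRunning from its own 'running' flag.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (NOT spring-kafka code): the parent reports state from its own flag only. */
class FlagBasedParent {

    /** Hypothetical stand-in for one child container that is busy with a record. */
    static final class Child {
        private boolean processing;

        void start() { processing = true; }

        boolean isProcessing() { return processing; }
    }

    private final List<Child> children = new ArrayList<>();
    private boolean running;

    FlagBasedParent(int concurrency) {
        for (int i = 0; i < concurrency; i++) {
            children.add(new Child());
        }
    }

    void start() {
        running = true;
        children.forEach(Child::start);
    }

    /** A stop request flips the flag at once; the children are still busy. */
    void stop() { running = false; }

    /** Assumed behaviour from the report: derived from the flag, not from the children. */
    boolean isChildRunning() { return running; }

    /** Number of children that are still processing a record. */
    long activeChildren() {
        return children.stream().filter(Child::isProcessing).count();
    }

    public static void main(String[] args) {
        FlagBasedParent parent = new FlagBasedParent(4);
        parent.start();
        parent.stop();
        System.out.println("isChildRunning = " + parent.isChildRunning()); // false
        System.out.println("activeChildren = " + parent.activeChildren()); // 4
    }
}
```

In this model the parent already reports false while all four children are still processing, which mirrors the premature state change reported in this issue.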

Solution

The 'isChildRunning' API should return false only after all the MessageListenerContainer instances have closed, by tracking the count of closed instances from the notifications it receives through the 'childStopped' method.
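
The proposed solution can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not a patch for spring-kafka: CountingParent is a hypothetical class, and its no-argument childStopped() is a simplified stand-in for the notification method mentioned above.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model (NOT spring-kafka code): state is derived from stopped-child notifications. */
class CountingParent {

    private final int concurrency;
    private final AtomicInteger stoppedChildren = new AtomicInteger();
    private volatile boolean started;

    CountingParent(int concurrency) {
        this.concurrency = concurrency;
    }

    void start() {
        stoppedChildren.set(0);
        started = true;
    }

    /** Hypothetical callback: each child calls this once after it has fully stopped. */
    void childStopped() {
        stoppedChildren.incrementAndGet();
    }

    /** True until every child has reported that it stopped. */
    boolean isChildRunning() {
        return started && stoppedChildren.get() < concurrency;
    }

    public static void main(String[] args) {
        CountingParent parent = new CountingParent(4);
        parent.start();
        for (int i = 0; i < 3; i++) {
            parent.childStopped();
        }
        System.out.println(parent.isChildRunning()); // true: one child is still active
        parent.childStopped();
        System.out.println(parent.isChildRunning()); // false: all four children stopped
    }
}
```

With this approach the parent keeps reporting true while any child is still finishing its work, so a shutdown decision based on isChildRunning would wait for the last notification.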

LokeshAlamuri, Jun 29 '24 13:06