KAFKA-17853: Fix termination issue in ConsoleConsumer and ConsoleShareConsumer
https://issues.apache.org/jira/browse/KAFKA-17853

- There is an issue with the console share consumer: if the broker is unavailable, the consumer does not shut down immediately even after force-terminating with Ctrl-C. It takes around 30 seconds to close once the broker shuts down.
- The console consumer, on the other hand, was supposedly shutting down immediately on Ctrl-C. On reproducing the issue with a local Kafka server, I observed that the problem was present in both the console consumer and the console share consumer.
Issue:

- From the client debug logs, the issue seemed related to the network thread sending repeated FindCoordinator requests until the timer expired. This was happening in both the console-consumer and the console-share-consumer.
- The debug logs showed that when the broker is shut down, the heartbeat fails with a DisconnectException (which is retriable). This triggers a FindCoordinator request on the network thread, which retries until the default timeout expires.
- This request is sent even before we trigger a close on the consumer, so once we press Ctrl-C, although ConsumerNetworkThread::close() is triggered, it waits for the default timeout until all the requests are sent out for a graceful shutdown.
The PR aims to fix this issue by adding a check in NetworkClientDelegate to remove any pending unsent requests (with empty node values) during close. This avoids the unnecessary retries, so the consumers shut down immediately upon termination.
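A rough sketch of the idea behind the check (not the actual Kafka code; the PendingRequest type and method names here are simplified stand-ins for the delegate's internal UnsentRequest bookkeeping):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.errors.NetworkException;

// Minimal stand-in for an unsent request queued in the network client delegate.
final class PendingRequest {
    final Optional<String> node;                       // empty => no coordinator/node known yet
    final CompletableFuture<Void> future = new CompletableFuture<>();
    PendingRequest(Optional<String> node) { this.node = node; }
}

final class CloseSketch {
    // On close, drop any queued request that has no node to send it to:
    // sending it would only trigger more coordinator lookups that can never
    // be useful once the network thread is shutting down.
    static void purgeUnsentOnClose(List<PendingRequest> unsent) {
        Iterator<PendingRequest> it = unsent.iterator();
        while (it.hasNext()) {
            PendingRequest r = it.next();
            if (r.node.isEmpty()) {
                it.remove();
                // Fail the pending future with a retriable exception so the
                // owning request manager can log it and finish its own close.
                r.future.completeExceptionally(
                        new NetworkException("The client is closing; dropping request with no target node"));
            }
        }
    }

    public static void main(String[] args) {
        List<PendingRequest> unsent = new ArrayList<>();
        unsent.add(new PendingRequest(Optional.empty()));
        purgeUnsentOnClose(unsent);
        System.out.println("Remaining unsent requests: " + unsent.size()); // prints 0
    }
}
```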
Share consumers shutting down after the fix.
[2025-06-03 16:23:42,175] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer]
Removing unsent request
UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-share-consumer',
keyType=0, coordinatorKeys=[]),
handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@2b351de8,
node=Optional.empty, remainingMs=28565} because the client is closing
(org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:23:42,175] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer]
FindCoordinator request failed due to retriable exception
(org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected
before a response was received.
[2025-06-03 16:23:42,176] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer] Closing
RequestManagers
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,177] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer]
RequestManagers has been closed
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,179] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer] Closed
the consumer network thread
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:23:42,181] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer] Kafka
share consumer has been closed
(org.apache.kafka.clients.consumer.internals.ShareConsumerImpl)
Processed a total of 0 messages
Regular consumers shutting down after the fix.
[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Removing unsent request
UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-consumer-5671',
keyType=0, coordinatorKeys=[]),
handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@3770591b,
node=Optional.empty, remainingMs=29160} because the client is closing
(org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] FindCoordinator request failed due to
retriable exception
(org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected
before a response was received.
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Closing RequestManagers
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Removing test-topic-23-0 from buffered
fetch data as it is not in the set of partitions to retain ([])
(org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] RequestManagers has been closed
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,200] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Closed the consumer network thread
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:24:27,202] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Kafka consumer has been closed
(org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer)
Processed a total of 0 messages
Great to see the fix in the area, thanks for looking into this.
Another edge case here:
If the broker never started and we attempt a Ctrl-C to close the consumer, it still took 30 seconds for the ConsoleShareConsumer to close. The problem lay in the ShareConsumeRequestManager, where we do not complete the closeFuture when the memberId is null.
Added a fix for the same and verified that both the ConsoleConsumer and the ConsoleShareConsumer behave consistently and close even if the broker never started.
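A minimal illustration of that second fix, assuming a closeFuture field and a nullable memberId; the real ShareConsumeRequestManager is structured differently, so this is only a sketch of the guard being described:

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: the point is that close must complete its future even when the
// consumer never obtained a member id (e.g. the broker never came up).
final class ShareCloseGuardSketch {
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
    private volatile String memberId;              // stays null if we never joined the group

    CompletableFuture<Void> signalClose() {
        if (memberId == null) {
            // Nothing to acknowledge and no group to leave: complete right away
            // instead of leaving the caller waiting out the 30s close timeout.
            closeFuture.complete(null);
            return closeFuture;
        }
        // Otherwise: send final acknowledgements / leave the group, then
        // complete closeFuture from the response handler (omitted here).
        return closeFuture;
    }
}
```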
Hey, thanks for looking into this! Sorry I haven't had the time to look in detail, but just a couple of high-level comments:

- we need to be careful with aborting the find-coordinator on close (e.g. we do need the coordinator, if we have time, to send the pending async commits).
- the console consumer behaviour is really aligned with the way it's calling consumer.close() here: https://github.com/apache/kafka/blob/7c715c02c06f16475faff8aa72048cddb7382c8a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java#L210
This means "close with the default close timeout of 30s", so in a way it's not unexpected that it waits, right? We should ensure that's what we want from the console consumers. E.g. calling close(ZERO) would be the way to go if we want to "close and attempt sending all requests once without retrying/waiting".
Hi @lianetm, thanks for the review.
- So here the abort of the FindCoordinator will only happen when the network thread closes, i.e. after all the pending async commits/acknowledgements have completed. So this should not affect the FindCoordinator request issued for an async commit, right? https://github.com/apache/kafka/blob/7c715c02c06f16475faff8aa72048cddb7382c8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L887C9-L888C163
- I agree, we could go with close(Duration.ZERO) too to achieve an immediate close on Ctrl-C. We just need to decide whether we want that behaviour; an immediate close on Ctrl-C would be nice from a user-experience perspective. I am just wondering: if we make the ConsoleConsumer/ConsoleShareConsumer do a close(Duration.ZERO), then in cases where we actually need some time to send async commits/acknowledgements, we might force-close the consumer sooner. That could be an issue, right? The PR currently only modifies the code path when the network thread is closed (i.e. after all the commits/acks have been sent/handled), so it ideally should not affect the prior steps of closing.
Thanks for the PR, @ShivsundarR.
Since this change affects all consumer use cases, we need to tread carefully 😄
The console consumer is closing the underlying consumer with the default close() API, which uses a timeout of 30 seconds. That means the consumer is within its rights to use up all 30 seconds to clean up. The change as is looks a little too broad because it assumes that if a node isn't immediately available, the request should be aborted. This doesn't allow for the case where a node later becomes available within the timeout. There is a dependency between, e.g., OFFSET_COMMIT and FIND_COORDINATOR, so if the user is trying to commit their offsets, IMO we should try exhaustively to find the coordinator.
That said, the close() process is rife with edge cases and I'm sure there are things to fix.
Is there an approach by which we can "prove" that continuing to make requests is pointless?
Hi @kirktrue, thanks for the review. Yes, I agree the consumer should be able to use the 30 seconds if needed. I think the abort here only happens when the request need not be sent anymore. The abort happens:

- only when onClose in the ConsumerNetworkThread is true, and
- onClose will be true only when ConsumerNetworkThread::cleanup is called.
- The cleanup of the network thread happens right at the end of close() here, after commitSync() has completed and the callbacks have been updated.
- And anyway, we do intend to stop sending FindCoordinator requests before the network thread closes here, so ideally this should not be a problem.
- If a FindCoordinator request happened to be issued before the stopFindCoordinatorOnClose event was sent, it goes into a loop of retries when a node is unavailable.
- For commitSync/acknowledgements (for ShareConsumers) which occur during close, we do NOT abort the FindCoordinator even when the node is unavailable (as onClose in NetworkClientDelegate would be false); the respective request managers will handle the response when the broker is unavailable, and the process will complete as it did before this change.
- Once these stages complete, we reach the end, when the network thread itself needs to close with the remaining time on the closeTimeout.
- Now, if there are any unsent requests with no node to connect to, we try to abort them, since we have to close the network thread anyway and issuing a FindCoordinator (even if a node were available) is no longer useful beyond this point.

This was my line of thought (a rough sketch of the ordering is below), does this sound good?
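A compact ordering sketch of the sequence described above (method names are made up; each step is a stub standing in for the real close logic):

```java
import java.time.Duration;

// Not the actual Kafka close implementation; this only shows where the new
// "drop unsent requests with no node" step sits relative to the other stages.
final class CloseOrderingSketch {
    private boolean onClose = false;

    void close(Duration timeout) {
        flushPendingCommitsAndAcks(timeout);  // unchanged by the PR: commits/acks get their chance first
        runCommitCallbacks();                 // unchanged by the PR
        onClose = true;                       // network thread cleanup begins here
        purgeUnsentRequestsWithNoNode();      // the new check only runs at this point
    }

    private void flushPendingCommitsAndAcks(Duration timeout) { /* stub */ }
    private void runCommitCallbacks() { /* stub */ }
    private void purgeUnsentRequestsWithNoNode() { /* stub */ }
}
```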
@lianetm @kirktrue The direction this PR is heading in looks good to me. Please could you take a deeper look and see whether you agree.
Thanks @kirktrue for the review. I have added a couple of integration tests covering both share consumers and regular consumers.
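Not the actual tests from the PR, but a rough illustration of the kind of check involved: point a consumer at an address where no broker is listening and verify that close() returns well before the default 30-second timeout (the bootstrap address, topic, and threshold below are placeholders):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FastCloseCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:1");        // nothing listening here
        props.put("group.id", "fast-close-check");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("test-topic"));
        consumer.poll(Duration.ofSeconds(1));                 // returns empty; broker is unreachable

        long start = System.nanoTime();
        consumer.close(Duration.ofSeconds(30));               // default-style close timeout
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("close() took " + elapsedMs + " ms");
        if (elapsedMs > 10_000) {
            throw new AssertionError("close() should not need the full timeout once "
                    + "no-node requests are dropped on close");
        }
    }
}
```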
the logic in this PR is assuming the UnsentRequest represents a FIND_COORDINATOR RPC because it has an empty Node.
Yes, that's true in a way, but I was thinking the u.node.isEmpty() check represents the general idea of not issuing requests to an unknown node when we are closing the network thread; if we extend this to other APIs in future, the idea will still work.
But yes, as of now both ways work. I was thinking of retaining the current code to make it behaviour-specific, what do you think?
I had another minor concern regarding the handling in CoordinatorRequestManager of the NetworkException that's passed to onFailure(). How does the CoordinatorRequestManager logic handle that error? For example, does this result in potentially misleading logging?
No, this does not lead to misleading logging; it does not enter the "rediscovery" part of markCoordinatorUnknown, as the coordinator is null.
Following are the debug logs I got when the request is completed:
DEBUG [ShareConsumer clientId=consumer-group1-2, groupId=group1] Removing unsent request UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='group1', keyType=0, coordinatorKeys=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@32e301da, node=Optional.empty, remainingMs=29251} because the client is closing (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:246)
DEBUG [ShareConsumer clientId=consumer-group1-2, groupId=group1] FindCoordinator request failed due to retriable exception (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:205)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
We just log that the FindCoordinator request failed with a retriable exception and print the exception as well, so it should be fine, I guess.