KAFKA-7109: Close fetch sessions on close of consumer
Problem
When a consumer is closed, the fetch sessions associated with it should notify the server of the intent to close by sending a final Fetch request with epoch = -1 (identified by FINAL_EPOCH in FetchMetadata.java). However, the current flow never sends this final fetch request, which leaves stale fetch sessions on the server that are closed only after a timeout.
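For context, the session epoch field in the fetch request is what signals the session lifecycle to the broker. The constant names below follow FetchMetadata.java, but the class itself is a simplified illustrative model, not the actual Kafka implementation:

```java
// Simplified model of fetch-session epoch semantics. The constant names
// mirror FetchMetadata.java; the rest is an illustrative sketch.
public class FetchEpochDemo {
    static final int INITIAL_EPOCH = 0; // first request: ask the broker to create a session
    static final int FINAL_EPOCH = -1;  // final request: ask the broker to close the session

    // Epoch progression for a session: once the epoch goes negative (final),
    // it stays final; otherwise it increments, skipping the reserved
    // values 0 and -1 on overflow.
    static int nextEpoch(int prevEpoch) {
        if (prevEpoch < 0) {
            return FINAL_EPOCH;
        }
        int next = prevEpoch + 1;
        return next <= 0 ? 1 : next; // wrap past Integer.MAX_VALUE back to 1
    }

    public static void main(String[] args) {
        System.out.println(nextEpoch(INITIAL_EPOCH));      // 1
        System.out.println(nextEpoch(1));                  // 2
        System.out.println(nextEpoch(Integer.MAX_VALUE));  // 1
        System.out.println(nextEpoch(FINAL_EPOCH));        // -1
    }
}
```

The point of this PR is that the consumer should send one last fetch request carrying FINAL_EPOCH on close, so the broker can evict the session immediately instead of waiting for it to expire.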
Changes
- Change close() in Fetcher to send the final Fetch request notifying the server of the close.
- Change close() in Consumer to respect the timeout duration passed to it. Prior to this change, the timeout parameter was being ignored.
- Change tests to close with Duration.ZERO to reduce test execution time. Otherwise the tests would wait for the default timeout before exiting (close() in the tests is expected to be unsuccessful because there is no server to send the request to).
- Distinguish between the case of "close existing session and create a new session" and "close existing session" by renaming the nextCloseExisting function to nextCloseExistingAttemptNew.
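The timeout-respecting close described above can be sketched roughly as follows. The method and its behavior here are hypothetical stand-ins, not the real Fetcher API; the sketch only shows the deadline bookkeeping, including the Duration.ZERO case used by the tests:

```java
import java.time.Duration;

// Illustrative sketch of a close that sends a final "close session" request
// and honors the caller-supplied timeout (names are hypothetical, not the
// real Fetcher/Consumer code).
public class CloseWithTimeoutDemo {

    /** Returns true if the close completed before the deadline expired. */
    static boolean closeSession(Duration timeout) {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        // With Duration.ZERO the deadline is already reached: the request is
        // sent best-effort and we return without waiting for a response.
        while (System.nanoTime() < deadlineNanos) {
            // ... poll the network client for the final-fetch response ...
            return true; // pretend the response arrived on the first poll
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(closeSession(Duration.ofSeconds(30))); // true
        System.out.println(closeSession(Duration.ZERO));          // false
    }
}
```

This is why closing with Duration.ZERO is cheap in the tests: the request is fired but the caller never blocks waiting for a server that does not exist.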
Testing
Added unit test which validates that the correct close request is sent to the server.
Note that this change has been attempted in https://github.com/apache/kafka/pull/5407 but the PR was abandoned.
Test failures are unrelated. @showuon this is ready for your review.
Test failures:
[2022-09-06T16:49:33.427Z] org.apache.kafka.controller.QuorumControllerTest.testEarlyControllerResults() failed, log available in /home/jenkins/workspace/Kafka_kafka-pr_PR-12590/metadata/build/reports/testOutput/org.apache.kafka.controller.QuorumControllerTest.testEarlyControllerResults().test.stdout
[2022-09-06T16:49:33.427Z]
[2022-09-06T16:49:33.427Z] QuorumControllerTest > testEarlyControllerResults() FAILED
[2022-09-06T16:49:33.427Z] org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0
[2022-09-06T16:49:33.427Z] at app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:740)
@dajac since you are working on the consumer client protocol, perhaps you may also be interested in taking a look at this PR?
Perhaps a naive question, but does the fetch request that closes the session fetch any records? Or does it just close the session and return?
Perhaps a naive question, but does the fetch request that closes the session fetch any records? Or does it just close the session and return?
@dajac , good question. When the consumer is closing, it will leave the group first and then close the fetcher. I thought leaving the group would clear the owned partitions, but it looks like it won't. Maybe we need to update the broker side to not return records when the client is trying to close the session without creating a new one. @divijvaidya , WDYT?
does the fetch request that closes the session fetch any records
No, because the fetch request's field for topic partitions is set to empty at sessionHandler.newBuilder().build() (line 1963 in Fetcher.java). Also note that the empty fetch data in the close-fetch-request is asserted in the test testFetcherCloseClosesFetchSessionsInBroker at assertTrue(builder.fetchData().isEmpty());
On the server side, the server handles a close fetch message by creating a SessionlessFetchContext, which will return an empty response if FetchData is empty (see FetchSession.scala line 364).
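The broker-side dispatch described here can be modeled roughly as below. This is a simplified sketch: the real logic lives in FetchSession.scala and its session cache, and only the FINAL_EPOCH/INITIAL_EPOCH constants and context names are taken from the source discussion:

```java
// Simplified model of how the broker selects a fetch context from the
// request's session epoch. The context names mirror the broker's classes,
// but the dispatch below is an illustrative sketch, not the real code.
public class FetchContextDemo {
    static final int FINAL_EPOCH = -1;
    static final int INITIAL_EPOCH = 0;

    enum ContextKind { SESSIONLESS, FULL, INCREMENTAL }

    static ContextKind contextFor(int sessionEpoch) {
        if (sessionEpoch == FINAL_EPOCH) {
            // Close request: drop the cached session and serve this one
            // request without a session. With empty fetch data, the
            // sessionless context produces an empty response.
            return ContextKind.SESSIONLESS;
        } else if (sessionEpoch == INITIAL_EPOCH) {
            return ContextKind.FULL;        // create a new session
        } else {
            return ContextKind.INCREMENTAL; // continue an existing session
        }
    }

    public static void main(String[] args) {
        System.out.println(contextFor(-1)); // SESSIONLESS
        System.out.println(contextFor(0));  // FULL
        System.out.println(contextFor(7));  // INCREMENTAL
    }
}
```

This is why the close request covers both cases discussed in the thread: a FINAL_EPOCH request always evicts the session, and whether records come back depends only on whether the request carries fetch data.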
Maybe we need to update in broker side, to not return records when client is trying to close the session and not create a new one
As explained above, both of these cases are already handled on the server by the creation of a SessionlessFetchContext.
@showuon @dajac Please let me know if I am missing anything here.
@divijvaidya , you're right, thanks for the update! Sorry, I had only checked the broker-side implementation, where even though we create a SessionlessFetchContext, it still returns the fetched records. I missed the part where we send the fetch request with "empty" fetchData. So we're good! Thanks again for the explanation!
The failing tests pass locally and are unrelated to this change.
Build / JDK 8 and Scala 2.12 / testReturnRecordsDuringRebalance() – org.apache.kafka.clients.consumer.KafkaConsumerTest
Build / JDK 17 and Scala 2.13 / testCreateTopicsReturnsConfigs(String).quorum=zk – kafka.api.PlaintextAdminIntegrationTest
Build / JDK 17 and Scala 2.13 / testCloseOldestConnection() – org.apache.kafka.common.network.Tls13SelectorTest
Note that testReturnRecordsDuringRebalance has been failing in other PRs too, such as https://github.com/apache/kafka/pull/12457, which makes me believe that it is not due to this change.
@dajac this is ready for your review, whenever you get a chance
@dajac please take a look when you get a chance!
@kirktrue @dajac please take a look when you get a chance!
@dajac please take a look!
@ijuma would you please take a look when you get a chance?
@dajac @philipnee please review again (and restart the tests) when you get a chance! Thank you.
Unrelated test failures. The unit tests pass in my local environment, and the failing integration tests are known flaky tests.
Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
Build / JDK 11 and Scala 2.13 / testPatternSubscriptionWithTopicAndGroupRead(String).quorum=kraft – kafka.api.AuthorizerIntegrationTest
Build / JDK 8 and Scala 2.12 / testListenerConnectionRateLimitWhenActualRateAboveLimit() – kafka.network.ConnectionQuotasTest
Thanks @divijvaidya - I don't have more questions regarding this PR.
@dajac , do you have any other comments?
Thank you @dajac for your patience through this PR. It took a long time but it would definitely improve consumers! Cheers!
@divijvaidya Sorry again for the long delay. By the way, I was wondering if we should also do this in the AbstractFetcherThread in order to close sessions used by replication when a broker shuts down. I haven't looked into it but that may be an interesting improvement as well.
Thanks for the suggestion. I will look into it.
https://issues.apache.org/jira/browse/KAFKA-15619
Deleted topics come back again in an Apache Spark structured streaming stress test after upgrading Kafka from 3.4.0 to 3.5.0; the related ticket is https://issues.apache.org/jira/browse/SPARK-45529 . The test randomly starts/stops/adds data/adds partitions/deletes topics/adds topics/checks the result in a loop, and I finally found that a deleted topic will come back again after some time.
By repeatedly resetting the head of branch-3.5, using gradlew install to repackage, and rerunning the stress test, I am basically certain that this commit caused it.
I haven't gone through the details of the PR yet. Do you have any ideas @divijvaidya @dajac @showuon ?
@dengziming , let's discuss it in KAFKA-15619.