
KAFKA-7109: Close fetch sessions on close of consumer

divijvaidya opened this issue 2 years ago • 7 comments

Problem

When a consumer is closed, its fetch sessions should notify the server of the intention to close by sending a Fetch request with epoch = -1 (identified by FINAL_EPOCH in FetchMetadata.java). However, the current flow does not send this final fetch request. This leaves unnecessary fetch sessions on the server, which are closed only after a timeout.
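For context, the (sessionId, epoch) pair in a fetch request tells the broker what to do with the client's fetch session. The sketch below models that decision table as I understand it from KIP-227. Only FINAL_EPOCH = -1 comes from the description above. INVALID_SESSION_ID = 0, INITIAL_EPOCH = 0 and the Intent enum are assumptions for illustration, and this is not Kafka's FetchMetadata class.

```java
public class FetchSessionEpochs {
    // Assumed per KIP-227: session id 0 means "no session" and epoch 0 means
    // "start a new session". FINAL_EPOCH = -1 matches FetchMetadata.java.
    static final int INVALID_SESSION_ID = 0;
    static final int INITIAL_EPOCH = 0;
    static final int FINAL_EPOCH = -1;

    // Hypothetical labels for what the broker is being asked to do.
    enum Intent {
        FULL_FETCH_NEW_SESSION,
        SESSIONLESS_FULL_FETCH,
        CLOSE_EXISTING_CREATE_NEW,
        CLOSE_EXISTING,
        INCREMENTAL_FETCH,
        INVALID
    }

    static Intent classify(int sessionId, int epoch) {
        if (sessionId == INVALID_SESSION_ID) {
            if (epoch == INITIAL_EPOCH) return Intent.FULL_FETCH_NEW_SESSION;
            if (epoch == FINAL_EPOCH) return Intent.SESSIONLESS_FULL_FETCH;
            return Intent.INVALID; // an incremental epoch needs an existing session
        }
        if (epoch == INITIAL_EPOCH) return Intent.CLOSE_EXISTING_CREATE_NEW;
        if (epoch == FINAL_EPOCH) return Intent.CLOSE_EXISTING; // what a closing consumer should send
        return epoch > 0 ? Intent.INCREMENTAL_FETCH : Intent.INVALID;
    }

    public static void main(String[] args) {
        System.out.println(classify(123, FINAL_EPOCH));   // CLOSE_EXISTING
        System.out.println(classify(123, INITIAL_EPOCH)); // CLOSE_EXISTING_CREATE_NEW
        System.out.println(classify(123, 7));             // INCREMENTAL_FETCH
    }
}
```

The key distinction is between the last two session-bearing cases: epoch 0 on an existing session closes it but immediately opens a new one, while epoch -1 closes it for good, which is what a consumer shutting down wants.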

Changes

  1. Change close() in Fetcher to add logic that sends the final Fetch request notifying the server of the close.
  2. Change close() in Consumer to respect the timeout duration passed to it. Prior to this change, the timeout parameter was being ignored.
  3. Change tests to close with Duration.ZERO to reduce test execution time. Otherwise, the tests would wait for the default timeout before exiting (close() in the tests is expected to be unsuccessful because there is no server to send the request to).
  4. Distinguish between the case of "close existing session and create new session" and "close existing session" by renaming the nextCloseExisting function to nextCloseExistingAttemptNew.
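A minimal sketch of changes 1, 2 and 4, under stated assumptions: FetcherCloseSketch, SessionMetadata, CloseRequest and the send function are hypothetical and only mirror the shape of the real Fetcher. The two helpers are named after nextCloseExisting and nextCloseExistingAttemptNew from change 4, but their bodies are my assumption. close() sends one final fetch per open session with an empty partition list and epoch FINAL_EPOCH, then waits no longer than the caller's timeout, so passing Duration.ZERO, as the tests do, returns at once when no broker answers.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class FetcherCloseSketch {
    static final int INITIAL_EPOCH = 0;
    static final int FINAL_EPOCH = -1;

    // Hypothetical per-broker session state kept by the client.
    record SessionMetadata(int sessionId, int epoch) {
        // Close this session and ask the broker to open a fresh one.
        SessionMetadata nextCloseExistingAttemptNew() {
            return new SessionMetadata(sessionId, INITIAL_EPOCH);
        }

        // Close this session without opening a new one.
        SessionMetadata nextCloseExisting() {
            return new SessionMetadata(sessionId, FINAL_EPOCH);
        }
    }

    // Hypothetical close request: an empty partition list, so no records come back.
    record CloseRequest(int nodeId, SessionMetadata metadata, List<String> partitions) {}

    // Sends one final fetch per open session, then waits at most `timeout` for all of them.
    static boolean close(Map<Integer, SessionMetadata> sessions,
                         Function<CloseRequest, CompletableFuture<Void>> send,
                         Duration timeout) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        sessions.forEach((nodeId, md) ->
                pending.add(send.apply(new CloseRequest(nodeId, md.nextCloseExisting(), List.of()))));
        try {
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]))
                    .get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false; // best effort: the broker will eventually expire the session anyway
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        List<CloseRequest> sent = new ArrayList<>();
        // A broker that never responds: with Duration.ZERO, close() gives up at once.
        boolean completed = close(Map.of(1, new SessionMetadata(42, 3)),
                req -> { sent.add(req); return new CompletableFuture<>(); },
                Duration.ZERO);
        System.out.println(completed);                      // false
        System.out.println(sent.get(0).metadata().epoch()); // -1
    }
}
```

Waiting on a single allOf future keeps the timeout global rather than per broker, so closing with several open sessions still honours the one Duration the caller passed.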

Testing

Added a unit test that validates that the correct close request is sent to the server.

Note that this change has been attempted in https://github.com/apache/kafka/pull/5407 but the PR was abandoned.

divijvaidya, Sep 05 '22

Test failures are unrelated. @showuon this is ready for your review.

Test failures:

[2022-09-06T16:49:33.427Z] org.apache.kafka.controller.QuorumControllerTest.testEarlyControllerResults() failed, log available in /home/jenkins/workspace/Kafka_kafka-pr_PR-12590/metadata/build/reports/testOutput/org.apache.kafka.controller.QuorumControllerTest.testEarlyControllerResults().test.stdout
[2022-09-06T16:49:33.427Z] 
[2022-09-06T16:49:33.427Z] QuorumControllerTest > testEarlyControllerResults() FAILED
[2022-09-06T16:49:33.427Z]     org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0
[2022-09-06T16:49:33.427Z]         at app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:740)

divijvaidya, Sep 07 '22

@dajac, since you are working on the consumer client protocol, you may also be interested in taking a look at this PR.

divijvaidya, Sep 08 '22

Perhaps a naive question, but does the fetch request that closes the session fetch any records? Or does it just close the session and return?

dajac, Sep 20 '22

> Perhaps a naive question, but does the fetch request that closes the session fetch any records? Or does it just close the session and return?

@dajac, good question. When the consumer is closing, it leaves the group first and then closes the fetcher. I thought leaving the group would clear the owned partitions, but it looks like it doesn't. Maybe we need to update the broker side to not return records when the client is trying to close the session without creating a new one. @divijvaidya, WDYT?

showuon, Sep 20 '22

> does the fetch request that closes the session fetch any records

No, because the fetch request's topic-partition field is set to empty at sessionHandler.newBuilder().build() (line 1963 of Fetcher.java). Also note that the empty fetch data in the close fetch request is asserted in testFetcherCloseClosesFetchSessionsInBroker via assertTrue(builder.fetchData().isEmpty()).

On the server side, the broker handles a close fetch request by creating a SessionlessFetchContext, which returns an empty response if FetchData is empty (see FetchSession.scala, line 364).
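To illustrate the broker-side reasoning in this exchange, here is a toy model, not the actual FetchSession.scala logic: on a FINAL_EPOCH request the broker drops the session from its cache and answers without a session, serving only the partitions named in the request. Because the client's close request carries no partitions, the response is empty even though records exist. BrokerCloseSketch, sessionCache, log and handleFetch are hypothetical names, and only the close path is modelled.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BrokerCloseSketch {
    static final int FINAL_EPOCH = -1;

    // Hypothetical session cache: sessionId -> partitions the session tracks.
    final Map<Integer, List<String>> sessionCache = new HashMap<>();
    // Hypothetical log contents: partition -> records available to fetch.
    final Map<String, List<String>> log = new HashMap<>();

    // Models only the FINAL_EPOCH path: drop the session, then answer without one.
    Map<String, List<String>> handleFetch(int sessionId, int epoch, List<String> fetchData) {
        if (epoch != FINAL_EPOCH) {
            throw new UnsupportedOperationException("only the close path is modelled");
        }
        sessionCache.remove(sessionId); // no-op when there is no such session
        Map<String, List<String>> response = new LinkedHashMap<>();
        for (String partition : fetchData) {
            // Sessionless: only partitions named in this request are served.
            response.put(partition, log.getOrDefault(partition, List.of()));
        }
        return response; // empty whenever fetchData is empty
    }

    public static void main(String[] args) {
        BrokerCloseSketch broker = new BrokerCloseSketch();
        broker.sessionCache.put(42, List.of("topic-0"));
        broker.log.put("topic-0", List.of("record-1"));

        // The consumer's close request: session 42, FINAL_EPOCH, no partitions.
        System.out.println(broker.handleFetch(42, FINAL_EPOCH, List.of())); // {}
        System.out.println(broker.sessionCache.containsKey(42));            // false
    }
}
```

The model shows why both concerns raised here are answered on the client side: the session's tracked partitions are ignored on close, so whether records are returned depends only on what the request itself lists.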

> Maybe we need to update the broker side to not return records when the client is trying to close the session without creating a new one

As explained above, both of these cases are already handled on the server by the creation of a SessionlessFetchContext.

@showuon @dajac Please let me know if I am missing anything here.

divijvaidya, Sep 20 '22

@divijvaidya, you're right, thanks for the update! Sorry, I had only checked the broker-side implementation and saw that, although we create a SessionlessFetchContext, it still returns the fetched records. I missed that we send the fetch request with "empty" fetchData. So we're good! Thanks again for the explanation!

showuon, Sep 21 '22

The failing tests pass locally and are unrelated to this change.

Build / JDK 8 and Scala 2.12 / testReturnRecordsDuringRebalance() – org.apache.kafka.clients.consumer.KafkaConsumerTest
Build / JDK 17 and Scala 2.13 / testCreateTopicsReturnsConfigs(String).quorum=zk – kafka.api.PlaintextAdminIntegrationTest
Build / JDK 17 and Scala 2.13 / testCloseOldestConnection() – org.apache.kafka.common.network.Tls13SelectorTest

Note that testReturnRecordsDuringRebalance has been failing in other PRs too, such as https://github.com/apache/kafka/pull/12457, which makes me believe that it is not caused by this change.

divijvaidya, Sep 21 '22

@dajac this is ready for your review, whenever you get a chance

divijvaidya, Sep 27 '22

@dajac please take a look when you get a chance!

divijvaidya, Oct 10 '22

@kirktrue @dajac please take a look when you get a chance!

divijvaidya, Oct 26 '22

@dajac please take a look!

divijvaidya, Nov 17 '22

@ijuma would you please take a look when you get a chance?

divijvaidya, Dec 05 '22

@dajac @philipnee please review again (and restart the tests) when you get a chance! Thank you.

Unrelated test failures. Unit tests pass in my local environment, and the failing integration tests are known flaky tests.

Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest (2m 0s)
Build / JDK 11 and Scala 2.13 / testPatternSubscriptionWithTopicAndGroupRead(String).quorum=kraft – kafka.api.AuthorizerIntegrationTest (6s)
Build / JDK 8 and Scala 2.12 / testListenerConnectionRateLimitWhenActualRateAboveLimit() – kafka.network.ConnectionQuotasTest

divijvaidya, Dec 27 '22

Thanks @divijvaidya - I don't have more questions regarding this PR.

philipnee, Dec 27 '22

@dajac, do you have any other comments?

showuon, Feb 08 '23

Thank you @dajac for your patience through this PR. It took a long time but it would definitely improve consumers! Cheers!

divijvaidya, Feb 10 '23

@divijvaidya Sorry again for the long delay. By the way, I was wondering if we should also do this in the AbstractFetcherThread in order to close sessions used by replication when a broker shuts down. I haven't looked into it but that may be an interesting improvement as well.

dajac, Feb 10 '23

> @divijvaidya Sorry again for the long delay. By the way, I was wondering if we should also do this in the AbstractFetcherThread in order to close sessions used by replication when a broker shuts down. I haven't looked into it but that may be an interesting improvement as well.

Thanks for the suggestion. I will look into it.

divijvaidya, Feb 10 '23

https://issues.apache.org/jira/browse/KAFKA-15619

Deleted topics come back in the Apache Spark Structured Streaming stress test after upgrading Kafka from 3.4.0 to 3.5.0 (related ticket: https://issues.apache.org/jira/browse/SPARK-45529). The test randomly starts, stops, adds data, adds partitions, deletes topics, adds topics, and checks the result in a loop; I eventually found that a deleted topic comes back after some time.

I repeatedly reset the head of branch-3.5, repackaged it with gradlew install, and reran the stress test. Based on that, I am almost certain that this commit caused it.

I haven't gone through the details of the PR yet. Do you have any ideas, @divijvaidya @dajac @showuon?

dengziming, Oct 17 '23

@dengziming, let's discuss it in KAFKA-15619.

showuon, Oct 17 '23