kafka
kafka copied to clipboard
KAFKA-16985: Ensure consumer sends leave request on close even if interrupted
The consumer should attempt to leave the group cleanly upon close()
, regardless of a) the timeout, b) interrupts.
If the user interrupted the current thread, upon close()
, note the interruption, clear the flag, proceed with the close logic, and then throw an InterruptException
at the end of close()
.
After the UnsubscribeEvent
completes, there are still more steps to be performed to leave the group:
- The application thread processes the
ConsumerRebalanceListenerCallbackNeededEvent
that was enqueued by the background thread shortly before completing theUnsubscribeEvent
- The
ConsumerRebalanceListener.onPartitionsRevoked()
callback is invoked - The application thread enqueues a
ConsumerRebalanceListenerCallbackCompletedEvent
for the background thread - The background thread will be notified about the callback's completion, so that it can send out the "leave group" heartbeat
If the user invokes close()
with a low timeout, we need to ensure the above steps are performed even if the UnsubscribeEvent
itself timed out.
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)