
KAFKA-16985: Ensure consumer sends leave request on close even if interrupted

Open kirktrue opened this issue 7 months ago • 42 comments

The consumer should attempt to leave the group cleanly upon close(), regardless of (a) the timeout passed to close() or (b) thread interrupts.

If the current thread has already been interrupted when the user calls close(), the consumer should note the interruption, clear the interrupt flag, proceed with the close logic, and then throw an InterruptException at the end of close().
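The interrupt handling described above could look roughly like the following. This is a minimal standalone sketch, not the consumer's actual code: `InterruptDeferringClose`, `DeferredInterruptException`, and the step names are hypothetical stand-ins (the real change would throw Kafka's `InterruptException`), and restoring the interrupt flag before throwing is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

class InterruptDeferringClose {
    // Hypothetical stand-in for Kafka's InterruptException.
    static class DeferredInterruptException extends RuntimeException {
        DeferredInterruptException(String message) {
            super(message);
        }
    }

    // Records which (placeholder) close steps actually ran.
    final List<String> steps = new ArrayList<>();

    void close() {
        // Thread.interrupted() returns the current flag and clears it, so the
        // blocking calls in the close logic are not cut short by a stale interrupt.
        boolean wasInterrupted = Thread.interrupted();
        try {
            steps.add("unsubscribe");
            steps.add("send leave-group heartbeat");
            steps.add("release resources");
        } finally {
            if (wasInterrupted) {
                // Assumption: restore the flag so callers can still observe it.
                Thread.currentThread().interrupt();
            }
        }
        if (wasInterrupted) {
            // Surface the interruption only after the close logic has run.
            throw new DeferredInterruptException("interrupted before close(); close logic still ran");
        }
    }

    public static void main(String[] args) {
        InterruptDeferringClose consumer = new InterruptDeferringClose();
        Thread.currentThread().interrupt(); // simulate a user interrupt before close()
        try {
            consumer.close();
        } catch (DeferredInterruptException e) {
            System.out.println("steps run: " + consumer.steps + ", then: " + e.getMessage());
        }
    }
}
```

The exception is thrown after the `try`/`finally` rather than from inside `finally`, so a failure in one of the close steps would not be masked by the deferred interrupt.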

After the UnsubscribeEvent completes, there are still more steps to be performed to leave the group:

  1. The application thread processes the ConsumerRebalanceListenerCallbackNeededEvent that was enqueued by the background thread shortly before completing the UnsubscribeEvent
  2. The ConsumerRebalanceListener.onPartitionsRevoked() callback is invoked
  3. The application thread enqueues a ConsumerRebalanceListenerCallbackCompletedEvent for the background thread
  4. The background thread will be notified about the callback's completion, so that it can send out the "leave group" heartbeat

If the user invokes close() with a low timeout, we need to ensure the above steps are performed even if the UnsubscribeEvent itself timed out.
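To make the hand-off between the two threads concrete, here is a small simulation of the four steps when the wait on the unsubscribe times out. It is a sketch under stated assumptions, not the consumer internals: `LeaveOnCloseSketch`, the string events, the queues, and `GRACE_MS` are all hypothetical, and a latch forces the timeout case so that the run is deterministic.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LeaveOnCloseSketch {
    // Hypothetical bounded grace period for finishing the leave after a timeout.
    static final long GRACE_MS = 5_000;

    final BlockingQueue<String> toApplication = new LinkedBlockingQueue<>();
    final BlockingQueue<String> toBackground = new LinkedBlockingQueue<>();
    final List<String> log = new CopyOnWriteArrayList<>();

    List<String> close(long timeoutMs) {
        CompletableFuture<Void> unsubscribe = new CompletableFuture<>();
        CountDownLatch appStoppedWaiting = new CountDownLatch(1);

        Thread background = new Thread(() -> {
            try {
                appStoppedWaiting.await();           // force the "unsubscribe timed out" case
                toApplication.put("CallbackNeeded"); // enqueued shortly before...
                unsubscribe.complete(null);          // ...the unsubscribe completes
                toBackground.take();                 // step 4: wait for CallbackCompleted
                log.add("leave-group heartbeat sent");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.setDaemon(true);
        background.start();

        try {
            try {
                unsubscribe.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                log.add("unsubscribe timed out");    // do not abandon the leave here
            } catch (ExecutionException e) {
                log.add("unsubscribe failed");
            }
            appStoppedWaiting.countDown();

            // Step 1: process the callback-needed event even after the timeout.
            String event = toApplication.poll(GRACE_MS, TimeUnit.MILLISECONDS);
            if (event != null) {
                log.add("onPartitionsRevoked invoked");  // step 2: run the listener callback
                toBackground.put("CallbackCompleted");   // step 3: notify the background thread
            }
            background.join(GRACE_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(new LeaveOnCloseSketch().close(0));
    }
}
```

With `close(0)` the log ends with the leave-group entry even though the first entry records the timeout. The grace period keeps the extra work bounded; how the real consumer should bound it is a design question this sketch does not answer.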

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

kirktrue, Jul 24 '24 23:07