KAFKA-15615: Improve handling of fetching during metadata updates
Problem
As stated in KAFKA-15615, when a fetch response contains an error about partition leadership, fencing, etc., a metadata refresh is triggered. Until its metadata is updated, however, the consumer keeps trying to fetch data for the partition, and those requests fail because they are based on outdated information.
Approach
When a fetch response indicates that a partition needs a metadata update, a temporary "pause" is applied:
- The partition transitions to the newly added `AWAIT_UPDATE` state.
- `AWAIT_UPDATE` is an unfetchable state (`TopicPartitionState.isFetchable()` returns false).
- In other words, the partition is excluded from the fetch targets in `AbstractFetch.fetchablePartitions`.
- For KIP-951, if the fetch error response includes `CurrentLeader` information, the partition is not paused and its subscription remains fetchable as is.
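The pause can be illustrated with a minimal sketch in plain Java rather than against the real consumer classes. Everything in it is hypothetical: `FetchPauseSketch`, `onLeadershipError`, and the string partition names stand in for Kafka's `SubscriptionState`/`TopicPartitionState` internals. As an assumption, a partition is treated as fetchable exactly when its state holds a valid position (using the `hasValidPosition` flag this PR lists for `AWAIT_UPDATE`); other conditions, such as partitions paused by the application, are ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the fetch "pause"; not Kafka's implementation.
final class FetchPauseSketch {

    // Subset of fetch states; in this sketch only a state with a valid position is fetchable.
    enum State {
        FETCHING(true),
        AWAIT_RESET(false),
        AWAIT_VALIDATION(false),
        AWAIT_UPDATE(false); // new state: no valid position, so it is unfetchable

        final boolean hasValidPosition;

        State(boolean hasValidPosition) {
            this.hasValidPosition = hasValidPosition;
        }
    }

    // Partition name -> state; LinkedHashMap keeps assignment order for stable output.
    private final Map<String, State> states = new LinkedHashMap<>();

    void assign(String partition) {
        states.put(partition, State.FETCHING);
    }

    State state(String partition) {
        return states.get(partition);
    }

    // Hypothetical hook for a fetch error about leadership or fencing.
    // With KIP-951 CurrentLeader information the partition keeps fetching;
    // otherwise a FETCHING partition is paused until the next metadata update.
    void onLeadershipError(String partition, boolean responseHasCurrentLeader) {
        if (!responseHasCurrentLeader && states.get(partition) == State.FETCHING) {
            states.put(partition, State.AWAIT_UPDATE);
        }
    }

    // Analogue of AbstractFetch.fetchablePartitions: skip every unfetchable partition.
    List<String> fetchablePartitions() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, State> entry : states.entrySet()) {
            if (entry.getValue().hasValidPosition) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        FetchPauseSketch tracker = new FetchPauseSketch();
        tracker.assign("orders-0");
        tracker.assign("orders-1");
        tracker.onLeadershipError("orders-0", false); // no CurrentLeader: pause
        tracker.onLeadershipError("orders-1", true);  // CurrentLeader present: keep fetching
        System.out.println(tracker.fetchablePartitions()); // [orders-1]
    }
}
```

Because the pause is modelled as just another state, no extra bookkeeping is needed: the existing fetchability filter drops the partition until a later transition restores a valid position.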
`AWAIT_UPDATE` state
- Transitions from:
  - `FETCHING`
- Transitions to:
  - `FETCHING`
  - `AWAIT_RESET`
  - `AWAIT_VALIDATION`
  - `AWAIT_UPDATE`
- Even if the metadata is updated without any change to the leader and epoch information while the partition is in the `AWAIT_UPDATE` state, the partition moves to its next state through `SubscriptionState.maybeValidatePositionForCurrentLeader`:
  - `FETCHING`
  - `AWAIT_VALIDATION`
- `requiresPosition`: true
- `hasValidPosition`: false
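The exit from `AWAIT_UPDATE` can be sketched as a simplified, hypothetical model; it is not the body of `SubscriptionState.maybeValidatePositionForCurrentLeader`. The names `AwaitUpdateSketch`, `Partition`, `onMetadataUpdate`, and `isAllowed` are illustrative, and the rule for choosing between `FETCHING` and `AWAIT_VALIDATION` (validate only when both the position's epoch and the leader epoch are known) is an assumption. The sketch encodes the transition lists for `AWAIT_UPDATE` and shows the key point: a paused partition leaves the state even when the metadata reports the same leader and epoch.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the AWAIT_UPDATE state; not Kafka's implementation.
final class AwaitUpdateSketch {

    enum State { INITIALIZING, FETCHING, AWAIT_RESET, AWAIT_VALIDATION, AWAIT_UPDATE }

    // AWAIT_UPDATE is entered only from FETCHING...
    static final Set<State> ENTER_FROM = EnumSet.of(State.FETCHING);
    // ...and may move to these states (including staying in AWAIT_UPDATE).
    static final Set<State> EXIT_TO = EnumSet.of(
            State.FETCHING, State.AWAIT_RESET, State.AWAIT_VALIDATION, State.AWAIT_UPDATE);

    // Checks an edge that touches AWAIT_UPDATE; other edges are outside this sketch.
    static boolean isAllowed(State from, State to) {
        if (from == State.AWAIT_UPDATE) {
            return EXIT_TO.contains(to);
        }
        if (to == State.AWAIT_UPDATE) {
            return ENTER_FROM.contains(from);
        }
        throw new IllegalArgumentException(from + " -> " + to + " does not involve AWAIT_UPDATE");
    }

    // Per-partition state plus the leader information recorded with its position.
    static final class Partition {
        State state;
        Integer leaderId;          // null if unknown
        Integer leaderEpoch;       // null if unknown
        final Integer offsetEpoch; // epoch of the last consumed record, null if unknown

        Partition(State state, Integer leaderId, Integer leaderEpoch, Integer offsetEpoch) {
            this.state = state;
            this.leaderId = leaderId;
            this.leaderEpoch = leaderEpoch;
            this.offsetEpoch = offsetEpoch;
        }
    }

    // Hypothetical stand-in for the metadata-update step. Only paused partitions are
    // modelled, and they move on even if the leader and epoch are unchanged.
    static void onMetadataUpdate(Partition p, Integer newLeaderId, Integer newLeaderEpoch) {
        if (p.state != State.AWAIT_UPDATE || newLeaderId == null) {
            return; // not paused, or the leader is still unknown: nothing to do
        }
        p.leaderId = newLeaderId;
        p.leaderEpoch = newLeaderEpoch;
        // Assumed rule: validate the position only when both epochs are known.
        State next = (p.offsetEpoch != null && newLeaderEpoch != null)
                ? State.AWAIT_VALIDATION
                : State.FETCHING;
        if (!isAllowed(p.state, next)) {
            throw new IllegalStateException(p.state + " -> " + next);
        }
        p.state = next;
    }

    public static void main(String[] args) {
        // Metadata reports the same leader (1) and epoch (5) the partition already had.
        Partition withEpoch = new Partition(State.AWAIT_UPDATE, 1, 5, 5);
        onMetadataUpdate(withEpoch, 1, 5);
        System.out.println(withEpoch.state); // AWAIT_VALIDATION

        Partition withoutEpoch = new Partition(State.AWAIT_UPDATE, 1, 5, null);
        onMetadataUpdate(withoutEpoch, 1, 5);
        System.out.println(withoutEpoch.state); // FETCHING

        System.out.println(isAllowed(State.AWAIT_RESET, State.AWAIT_UPDATE)); // false
    }
}
```

Keeping the transition check separate from the update logic mirrors the per-state transition lists, so an unlisted edge (for example `AWAIT_UPDATE` to `INITIALIZING`) fails loudly instead of silently corrupting the state.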
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)