KAFKA-16277: AbstractStickyAssignor - Sort owned TopicPartitions by partition when reassigning
Context
Addresses KAFKA-16277 - CooperativeStickyAssignor does not spread topics evenly among consumer group
https://issues.apache.org/jira/browse/KAFKA-16277
@ableegoldman:
I suspect the assignor could be making a better effort. Presumably what is happening is that during the phase where it attempts to re-assign previously-owned partitions back to their former owner, we make a pass over a sorted list of previously-owned partitions that is grouped by topic. The assignor will then assign partitions from this list one-by-one to its previous owner until it hits the expected total number of partitions. So in the scenario you describe, it's basically looping over (t1p0, t1p1, t1p2, t1p3...t1pN, t2p0, t2p1, t2p2...t2pN) and assigning the first N partitions to the first consumer, which would be everything from topic 1, then just dumping the remaining partitions – all of which belong to topic 2 – onto the new consumer. The fix should be fairly simple – we just need to group this sorted list by partition, rather than by topic (i.e. t1p0, t2p0, t1p1, t2p1...t1pN, t2pN).
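The ordering difference described above can be sketched in a few lines of Java. This is a simplified model, not the actual `AbstractStickyAssignor` code: `TP` is a hypothetical stand-in for Kafka's `TopicPartition`, and `reclaim` is a hypothetical helper that models only the pass which hands previously-owned partitions back to their former owner until a quota is reached. The two comparators contrast the topic-major order with the partition-major order proposed above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class OwnedPartitionOrder {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    public record TP(String topic, int partition) {
        @Override
        public String toString() { return topic + "p" + partition; }
    }

    // Topic-major order: t1p0, t1p1, ..., t2p0, t2p1, ...
    public static final Comparator<TP> BY_TOPIC =
            Comparator.comparing(TP::topic).thenComparingInt(TP::partition);

    // Partition-major order: t1p0, t2p0, t1p1, t2p1, ...
    public static final Comparator<TP> BY_PARTITION =
            Comparator.comparingInt(TP::partition).thenComparing(TP::topic);

    // Simplified model of the "give owned partitions back" pass: sort the
    // previously-owned partitions, then keep them for the previous owner
    // one-by-one until it reaches its quota.
    public static List<TP> reclaim(List<TP> owned, Comparator<TP> order, int quota) {
        List<TP> sorted = new ArrayList<>(owned);
        sorted.sort(order);
        return new ArrayList<>(sorted.subList(0, Math.min(quota, sorted.size())));
    }

    public static void main(String[] args) {
        // One consumer previously owned all 8 partitions of t1 and t2;
        // a second consumer joins, so the old owner's quota drops to 4.
        List<TP> owned = new ArrayList<>();
        for (String topic : List.of("t1", "t2")) {
            for (int p = 0; p < 4; p++) {
                owned.add(new TP(topic, p));
            }
        }
        System.out.println(reclaim(owned, BY_TOPIC, 4));     // [t1p0, t1p1, t1p2, t1p3]
        System.out.println(reclaim(owned, BY_PARTITION, 4)); // [t1p0, t2p0, t1p1, t2p1]
    }
}
```

With the topic-major order the old owner keeps all of topic 1 and the new consumer receives only topic 2; with the partition-major order each consumer ends up with two partitions from each topic.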
Test strategy:
- Update tests with the new assignments. The new assignments show that topics are now balanced between consumers: when there are multiple topics, one consumer no longer holds all partitions for topic1; they are spread across multiple consumers.
- Create a new test that verifies consumers have an equal number of partitions from each topic as consumers are added and removed.
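The per-topic balance property described in the test strategy can be expressed as a small predicate. This is a sketch, not the test added in this PR: `TopicBalanceCheck`, `TP`, and `isBalancedPerTopic` are hypothetical names. It assumes every consumer subscribes to every topic, and it treats a difference of at most one partition per topic as balanced, since a topic's partition count does not always divide evenly among consumers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopicBalanceCheck {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    public record TP(String topic, int partition) {}

    // Returns true if, for every topic, the number of that topic's partitions
    // held by any two consumers differs by at most one. Consumers holding no
    // partitions of a topic count as holding zero (assumes all consumers
    // subscribe to all topics).
    public static boolean isBalancedPerTopic(Map<String, List<TP>> assignment) {
        // topic -> (consumer -> number of that topic's partitions it holds)
        Map<String, Map<String, Integer>> counts = new HashMap<>();
        for (Map.Entry<String, List<TP>> e : assignment.entrySet()) {
            for (TP tp : e.getValue()) {
                counts.computeIfAbsent(tp.topic(), t -> new HashMap<>())
                      .merge(e.getKey(), 1, Integer::sum);
            }
        }
        for (Map<String, Integer> perConsumer : counts.values()) {
            int min = Integer.MAX_VALUE;
            int max = 0;
            for (String consumer : assignment.keySet()) {
                int c = perConsumer.getOrDefault(consumer, 0);
                min = Math.min(min, c);
                max = Math.max(max, c);
            }
            if (max - min > 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Topic-grouped outcome: each consumer holds a single topic.
        Map<String, List<TP>> skewed = Map.of(
                "c1", List.of(new TP("t1", 0), new TP("t1", 1)),
                "c2", List.of(new TP("t2", 0), new TP("t2", 1)));
        // Partition-grouped outcome: each consumer holds one partition per topic.
        Map<String, List<TP>> balanced = Map.of(
                "c1", List.of(new TP("t1", 0), new TP("t2", 0)),
                "c2", List.of(new TP("t1", 1), new TP("t2", 1)));
        System.out.println(isBalancedPerTopic(skewed));   // false
        System.out.println(isBalancedPerTopic(balanced)); // true
    }
}
```

A test along these lines would run the assignor after each consumer is added or removed and assert the predicate on the resulting assignment.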
Changes
- AbstractStickyAssignor - Sort owned TopicPartitions by partition when reassigning
- Update tests
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Thanks for the review, @ableegoldman! I have incorporated your suggested changes.
I was going to try to fix the pipeline issue:
[Checks API] No suitable checks publisher found.
But then I saw a fix had already been attempted and reverted 😭 https://github.com/apache/kafka/pull/15292
Test failures are unrelated; merged to trunk.
Thanks for the fix!
FYI, I cherry-picked this back to 3.7 along with another sticky assignor fix. It should be in the 3.7.1 release.