Remove duplicates from topics
This PR is intended to fix the issues observed in https://github.com/Shopify/sarama/issues/2264.
Summary of the issue: Passing duplicate topics into the Consume function leads to uneven partition division between different consumers of the consumer group.
References: Java client handles this issue by always discarding duplicates - as it collapses the collection of topics into a set.
@dnwe Could you please review the issue and PR.
Issue could be tested by adding this testcase to balance_strategy_test.go:
{ name: "2 members, 1 topic, 8 partitions each", members: map[string][]string{"M1": {"T1", "T1", "T1", "T1", "T1", "T1", "T1", "T1"}, "M2": {"T1", "T1", "T1", "T1", "T1", "T1", "T1", "T1"}}, topics: map[string][]int32{"T1": {0, 1, 2, 3, 4, 5, 6, 7}}, expected: BalanceStrategyPlan{ "M1": map[string][]int32{"T1": {0, 1, 2, 3}}, "M2": map[string][]int32{"T1": {4, 5, 6, 7}}, }, },
This fails with:
=== RUN TestBalanceStrategyRange === RUN TestBalanceStrategyRange/2_members,_1_topic,_8_partitions_each#01 balance_strategy_test.go:101: Plan does not match expectation expected: sarama.BalanceStrategyPlan{"M1":map[string][]int32{"T1":[]int32{0, 1, 2, 3}}, "M2":map[string][]int32{"T1":[]int32{4, 5, 6, 7}}} actual: sarama.BalanceStrategyPlan{"M1":map[string][]int32{"T1":[]int32{0, 1, 2, 3, 4, 5, 6, 7}}} --- FAIL: TestBalanceStrategyRange (0.00s) --- FAIL: TestBalanceStrategyRange/2_members,_1_topic,_8_partitions_each#01 (0.00s)
@Smriti-OSS ah thanks — yes so the issue is that the consumer has duplicate topics in the consumer group metadata sent as part of the JoinGroupRequest and used for generating the assignments
I've pushed up https://github.com/Shopify/sarama/pull/2285 with the sample unittest and to cover de-duplication in the balance_strategy — as I think it's worth doing it there as well as in the consumer group itself
Thank you for your contribution! However, this pull request has not had any activity in the past 90 days and will be closed in 30 days if no updates occur. If you believe the changes are still valid then please verify your branch has no conflicts with main and rebase if needed. If you are awaiting a (re-)review then please let us know.