
Consumer group rebalancing bug when switching from eager to cooperative consumers

Open · hamdanjaveed opened this issue Mar 06 '24 · 8 comments

Hello 👋!

We've been using franz-go for a while and currently have our consumers configured with the RangeBalancer. We want to switch to the CooperativeStickyBalancer and were following the instructions in both KIP-429 and the franz-go docs for CooperativeStickyBalancer, which state that we essentially need to perform a double bounce to upgrade. However, during the first bounce, where we add cooperative-sticky to our set of balancers, we noticed that when our old range consumers left the consumer group we kept getting consumption lag on some of the partitions.

There's a minimal reproduction in this repo here.

In the reproduction we:

  • Spin up Kafka and a producer that publishes events
  • Spin up a consumer that uses the [RangeBalancer] balancer
  • Spin up a consumer that uses the [CooperativeStickyBalancer, RangeBalancer] balancers
  • Shut down the [RangeBalancer] consumer
  • The remaining consumer seems to revoke half of its partitions but never re-assigns them to itself

We also see that the issue persists if we then spin up a new consumer that uses the [CooperativeStickyBalancer, RangeBalancer] balancers: some partitions are seemingly never re-assigned and stay stuck. We found that performing a full restart of all the consumers in the consumer group once we're in this state fixes the issue (i.e. unsticks the partitions).
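
For reference, the balancer configuration we use during the first bounce looks roughly like the sketch below (the broker address, group, and topic names are placeholders, not our real setup):

```go
package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// First bounce of the eager -> cooperative upgrade: list the cooperative
	// balancer first, but keep the eager one so the group can still agree on
	// a common protocol with members that only speak range.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumerGroup("my-group"),     // placeholder group
		kgo.ConsumeTopics("my-topic"),     // placeholder topic
		kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.RangeBalancer()),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	for {
		fetches := client.PollFetches(context.Background())
		if fetches.IsClientClosed() {
			return
		}
		fetches.EachRecord(func(r *kgo.Record) {
			_ = r // process the record
		})
	}
}
```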

Let me know if we're missing something here!

hamdanjaveed · Mar 06 '24

Got the same issue on 1.14.0 and 1.15.0. Tested with franz-go cooperative consumers + sarama consumers, and with franz-go cooperative consumers + franz-go consumers using round-robin/range balancers. Same effect in both cases. My steps were:

  1. Create a topic with 12 partitions
  2. Start 12 eager consumers (sarama/franz-go). Wait a few seconds -> first rebalance
  3. Start 12 cooperative consumers (franz-go) -> second rebalance
  4. Shut down the eager consumers -> third rebalance
  5. Consumption of the partitions that were assigned to the cooperative-sticky clients in step 2 (i.e. before the shutdown of the eager consumers) gets stuck right after the third rebalance

This becomes clear when each group of consumers uses its own ClientID, because all consumers in a range assignment are sorted by MemberID before partitions are assigned. The problem reproduces reliably when the cooperative consumers' MemberIDs (which are ClientID + a suffix) sort before the eager consumers'. For example, say the cooperative consumers have ClientID = "a-consumer" and the eager consumers have ClientID = "z-consumer": this configuration leaves all the partitions stuck after the last rebalance in my example. In the opposite case (cooperative ClientID = "z-consumer", eager ClientID = "a-consumer") all the consumers work properly after the last rebalance.

Using the same ClientID for all consumers makes the outcome random.
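
A rough sketch of how the ClientIDs were set in this experiment (a minimal illustration; the broker, group, and topic names are placeholders, and only the ClientID values matter for the MemberID sort order):

```go
package main

import "github.com/twmb/franz-go/pkg/kgo"

// newGroupClient is a hypothetical helper for this experiment: it builds a
// consumer for the shared group with an explicit ClientID and balancer list.
func newGroupClient(clientID string, balancers ...kgo.GroupBalancer) *kgo.Client {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumerGroup("my-group"),     // placeholder group
		kgo.ConsumeTopics("my-topic"),     // placeholder topic
		kgo.ClientID(clientID),
		kgo.Balancers(balancers...),
	)
	if err != nil {
		panic(err)
	}
	return cl
}

func main() {
	// Cooperative consumer whose MemberID (ClientID + suffix) sorts before the
	// eager consumer's -- the ordering that reliably reproduces the stuck partitions.
	coop := newGroupClient("a-consumer",
		kgo.CooperativeStickyBalancer(), kgo.RangeBalancer())
	// Eager consumer that sorts after the cooperative one.
	eager := newGroupClient("z-consumer", kgo.RangeBalancer())

	defer coop.Close()
	defer eager.Close()
}
```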

It also looks like Kafka itself thinks that all partitions were assigned after the last rebalance. As mentioned above, a full restart helps. A rolling restart also helps (start the new consumers, then shut down the old ones) - it looks like that's because of the group leader changing.

YellowCataclysm · Mar 07 '24

I'll probably be able to look at this on Friday.

twmb · Mar 13 '24

Was looking into this a bit and what I think is happening is:

  • Say we have a consumer c0 with [RangeBalancer] balancers in a consumer group consuming from a topic t0 with partitions [t0p0, t0p1]
  • A new consumer c1 joins the consumer group with the [CooperativeStickyBalancer, RangeBalancer] balancers, say the assignment is now c0: [t0p0], c1: [t0p1]
  • Consumer c0 leaves the consumer group
  • This causes consumer c1 to eagerly revoke its previously assigned partitions (which would be t0p1) by nil-ing out nowAssigned
  • However lastAssigned remains populated with the old assignment (t0p1)
  • When that cooperative consumer then continues the consumer group rebalance, it uses its lastAssigned=[t0p1] as its current assignment
  • This gets sent as part of the JoinGroupMetadata as the currentAssignments, which gets set as the OwnedPartitions for consumer c1

And I think that leaves us in a situation where c1 has revoked its previously owned partitions (t0p1) but performs the next rebalance thinking it still owns them and only adds the partitions it thinks it doesn't own (t0p0).
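
To make that sequence concrete, here is a small toy model of the state described above (nowAssigned and lastAssigned here just stand in for the fields on franz-go's groupConsumer; this is an illustration of the reported behavior, not the library's actual code):

```go
package main

import "fmt"

// groupState is a toy stand-in for franz-go's internal groupConsumer state.
type groupState struct {
	nowAssigned  []string // partitions currently owned
	lastAssigned []string // prior assignment, tracked for cooperative rebalances
}

// eagerRevoke models the eager revocation: everything currently owned is
// dropped up front when a rebalance begins.
func (g *groupState) eagerRevoke() {
	g.nowAssigned = nil
	// The suspected bug: lastAssigned is left populated, so the next
	// JoinGroup still reports these partitions as currently owned.
}

// currentAssignments models what the client reports when it rejoins the
// group: it falls back to lastAssigned.
func (g *groupState) currentAssignments() []string {
	return g.lastAssigned
}

func main() {
	// c1 owns t0p1 after the rebalance in which c0 and c1 split the topic.
	c1 := &groupState{nowAssigned: []string{"t0p1"}, lastAssigned: []string{"t0p1"}}

	// c0 leaves; c1 eagerly revokes everything it owns...
	c1.eagerRevoke()

	// ...but still claims t0p1 on the next JoinGroup, so the sticky assignor
	// never hands t0p1 back and the partition is stuck.
	fmt.Println("c1 actually owns:", c1.nowAssigned)          // []
	fmt.Println("c1 claims to own:", c1.currentAssignments()) // [t0p1]
}
```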

I tried nil-ing out lastAssigned in groupConsumer::revoke, which seems to fix the locally reproducible issue from my repo, but I have no confidence that that's correct (and if I had to guess, it's probably not).

I'll keep looking and trying to understand what's happening here 😄

hamdanjaveed · Mar 23 '24

The fix is accurate, and the diagnosis is almost correct. The final step -- set as OwnedPartitions -- is a red herring. OwnedPartitions is used by the sticky balancer to guard against zombies (I'd have to re-read the code to remind myself exactly what this guards against).

The bug is right here: https://github.com/twmb/franz-go/blob/351e7fae879ce6bf712722f227893f41b66738bc/pkg/kgo/consumer_group.go#L588-L590

It's ok to nil out lastAssigned, because it's truly meant for tracking state between rebalances for cooperative group balancers specifically. It's not important to keep the prior state around for an eager balancer because, well, for an eager balancer there isn't meant to be any prior state at the start of each group session.

(also sorry for the delay in looking into this)

twmb · Mar 26 '24

@twmb I was thinking of writing up a PR with the fix along with a test - would that be helpful? I'm wondering about the best way to write the test; I was thinking of doing something similar to what testChainETL() does in helpers_test.go, but instead spinning up and shutting down consumers that have different GroupBalancers. I tried to re-use the existing testConsumer, but it felt a bit clunky in the context of this test and wasn't working how I'd expect. Would you have any thoughts on how best to approach writing a test for this?
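
For reference, the rough shape I have in mind is something like the standalone sketch below (not wired into helpers_test.go; the broker, topic, and group names are placeholders and the actual lag assertion is elided):

```go
package kgo_test

import (
	"context"
	"testing"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// TestEagerToCooperativeUpgrade sketches the scenario from this issue: an
// eager consumer and a mixed cooperative+eager consumer share a group, the
// eager one shuts down, and the remaining consumer should end up consuming
// from every partition again.
func TestEagerToCooperativeUpgrade(t *testing.T) {
	newConsumer := func(balancers ...kgo.GroupBalancer) *kgo.Client {
		cl, err := kgo.NewClient(
			kgo.SeedBrokers("localhost:9092"), // placeholder broker
			kgo.ConsumerGroup("upgrade-test-group"),
			kgo.ConsumeTopics("upgrade-test-topic"),
			kgo.Balancers(balancers...),
		)
		if err != nil {
			t.Fatal(err)
		}
		return cl
	}

	eager := newConsumer(kgo.RangeBalancer())
	coop := newConsumer(kgo.CooperativeStickyBalancer(), kgo.RangeBalancer())
	defer coop.Close()

	// Let both consumers join and settle, then bounce the eager one,
	// mirroring the first step of the double-bounce upgrade.
	time.Sleep(5 * time.Second)
	eager.Close()

	// After the rebalance, the remaining consumer should see records from
	// every partition; a real test would produce to all partitions and
	// assert that none of them stall (the consumption-lag symptom).
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	seen := make(map[int32]bool)
	for ctx.Err() == nil {
		fetches := coop.PollFetches(ctx)
		fetches.EachRecord(func(r *kgo.Record) {
			seen[r.Partition] = true
		})
	}
	t.Logf("partitions seen after the eager consumer left: %d", len(seen))
}
```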

hamdanjaveed · Apr 22 '24

That would be helpful. I don't think I fixed this in a branch locally. My own holdup on fixing this myself is that one of the KIPs is harder to implement than I thought. I spent some time on a plane implementing the fix and the more I worked on it, the bigger the scope turned out to be. I've set it aside and have been prioritizing some of my own stuff lately, so work has been essentially frozen. I aim to get back to this sooner rather than later; if you go ahead and implement the fix and test before I get to it, I'll appreciate it -- no timeline on merging and releasing yet (though if I take too long, I'll just go ahead and do a bugfix release).

twmb · Apr 24 '24

Awesome, will give it a go 👍

"I spent some time on a plane implementing the fix and the more I worked on it, the bigger the scope turned out to be"

Are you referring to the fix for this issue or the KIP you were working on?

If it's the fix, then I assume that means there's more to it than simply nil-ing out lastAssigned.

hamdanjaveed · Apr 24 '24

I'm referring to "KIP-951 - Leader discovery optimisations for the client". The client isn't implemented in a way to "move" partitions internally outside of the metadata loop, so hooking into this properly has been a PITA.

For this bug, nil-ing out lastAssigned is all that's necessary, for the reasoning I gave above in that comment (i.e. not due to OwnedPartitions, but for a different reason).

twmb · Apr 24 '24