
feat(consumer): incremental cooperative balance strategy (KIP-429)

Open napallday opened this issue 1 year ago • 21 comments

Incremental Cooperative Protocol POC Proposal

I'm excited to present this Proof of Concept (POC) version that supports the incremental cooperative protocol as outlined in KIP-429. The proposed feature addresses a long-standing community demand (#1858). I've conducted several manual tests, and the results have been promising. (You can run the examples to try it out.)

Major Changes

The existing design in Sarama is tightly coupled to the EAGER protocol, which is particularly evident in ConsumerGroupSession. To accommodate the new COOPERATIVE rebalance protocol, I've introduced the ConsumerGroupHandlerV2 interface and a new method called ConsumeV2.

Seeking Community Guidance

I'm eager to align this work with the community's future direction. Two possible paths are under consideration:

  1. Major Version Update Plan: If Sarama intends to increment the major version while introducing support for the COOPERATIVE rebalance protocol, it would allow us to streamline our implementation. We could eliminate ConsumerGroupSession-related code from versions v2.x.x, focusing solely on the new approach.

  2. Maintaining Compatibility: Alternatively, if a major version update isn't planned, I'll take steps to enhance future maintainability. This involves refactoring the common logic shared by the Consume and ConsumeV2 methods, paving the way for smoother maintenance.

Upcoming Work

In the pipeline, I plan to incorporate the following enhancements:

  • [ ] Integration of metrics for the new protocol.
  • [ ] Implementation of unit tests, functional tests, and benchmark tests to ensure reliability.
  • [x] Validation of Pause and Resume functionality within the new protocol.
  • [x] Verification for customized cooperative balance strategy implementation.

I'm looking forward to the community's insights and feedback as we work towards a more efficient and feature-rich Sarama.

napallday avatar Aug 20 '23 08:08 napallday

@napallday - I'm excited to see that cooperative incremental re-balancing is making its way into Sarama. I think we have apps that could benefit from it. I've had a quick play with the consumergroup_cooperative example and here's what I found:

  1. Multiple co-operative Sarama consumers in the same group work as expected. I tried starting / stopping individual consumers and each time other consumers in the group were revoked/assigned partitions appropriately without a complete re-assignment of all of the topic's partitions. Sometimes Setup(...) is invoked with an empty map of newly assigned partitions, I think corresponding to re-balances where the consumer's set of partitions is unchanged. This appears to be consistent with how the Java client behaves, so I'm not suggesting changing this behavior.
  2. Upgrading Sarama consumers from sticky -> co-operative sticky via rolling restarts worked as expected. E.g. all consumers started out "sticky", then were upgraded to "sticky, sticky-cooperative", then finally upgraded to "sticky-cooperative" only.
  3. Co-existence with a Java partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor consumer in the same group has mixed results. If a Java consumer was leading the group and then a Sarama consumer joins - this works as expected. If a Sarama consumer was leading the group and a Java consumer joins then the Sarama leader panics:
     2023/08/21 20:49:59 Error from consumer: kafka: insufficient data to decode packet, more bytes expected
     panic: Error from consumer: kafka: insufficient data to decode packet, more bytes expected

     goroutine 11 [running]:
     log.Panicf({0x100e23868?, 0x100f5b3d8?}, {0x140000affa0?, 0x14000198af0?, 0x1?})
         /usr/local/go/src/log/log.go:395 +0x68
     main.main.func1()
         /tmp/sarama/examples/consumergroup_cooperative/main.go:119 +0x114
     created by main.main
         /tmp/sarama/examples/consumergroup_cooperative/main.go:110 +0x440

     I can grab more debug output if it would be helpful. (This was using Kafka 3.3.1, a Kafka 3.1.2 client, and the consumergroup_cooperative example.)
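
The rolling upgrade in item 2 works because the group coordinator only selects an assignment protocol that every member supports. Below is a minimal sketch of that selection rule, assuming a simplified voting model. The function name selectProtocol, the protocol name strings, and the tie-break by name are illustrative assumptions, not Sarama or broker code.

```go
package main

import (
	"fmt"
	"sort"
)

// selectProtocol returns the protocol chosen for a group, or "" if the
// members share no common protocol. Each member lists its supported
// protocols in order of preference. (Hypothetical helper for illustration.)
func selectProtocol(members [][]string) string {
	if len(members) == 0 {
		return ""
	}
	// Count how many members support each protocol.
	support := map[string]int{}
	for _, protos := range members {
		seen := map[string]bool{}
		for _, p := range protos {
			if !seen[p] {
				seen[p] = true
				support[p]++
			}
		}
	}
	// Each member votes for its most preferred protocol that all members support.
	votes := map[string]int{}
	for _, protos := range members {
		for _, p := range protos {
			if support[p] == len(members) {
				votes[p]++
				break
			}
		}
	}
	// Pick the protocol with the most votes. Ties are broken by name, an
	// assumption made here only to keep the result deterministic.
	names := make([]string, 0, len(votes))
	for p := range votes {
		names = append(names, p)
	}
	sort.Strings(names)
	best := ""
	for _, p := range names {
		if best == "" || votes[p] > votes[best] {
			best = p
		}
	}
	return best
}

func main() {
	// Mid-upgrade: one member already offers both strategies, one is still sticky-only.
	midUpgrade := [][]string{{"cooperative-sticky", "sticky"}, {"sticky"}}
	// Final phase: every member supports the cooperative strategy.
	upgraded := [][]string{{"cooperative-sticky", "sticky"}, {"cooperative-sticky"}}
	fmt.Println(selectProtocol(midUpgrade))
	fmt.Println(selectProtocol(upgraded))
}
```

Under this model the group keeps using the old strategy until the last sticky-only member has been restarted, which is why the intermediate "both strategies" step is needed.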

Regarding your Seeking Community Guidance question: IMO it would be great to add cooperative re-balancing support as soon as practical, without necessarily waiting for Sarama v2. For example, Sarama v1 could be extended with the ConsumeV2 method, which would then be re-factored into a more elegant single consumer approach in Sarama v2.

prestona avatar Aug 21 '23 20:08 prestona

@napallday thanks for putting together this PR!

RE: 3. mentioned by @prestona: it looks like a bug in our decoding of the MemberMetadata in the JoinGroup response, where we return early if OwnedPartitions is an empty array and so haven't consumed the whole buffer. I'll fix that up in a separate PR.

I think you also need to cope with UserData being empty in the balance strategy to avoid a decoding failure:

diff --git a/balance_strategy.go b/balance_strategy.go
index cc7a0a3..eab26b2 100644
--- a/balance_strategy.go
+++ b/balance_strategy.go
@@ -967,6 +967,9 @@ func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadat
 	// for each partition we create a sorted map of its consumers by generation
 	sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
 	for memberID, meta := range members {
+		if len(meta.UserData) == 0 {
+			continue
+		}
 		consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
 		if err != nil {
 			return nil, nil, err

I'll add this change as well after rebasing your PR on #2618

dnwe avatar Aug 21 '23 21:08 dnwe

That seemed to work well for me. I ran sarama, rdkafka and (Java) kafka-console-consumer in the same cooperative balance group and did some chaos testing, restarting each of them randomly. Each member was able to join/rejoin/lead without causing issues for the others.

In the middle of lots of runs I did trigger one protocol decode failure in deserializeTopicPartitionAssignment where it seemed like UserData for one member was 4 bytes (0xff,0xff,0xff,0xff) but it's not clear what caused that. We may just wish to guard the deserializeTopicPartitionAssignment call by ignoring "InsufficientData" err and continuing in the loop

dnwe avatar Aug 21 '23 22:08 dnwe

Big thanks to @prestona and @dnwe for the thorough testing and fix! 😄

> In the middle of lots of runs I did trigger one protocol decode failure in deserializeTopicPartitionAssignment where it seemed like UserData for one member was 4 bytes (0xff,0xff,0xff,0xff) but it's not clear what caused that. We may just wish to guard the deserializeTopicPartitionAssignment call by ignoring "InsufficientData" err and continuing in the loop

@dnwe Regarding this issue, could you help provide information about the combination of Kafka clients and the version of Kafka server used during testing? I personally conducted tests using Sarama, without direct interaction with rdkafka or Java clients. So far, I have not come across this particular issue.

napallday avatar Aug 22 '23 04:08 napallday

@napallday no worries. I set KAFKA_VERSION=3.3.2, started up docker-compose as used in the FV with the toxiproxy routes configured, and created a six-partition topic called cooperative-topic. I set up a Sarama producer to continuously write some records to the topic.

Then in three different terminals I ran:

# sarama
./consumergroup_cooperative -version 3.3.2 -brokers 127.0.0.1:29091 -group sarama-coop -topics cooperative-topic

# librdkafka
./kcat -b 127.0.0.1:29091 -G sarama-coop -X partition.assignment.strategy=cooperative-sticky cooperative-topic

# java
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29091 --topic cooperative-topic --group sarama-coop --consumer-property partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

And stopped and started each of them at various points so they all had a turn leading and joining the group

dnwe avatar Aug 22 '23 09:08 dnwe

The panic can be reproduced with the following steps:

Step 1. Run the Sarama consumer (Leader).
Step 2. Run the Java consumer.

This is due to improvements to the cooperative-sticky strategy implementation in the Java client. Specifically, the first 4 bytes of userData represent the generationID, and 4 bytes of 0xff decode to -1. This can be observed in the code changes here.
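
To make the byte-level reading concrete, here is a small sketch that interprets the first four bytes of a userData payload as a big-endian signed 32-bit generation ID. The helper name decodeGeneration is hypothetical and is not part of Sarama; it only illustrates why 0xff,0xff,0xff,0xff reads as -1.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeGeneration reads a big-endian signed 32-bit generation ID from the
// start of userData. ok is false when fewer than four bytes are present.
// (Hypothetical helper, not Sarama's API.)
func decodeGeneration(userData []byte) (generation int32, ok bool) {
	if len(userData) < 4 {
		return 0, false
	}
	// Reinterpreting the unsigned value as int32 turns 0xffffffff into -1.
	return int32(binary.BigEndian.Uint32(userData[:4])), true
}

func main() {
	gen, ok := decodeGeneration([]byte{0xff, 0xff, 0xff, 0xff})
	fmt.Println(gen, ok) // prints "-1 true"
}
```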

I will address this issue at a later time.

napallday avatar Aug 22 '23 14:08 napallday

Hi @prestona @dnwe,

FYI, I've submitted a commit to fix the issue mentioned above. 🛠️

After extensive testing with different Kafka clients, including librdkafka (v3.3.2), java client (v3.5.0 & v3.3.2), and sarama (v3.3.2), I believe the panic issues have been successfully resolved.

However, during testing with java client (v2.8.1), I observed an occurrence of java.lang.IllegalStateException in this client when the cooperative assignment was being validated by it. It's worth noting that this behavior appears to be specific to the older versions of the Java client. Since this java client is the leader who did the assignments, I believe that the underlying cause might be rooted in these outdated Java client versions rather than sarama.

napallday avatar Aug 23 '23 17:08 napallday

Interesting. It took me a few attempts, but yes I also managed to trigger that IllegalStateException with a Java 2.8.1 consumer:

[2023-08-23 21:03:16,182] ERROR [Consumer clientId=consumer-sarama-coop-1, groupId=sarama-coop] With the COOPERATIVE protocol, owned partitions cannot be reassigned to other members; however the assignor has reassigned partitions [cooperative-topic-2] which are still owned by some members (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2023-08-23 21:03:16,189] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
java.lang.IllegalStateException: Assignor supporting the COOPERATIVE protocol violates its requirements
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.validateCooperativeAssignment(ConsumerCoordinator.java:668)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:592)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:693)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1182)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1157)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
        at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:443)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:102)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
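
The rule quoted in the first log line (an owned partition must not be handed to another member in the same rebalance) can be sketched as follows. This is a simplified illustration of the check, not a port of the Java validateCooperativeAssignment code; the function name illegalMoves and the member IDs are made up for the example.

```go
package main

import (
	"fmt"
	"sort"
)

// illegalMoves returns the partitions that the new assignment gives to a
// member while a different member still owns them.
// (Hypothetical helper for illustration only.)
func illegalMoves(owned, assigned map[string][]string) []string {
	// Index each owned partition by its current owner.
	owner := map[string]string{}
	for member, parts := range owned {
		for _, p := range parts {
			owner[p] = member
		}
	}
	var bad []string
	for member, parts := range assigned {
		for _, p := range parts {
			if cur, ok := owner[p]; ok && cur != member {
				bad = append(bad, p)
			}
		}
	}
	sort.Strings(bad)
	return bad
}

func main() {
	owned := map[string][]string{
		"member-a": {"cooperative-topic-1", "cooperative-topic-2"},
		"member-b": {},
	}
	// cooperative-topic-2 moves directly from member-a to member-b, which
	// the COOPERATIVE protocol expects to happen over two rebalances.
	assigned := map[string][]string{
		"member-a": {"cooperative-topic-1"},
		"member-b": {"cooperative-topic-2"},
	}
	fmt.Println(illegalMoves(owned, assigned))
}
```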

dnwe avatar Aug 23 '23 21:08 dnwe

@napallday OK, I looked over apache/kafka and I see that they skipped this validation code in the kafka-client under KAFKA-13406 and PR #11439 due to issues with the verification, and that has been the Java client behaviour from 2.8.2 onward.

dnwe avatar Aug 23 '23 23:08 dnwe

Cool! 🎉 Huge thanks to @dnwe for the investigation.

The explanations provided in the Jira ticket are crystal clear.

By the way, just like this fix in apache/kafka, the official Java clients have some improvements in sticky and cooperative-sticky assignors since the initial introduction of the incremental cooperative balance protocol. In the future, we can integrate these enhancements gradually.

napallday avatar Aug 24 '23 02:08 napallday

@napallday were you planning to do the follow up work around metrics, pause/resume and additional FV tests under this PR?

I suppose the alternative would be to merge this as-is, but report the feature as experimental in the changelog so people can test it out in their staging environments

dnwe avatar Oct 02 '23 19:10 dnwe

@dnwe Thanks for the reminder.

For the pause/resume feature, I've verified that the behaviour is consistent with Java Kafka.

  • For the EAGER protocol: the paused status of all partitions is dropped on rebalance
  • For the COOPERATIVE protocol: only the paused status of revoked partitions is dropped on rebalance
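
The two rules above can be expressed as a small function. This is only a sketch of the described behaviour under the assumption that partitions are identified by plain strings; pausedAfterRebalance is a hypothetical name and does not correspond to any function in the PR.

```go
package main

import (
	"fmt"
	"sort"
)

// pausedAfterRebalance returns the partitions that remain paused after a
// rebalance. cooperative selects the COOPERATIVE rule; otherwise the EAGER
// rule applies. (Hypothetical helper for illustration only.)
func pausedAfterRebalance(paused, revoked []string, cooperative bool) []string {
	if !cooperative {
		// EAGER: every partition is given up, so all paused state is dropped.
		return []string{}
	}
	gone := map[string]bool{}
	for _, p := range revoked {
		gone[p] = true
	}
	// COOPERATIVE: keep the paused state of partitions that were not revoked.
	kept := []string{}
	for _, p := range paused {
		if !gone[p] {
			kept = append(kept, p)
		}
	}
	sort.Strings(kept)
	return kept
}

func main() {
	paused := []string{"topic-0", "topic-1"}
	revoked := []string{"topic-1"}
	fmt.Println(pausedAfterRebalance(paused, revoked, false)) // EAGER
	fmt.Println(pausedAfterRebalance(paused, revoked, true))  // COOPERATIVE
}
```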

Regarding the metrics and FV tests, I may add them at a later time due to some personal matters. Therefore, I concur with releasing it as an experimental feature initially.

napallday avatar Oct 09 '23 10:10 napallday

Thank you for your contribution! However, this pull request has not had any activity in the past 90 days and will be closed in 30 days if no updates occur. If you believe the changes are still valid then please verify your branch has no conflicts with main and rebase if needed. If you are awaiting a (re-)review then please let us know.

github-actions[bot] avatar Jan 25 '24 20:01 github-actions[bot]

Hi @napallday @dnwe, it seems there is no major concern about this PR. Do you think this can be merged soon?

It is indeed a useful feature I'd like to try out, but I'd rather use the main branch (or, better, a release) than a pull request branch.

wwwjfy avatar Jan 26 '24 09:01 wwwjfy

@wwwjfy 👋🏻 thanks for getting in touch

The issue with merging to main at the moment is that the PR branch introduces the new ConsumeV2 and ConsumerGroupHandlerV2 interfaces, which we'd then be locked into supporting. Ideally we want to expose the cooperative balance strategy as just a config change for users, rather than requiring them to rewrite their app code to make use of it.

Similarly, if we merge to main without more FV coverage, it might not be obvious to users (unless they read notes on GitHub Releases) that it is considered experimental.

Would it help your testing if I pull the commits into a branch on the main repository rather than them being only on a fork? i.e., if I make a cooperative-balance-strategy branch with these commits then you can go get github.com/IBM/sarama@cooperative-balance-strategy, start calling the new interfaces and experiment with the code and provide feedback?

dnwe avatar Jan 26 '24 09:01 dnwe

I hope the current ConsumerGroupHandler can still use the cooperative balance strategy. If users want to be notified of revoked and assigned partitions, they can provide their own OnAssignPartition and OnRevokePartition methods, and a default handler will be provided for users who keep the original ConsumerGroupHandler without changing their current code.
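
One way to picture this suggestion is Go's optional-interface pattern: the library checks at runtime whether a handler also implements the extra callbacks and falls back to a no-op otherwise. The sketch below is purely illustrative. The Handler interface is a reduced stand-in for the existing handler, and the method signatures of OnAssignPartition and OnRevokePartition are assumptions, since the comment above does not specify them.

```go
package main

import "fmt"

// Handler stands in for the existing handler interface, reduced to a single
// hypothetical method for brevity.
type Handler interface {
	Name() string
}

// PartitionListener is the hypothetical optional extension. The parameter
// lists are assumptions made for this sketch.
type PartitionListener interface {
	OnAssignPartition(topic string, partition int32)
	OnRevokePartition(topic string, partition int32)
}

// legacyHandler only implements the original interface.
type legacyHandler struct{}

func (legacyHandler) Name() string { return "legacy" }

// awareHandler additionally implements the optional callbacks.
type awareHandler struct{ assigned []string }

func (h *awareHandler) Name() string { return "aware" }
func (h *awareHandler) OnAssignPartition(topic string, partition int32) {
	h.assigned = append(h.assigned, fmt.Sprintf("%s-%d", topic, partition))
}
func (h *awareHandler) OnRevokePartition(topic string, partition int32) {}

// notifyAssigned calls the optional hook when the handler implements it and
// reports whether the hook ran. Handlers written before the change are
// simply skipped, which plays the role of the "default handler".
func notifyAssigned(h Handler, topic string, partition int32) bool {
	if l, ok := h.(PartitionListener); ok {
		l.OnAssignPartition(topic, partition)
		return true
	}
	return false
}

func main() {
	aware := &awareHandler{}
	fmt.Println(notifyAssigned(legacyHandler{}, "cooperative-topic", 0))
	fmt.Println(notifyAssigned(aware, "cooperative-topic", 3), aware.assigned)
}
```

The appeal of this shape is that existing handlers keep compiling unchanged, at the cost of the extra behaviour being discoverable only through documentation rather than the type system.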

lifepuzzlefun avatar Jan 26 '24 10:01 lifepuzzlefun

Thanks for the quick response :)

It does make sense to maintain a single interface rather than two, if possible. (Sorry, I didn't read the change and the comments in this PR.)

I'm not sure what the way forward is. Using a single interface will probably break backward compatibility, I guess, which means we'd have to wait for a major version upgrade, with the consolidation work done before that.

For testing, to me alone, I have my forked version anyway, though for others, what you mentioned may be useful :D

I'm still evaluating whether to use Sarama or the Confluent SDK, which already has this feature. I'll share feedback if I find anything useful.

wwwjfy avatar Jan 26 '24 10:01 wwwjfy

Yes I think the ideal scenario would be if we can rework this PR to keep the Sarama consumer interface consistent, but expose an opt-in similarly to the Java client via an equivalent to the partition.assignment.strategy config variable

dnwe avatar Jan 26 '24 10:01 dnwe

I feel it's hard to keep using the ConsumerGroupHandler interface since it's highly coupled with eager rebalance protocol - e.g. ConsumerGroupSession as the function parameter, is generated in every new generation.

As the new implementation ConsumerGroupHandlerV2 also works for the previous one ConsumerGroupHandler, may I know if it's possible to have another major version of Sarama in the future to include this PR?

I can help add FV tests later, but it seems to be a lot of work...

napallday avatar Jan 26 '24 11:01 napallday

If we want to keep ConsumerGroupHandler, I think a possible way is to:

  • call Setup(ConsumerGroupSession) / Cleanup(ConsumerGroupSession) when a new assignment is received / a new rebalance is triggered.
  • change ConsumerGroupSession's Claims/MemberId/GenerationId functions to be thread safe.

So if users don't care about assignment changes in their existing code (or they only log them), enabling cooperative in the config will be enough. But users will still need to be aware of the API behaviour change when enabling cooperative rebalance.
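
As a rough illustration of the second bullet, a session whose assignment can change while handlers are still running could guard its state with a read-write lock and hand out copies. The session type and its methods below are hypothetical stand-ins, not Sarama's ConsumerGroupSession implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a hypothetical stand-in for a long-lived consumer group session
// whose claims may be updated by a rebalance while handlers read them.
type session struct {
	mu         sync.RWMutex
	generation int32
	claims     map[string][]int32
}

// update replaces the generation and assignment after a rebalance.
func (s *session) update(generation int32, claims map[string][]int32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.generation = generation
	s.claims = claims
}

// Claims returns a deep copy so callers cannot mutate the session's state
// and never read a map that is concurrently being replaced.
func (s *session) Claims() map[string][]int32 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string][]int32, len(s.claims))
	for topic, parts := range s.claims {
		out[topic] = append([]int32(nil), parts...)
	}
	return out
}

// GenerationID returns the current generation under the read lock.
func (s *session) GenerationID() int32 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.generation
}

func main() {
	s := &session{}
	s.update(1, map[string][]int32{"cooperative-topic": {0, 1, 2}})

	var wg sync.WaitGroup
	// Readers and a writer run concurrently without a data race.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.Claims()
			_ = s.GenerationID()
		}()
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.update(2, map[string][]int32{"cooperative-topic": {0, 1}})
	}()
	wg.Wait()
	fmt.Println(s.GenerationID(), s.Claims())
}
```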

aiquestion avatar Feb 27 '24 09:02 aiquestion

Any ETA for this feature?

dberardo-com avatar Aug 20 '24 16:08 dberardo-com