sarama
feat(consumer): incremental cooperative balance strategy (KIP-429)
Incremental Cooperative Protocol POC Proposal
I'm excited to present this Proof of Concept (POC) version that supports the incremental cooperative protocol as outlined in KIP-429. The proposed feature addresses a long-standing community demand (#1858). I've conducted several manual tests, and the results have been promising. (You can try it out by running the examples.)
Major Changes
The existing design in Sarama is tightly coupled with the EAGER protocol, which is particularly evident in ConsumerGroupSession. To accommodate the new COOPERATIVE rebalance protocol, I've introduced the ConsumerGroupHandlerV2 interface and a new method called ConsumeV2.
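To illustrate what changes between the two protocols, here is a minimal Go sketch of the incremental step at the heart of KIP-429, simplified to a single topic and partition IDs only. partitionDiff is a hypothetical helper, not the ConsumeV2 implementation in this PR: under EAGER a member gives up all of its partitions at every rebalance, whereas under COOPERATIVE it only revokes the partitions missing from its new assignment (and, per KIP-429, a revoked partition is only handed to its new owner in a follow-up rebalance).

```go
package main

import (
	"fmt"
	"sort"
)

// partitionDiff is a hypothetical helper (not Sarama API). It compares the
// partitions a member already owns with its new assignment and returns only
// the difference, which is what a COOPERATIVE member revokes / adds.
func partitionDiff(owned, assigned []int32) (revoked, added []int32) {
	inAssigned := make(map[int32]bool, len(assigned))
	for _, p := range assigned {
		inAssigned[p] = true
	}
	inOwned := make(map[int32]bool, len(owned))
	for _, p := range owned {
		inOwned[p] = true
		if !inAssigned[p] {
			revoked = append(revoked, p) // owned before, not assigned now
		}
	}
	for _, p := range assigned {
		if !inOwned[p] {
			added = append(added, p) // newly assigned in this generation
		}
	}
	sort.Slice(revoked, func(i, j int) bool { return revoked[i] < revoked[j] })
	sort.Slice(added, func(i, j int) bool { return added[i] < added[j] })
	return revoked, added
}

func main() {
	// The member owned partitions 0, 1, 2 and is now assigned 1, 2, 3:
	// partitions 1 and 2 keep being consumed without interruption.
	revoked, added := partitionDiff([]int32{0, 1, 2}, []int32{1, 2, 3})
	fmt.Println(revoked, added) // [0] [3]
}
```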
Seeking Community Guidance
I'm eager to align this work with the community's future direction. Two possible paths are under consideration:
- Major Version Update Plan: If Sarama intends to increment the major version while introducing support for the COOPERATIVE rebalance protocol, it would allow us to streamline our implementation. We could eliminate the ConsumerGroupSession-related code from versions v2.x.x, focusing solely on the new approach.
- Maintaining Compatibility: Alternatively, if a major version update isn't planned, I'll take steps to improve future maintainability. This involves refactoring the common logic within the Consume and ConsumeV2 methods, paving the way for smoother maintenance.
Upcoming Work
In the pipeline, I plan to incorporate the following enhancements:
- [ ] Integration of metrics for the new protocol.
- [ ] Implementation of unit tests, functional tests, and benchmark tests to ensure reliability.
- [x] Validation of Pause and Resume functionality within the new protocol.
- [x] Verification for customized cooperative balance strategy implementation.
I'm looking forward to the community's insights and feedback as we work towards a more efficient and feature-rich Sarama.
@napallday - I'm excited to see that cooperative incremental re-balancing is making its way into Sarama. I think we have apps that could benefit from it. I've had a quick play with the consumergroup_cooperative example and here's what I found:
- Multiple co-operative Sarama consumers in the same group work as expected. I tried starting / stopping individual consumers, and each time the other consumers in the group were revoked/assigned partitions appropriately, without a complete re-assignment of all of the topic's partitions. Sometimes Setup(...) is invoked with an empty map of newly assigned partitions, which I think corresponds to re-balances where the consumer's set of partitions is unchanged. This appears to be consistent with how the Java client behaves, so I'm not suggesting changing this behavior.
- Upgrading Sarama consumers from sticky -> co-operative sticky via rolling restarts worked as expected. E.g. all consumers started out "sticky", then were upgraded to "sticky, sticky-cooperative", then finally upgraded to "sticky-cooperative" only.
- Co-existence with a Java partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor consumer in the same group has mixed results. If a Java consumer was leading the group and then a Sarama consumer joins, this works as expected. If a Sarama consumer was leading the group and a Java consumer joins, then the Sarama leader panics:
2023/08/21 20:49:59 Error from consumer: kafka: insufficient data to decode packet, more bytes expected
panic: Error from consumer: kafka: insufficient data to decode packet, more bytes expected
goroutine 11 [running]:
log.Panicf({0x100e23868?, 0x100f5b3d8?}, {0x140000affa0?, 0x14000198af0?, 0x1?})
/usr/local/go/src/log/log.go:395 +0x68
main.main.func1()
/tmp/sarama/examples/consumergroup_cooperative/main.go:119 +0x114
created by main.main
/tmp/sarama/examples/consumergroup_cooperative/main.go:110 +0x440
I can grab more debug output if it would be helpful. (This was using Kafka 3.3.1, a Kafka 3.1.2 client, and the consumergroup_cooperative example.)
Regarding your "Seeking Community Guidance" question: IMO it would be great to add cooperative re-balancing support as soon as practical, without necessarily waiting for Sarama v2. For example, Sarama v1 could be extended with the ConsumeV2 method, which would then be refactored into a more elegant single consumer approach in Sarama v2.
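The two-step rolling upgrade described above (sticky, then both strategies, then cooperative only) can be explained with a small model. The sketch below assumes Sarama follows the Java client's rule that a consumer only uses COOPERATIVE when every strategy it is configured with supports it; the strategy names and the supported table are modelled on the Java assignors and are assumptions, not Sarama's actual configuration API.

```go
package main

import "fmt"

// RebalanceProtocol mirrors the two protocols discussed in KIP-429.
type RebalanceProtocol int

const (
	Eager RebalanceProtocol = iota
	Cooperative
)

func (p RebalanceProtocol) String() string {
	if p == Cooperative {
		return "COOPERATIVE"
	}
	return "EAGER"
}

// supported is a hypothetical table modelled on the Java client: the classic
// strategies only support EAGER, while cooperative-sticky supports both.
var supported = map[string][]RebalanceProtocol{
	"range":              {Eager},
	"roundrobin":         {Eager},
	"sticky":             {Eager},
	"cooperative-sticky": {Eager, Cooperative},
}

func supports(strategy string, p RebalanceProtocol) bool {
	for _, q := range supported[strategy] {
		if q == p {
			return true
		}
	}
	return false
}

// effectiveProtocol picks the highest protocol supported by every configured strategy.
func effectiveProtocol(strategies []string) RebalanceProtocol {
	if len(strategies) == 0 {
		return Eager
	}
	for _, candidate := range []RebalanceProtocol{Cooperative, Eager} {
		all := true
		for _, s := range strategies {
			if !supports(s, candidate) {
				all = false
				break
			}
		}
		if all {
			return candidate
		}
	}
	return Eager
}

func main() {
	// The three phases of the rolling upgrade.
	for _, phase := range [][]string{
		{"sticky"},
		{"sticky", "cooperative-sticky"},
		{"cooperative-sticky"},
	} {
		fmt.Println(phase, effectiveProtocol(phase))
	}
}
```

In this model the middle phase still rebalances eagerly, so no member switches to COOPERATIVE while another member could still be running an EAGER-only strategy.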
@napallday thanks for putting together this PR!
RE: point 3 mentioned by @prestona, it looks like a bug in our decoding of the MemberMetadata in the JoinGroup response, where we return early if OwnedPartitions is an empty array, so we haven't consumed the whole buffer. I'll fix that up in a separate PR.
I think you also need to cope with UserData being empty in the balance strategy to avoid a decoding failure:
diff --git a/balance_strategy.go b/balance_strategy.go
index cc7a0a3..eab26b2 100644
--- a/balance_strategy.go
+++ b/balance_strategy.go
@@ -967,6 +967,9 @@ func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadat
// for each partition we create a sorted map of its consumers by generation
sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
for memberID, meta := range members {
+ if len(meta.UserData) == 0 {
+ continue
+ }
consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
if err != nil {
return nil, nil, err
I'll add this change as well after rebasing your PR on #2618
That seemed to work well for me. I ran sarama, rdkafka and (Java) kafka-console-consumer in the same cooperative balance group and did some chaos testing, restarting each of them randomly, and each member was able to join/rejoin/lead without causing issues for the others.
In the middle of lots of runs I did trigger one protocol decode failure in deserializeTopicPartitionAssignment, where it seemed like the UserData for one member was 4 bytes (0xff,0xff,0xff,0xff), but it's not clear what caused that. We may just wish to guard the deserializeTopicPartitionAssignment call by ignoring the "InsufficientData" error and continuing the loop.
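A minimal standalone sketch of the suggested guard: members with empty UserData are skipped (as in the diff above), and so are members whose UserData fails with an insufficient-data error. errInsufficientData stands in for Sarama's ErrInsufficientData, and decodeUserData and collectUserData are hypothetical stand-ins for deserializeTopicPartitionAssignment and the loop in prepopulateCurrentAssignments; the 8-byte minimum is an arbitrary assumption for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errInsufficientData stands in for Sarama's ErrInsufficientData in this sketch.
var errInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")

// decodeUserData is a hypothetical decoder that needs at least 8 bytes of
// sticky-assignor user data and reports errInsufficientData otherwise.
func decodeUserData(b []byte) ([]byte, error) {
	if len(b) < 8 {
		return nil, errInsufficientData
	}
	return b, nil
}

// collectUserData skips members whose UserData is empty or too short to
// decode, instead of failing the whole assignment; any other error aborts.
func collectUserData(members map[string][]byte) (map[string][]byte, error) {
	decoded := make(map[string][]byte)
	for memberID, userData := range members {
		if len(userData) == 0 {
			continue
		}
		d, err := decodeUserData(userData)
		if errors.Is(err, errInsufficientData) {
			continue // e.g. a 4-byte payload written by another client's assignor
		}
		if err != nil {
			return nil, err
		}
		decoded[memberID] = d
	}
	return decoded, nil
}

func main() {
	members := map[string][]byte{
		"sarama-1": {0, 0, 0, 1, 0, 0, 0, 2},
		"java-1":   {0xff, 0xff, 0xff, 0xff},
		"kcat-1":   nil,
	}
	decoded, err := collectUserData(members)
	fmt.Println(len(decoded), err) // 1 <nil>
}
```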
Big thanks to @prestona and @dnwe for the thorough testing and fix! 😄
> In the middle of lots of runs I did trigger one protocol decode failure in deserializeTopicPartitionAssignment where it seemed like UserData for one member was 4 bytes (0xff,0xff,0xff,0xff) but it's not clear what caused that. We may just wish to guard the deserializeTopicPartitionAssignment call by ignoring "InsufficientData" err and continuing in the loop
@dnwe Regarding this issue, could you share the combination of Kafka clients and the Kafka server version you used during testing? I personally tested only with Sarama, without direct interaction with rdkafka or Java clients, and so far I have not come across this particular issue.
@napallday no worries. I set KAFKA_VERSION=3.3.2, started up the docker-compose environment used in the FV tests with the toxiproxy routes configured, and created a six-partition topic called cooperative-topic. I set up a Sarama producer to continuously write some records to the topic.
Then in three different terminals I ran:
# sarama
./consumergroup_cooperative -version 3.3.2 -brokers 127.0.0.1:29091 -group sarama-coop -topics cooperative-topic
# librdkafka
./kcat -b 127.0.0.1:29091 -G sarama-coop -X partition.assignment.strategy=cooperative-sticky cooperative-topic
# java
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:29091 --topic cooperative-topic --group sarama-coop --consumer-property partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
I then stopped and started each of them at various points so they all had a turn leading and joining the group.
The panic can be reproduced with the following steps:
Step 1. Run the Sarama consumer (leader).
Step 2. Run the Java consumer.
This is due to later improvements to the cooperative-sticky strategy implementation in the Java client. Specifically, in userData, the first 4 bytes represent the generationID (where 4 bytes of 0xff translate to -1 as a signed 32-bit integer). This can be observed in the code changes here.
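A minimal sketch of reading that field, assuming (per the explanation above) that the Java cooperative-sticky assignor's UserData is a single big-endian int32 generation ID. decodeCooperativeGeneration is a hypothetical helper, not part of Sarama; it only shows why the 0xff,0xff,0xff,0xff payload is a valid "generation -1" rather than corrupt data, and why feeding it to the sticky-assignor decoder fails with insufficient data.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// decodeCooperativeGeneration is a hypothetical helper (not Sarama API).
// It assumes the UserData is exactly one big-endian int32 generation ID.
func decodeCooperativeGeneration(userData []byte) (int32, error) {
	if len(userData) != 4 {
		return 0, errors.New("userData is not a 4-byte generation ID")
	}
	// Reinterpret the unsigned 32-bit value as signed: 0xffffffff becomes -1.
	return int32(binary.BigEndian.Uint32(userData)), nil
}

func main() {
	gen, err := decodeCooperativeGeneration([]byte{0xff, 0xff, 0xff, 0xff})
	fmt.Println(gen, err) // -1 <nil>
}
```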
I will address this issue at a later time.
Hi @prestona @dnwe,
FYI, I've submitted a commit to fix the issue mentioned above. 🛠️
After extensive testing with different Kafka clients, including librdkafka (v3.3.2), the Java client (v3.5.0 & v3.3.2), and Sarama (v3.3.2), I believe the panic issues have been successfully resolved.
However, during testing with the Java client (v2.8.1), I observed a java.lang.IllegalStateException in this client while it was validating the cooperative assignment. It's worth noting that this behavior appears to be specific to older versions of the Java client. Since this Java client was the leader that performed the assignments, I believe the underlying cause is likely rooted in these outdated Java client versions rather than in Sarama.
Interesting. It took me a few attempts, but yes I also managed to trigger that IllegalStateException with a Java 2.8.1 consumer:
[2023-08-23 21:03:16,182] ERROR [Consumer clientId=consumer-sarama-coop-1, groupId=sarama-coop] With the COOPERATIVE protocol, owned partitions cannot be reassigned to other members; however the assignor has reassigned partitions [cooperative-topic-2] which are still owned by some members (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2023-08-23 21:03:16,189] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
java.lang.IllegalStateException: Assignor supporting the COOPERATIVE protocol violates its requirements
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.validateCooperativeAssignment(ConsumerCoordinator.java:668)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:592)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:693)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1182)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1157)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:443)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:102)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
@napallday OK, I looked over apache/kafka and I see that they skipped this validation code in the kafka-client under KAFKA-13406 (PR #11439) due to issues with the verification, and that has been the Java client behaviour from 2.8.2 onward.
Cool! 🎉 Huge thanks to @dnwe for the investigation.
The explanations provided in the Jira ticket are crystal clear.
By the way, just like this fix in apache/kafka, the official Java client has received several improvements to its sticky and cooperative-sticky assignors since the initial introduction of the incremental cooperative balance protocol. In the future, we can integrate these enhancements gradually.
@napallday were you planning to do the follow-up work around metrics, pause/resume and additional FV tests under this PR?
I suppose the alternative would be to merge this as-is, but report the feature as experimental in the changelog so people can test it out in their staging environments.
@dnwe Thanks for the reminder.
For the pause/resume feature, I've verified that the behaviour is consistent with the Java Kafka client.
- For the EAGER protocol: the paused status of all partitions is dropped on rebalance.
- For the COOPERATIVE protocol: only the paused status of revoked partitions is dropped on rebalance.
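The two rules above can be expressed as a small function. This is a minimal sketch of the described behaviour only; TopicPartition, RebalanceProtocol and pausedAfterRebalance are hypothetical names, not the PR's internal types.

```go
package main

import "fmt"

type RebalanceProtocol int

const (
	Eager RebalanceProtocol = iota
	Cooperative
)

// TopicPartition is a hypothetical key type for this sketch.
type TopicPartition struct {
	Topic     string
	Partition int32
}

// pausedAfterRebalance returns the paused set that survives a rebalance:
// EAGER drops everything, COOPERATIVE drops only the revoked partitions.
func pausedAfterRebalance(protocol RebalanceProtocol, paused map[TopicPartition]bool, revoked []TopicPartition) map[TopicPartition]bool {
	kept := make(map[TopicPartition]bool)
	if protocol == Eager {
		return kept // every partition was revoked, so no paused state survives
	}
	for tp, isPaused := range paused {
		if isPaused {
			kept[tp] = true
		}
	}
	for _, tp := range revoked {
		delete(kept, tp) // a revoked partition loses its paused status
	}
	return kept
}

func main() {
	paused := map[TopicPartition]bool{{"t", 0}: true, {"t", 1}: true}
	revoked := []TopicPartition{{"t", 1}}
	fmt.Println(len(pausedAfterRebalance(Eager, paused, revoked)))       // 0
	fmt.Println(len(pausedAfterRebalance(Cooperative, paused, revoked))) // 1
}
```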
Regarding the metrics and FV tests, I may add them at a later time due to some personal matters. Therefore, I agree with releasing this as an experimental feature initially.
Thank you for your contribution! However, this pull request has not had any activity in the past 90 days and will be closed in 30 days if no updates occur. If you believe the changes are still valid then please verify your branch has no conflicts with main and rebase if needed. If you are awaiting a (re-)review then please let us know.
Hi @napallday @dnwe, it seems there is no major concern about this PR. Do you think this can be merged soon?
It is indeed a useful feature I'd like to try out, but I'd rather use the main branch (or, better, a release) than a pull request branch.
@wwwjfy 👋🏻 thanks for getting in touch
The issue with merging to main at the moment is that the PR branch introduces the new interfaces ConsumeV2 and ConsumerGroupHandlerV2, which we'd then be locked into supporting. Ideally, we want to expose the cooperative balance strategy as just a config change for users, rather than requiring them to rewrite their app code to make use of it.
Similarly, if we merge to main without more FV coverage, it might not be obvious to users (unless they read notes on GitHub Releases) that it is considered experimental.
Would it help your testing if I pull the commits into a branch on the main repository rather than them being only on a fork?
i.e., if I make a cooperative-balance-strategy branch with these commits, then you can go get github.com/IBM/sarama@cooperative-balance-strategy, start calling the new interfaces, experiment with the code and provide feedback?
I hope the current ConsumerGroupHandler can still be used with the cooperative balance strategy. If users want to be notified of revoked and assigned partitions, they could provide their own OnAssignPartition and OnRevokePartition methods, and a default handler would be provided for users who keep the original ConsumerGroupHandler without changing their current code.
Thanks for the quick response :)
It does make sense to maintain a single interface rather than two, if possible. (Sorry, I didn't read the change and the comments in this PR.)
I'm not sure what the way forward is. Using a single interface will probably break backward compatibility, I guess, which means we'd wait for a major version upgrade as well as the consolidation work before that.
For my own testing, I have my forked version anyway, though what you mentioned may be useful for others :D
I'm still evaluating whether to use Sarama or the Confluent SDK, which already has this feature. I'll share feedback if I find anything useful.
Yes, I think the ideal scenario would be if we can rework this PR to keep the Sarama consumer interface consistent, but expose an opt-in similar to the Java client via an equivalent of the partition.assignment.strategy config variable.
I feel it's hard to keep using the ConsumerGroupHandler interface since it's tightly coupled with the eager rebalance protocol; e.g. the ConsumerGroupSession passed as the function parameter is created anew for every generation.
As the new ConsumerGroupHandlerV2 implementation also covers what the previous ConsumerGroupHandler does, would it be possible to include this PR in a future major version of Sarama?
I can help add FV tests later, but it seems to be a lot of work...
If we want to keep ConsumerGroupHandler, I think a possible way is to:
- call Setup(ConsumerGroupSession)/Cleanup(ConsumerGroupSession) when a new assignment is received / a new rebalance is triggered.
- change ConsumerGroupSession's Claims/MemberID/GenerationID functions to be thread-safe.
So if users don't care about assignment changes in their existing code (or they only log them), enabling cooperative in the config will be enough. But users will still need to be aware of the API behaviour change when enabling cooperative rebalancing.
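The second bullet above can be sketched as a long-lived session whose state is swapped in under a lock at each rebalance while ConsumeClaim goroutines keep reading it. cooperativeSession and applyRebalance are hypothetical names for illustration, not Sarama's ConsumerGroupSession implementation; Claims returns a copy so readers never observe a map that is being replaced.

```go
package main

import (
	"fmt"
	"sync"
)

// cooperativeSession is a hypothetical, simplified stand-in for a session that
// outlives a single generation under the COOPERATIVE protocol.
type cooperativeSession struct {
	mu           sync.RWMutex
	memberID     string
	generationID int32
	claims       map[string][]int32
}

// Claims returns a deep copy so callers cannot race with applyRebalance.
func (s *cooperativeSession) Claims() map[string][]int32 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string][]int32, len(s.claims))
	for topic, parts := range s.claims {
		out[topic] = append([]int32(nil), parts...)
	}
	return out
}

func (s *cooperativeSession) MemberID() string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.memberID
}

func (s *cooperativeSession) GenerationID() int32 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.generationID
}

// applyRebalance is called by the (hypothetical) consumer loop when a new
// assignment arrives; it takes ownership of the claims map passed in.
func (s *cooperativeSession) applyRebalance(memberID string, generationID int32, claims map[string][]int32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.memberID = memberID
	s.generationID = generationID
	s.claims = claims
}

func main() {
	s := &cooperativeSession{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.Claims() // concurrent readers, e.g. ConsumeClaim goroutines
		}()
	}
	s.applyRebalance("member-1", 3, map[string][]int32{"cooperative-topic": {0, 1}})
	wg.Wait()
	fmt.Println(s.MemberID(), s.GenerationID(), s.Claims()) // member-1 3 map[cooperative-topic:[0 1]]
}
```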
Any ETA for this feature?