librdkafka
Feature request: Support pluggable partition assignment algorithms for consumer
Please add support for pluggable partition assignment algorithms for the consumers.
I was in a design review today where we will need this for a handful of upcoming services.
This was originally requested back in https://github.com/edenhill/librdkafka/issues/1135.
Using this ticket to track progress on this feature.
I would like to implement a service like Kafka Streams Interactive Queries on top of librdkafka. After looking at the source code of both projects, the only critical piece I am missing in librdkafka is the ability to provide a custom assignor, so that I can manage standby replicas of local state.
According to @edenhill here, here and the code here, it is possible to configure custom assignors internally, but it is not exposed at the API level, and only the builtin range and roundrobin assignors are allowed.
I was wondering whether there is any plan to implement it, how difficult it would be, or what the chances are of getting mentoring for contributing.
Some more context on why it is so relevant to be able to customise the assignor: this is the case for Kafka Streams, where there are many stateful consumers, which makes rebalances quite expensive when not done properly. Currently Kafka Streams uses an assignor called Sticky that minimises state migration between instances (and also accounts for standby replicas).
There is a recent proposal, KIP-429, to make the rebalance protocol more incremental, which gives an interesting view of the current state of affairs and of the relevance of the matter.
Let's look at making the assignor API public when we implement KIP-429 (et al.) to make sure all use-cases are covered.
Is there any update on this feature?
I am also interested in using this feature. Is it likely that the proposed API will be as @edenhill referenced here? Is making this API public something you might accept a PR for, or is the larger change referenced above going to nullify such a PR?
+1 for the feature
What is your use-case for supplying a custom partition assignor?
I wanted to ensure that only one consumer is active in a consumer group. The topics I am using have only one partition, and we recommend that our users run a single consumer per consumer group. But if more consumers join by mistake, there is a problem. Currently I am using RebalanceCb to handle this. Say A and B subscribe to the same set of 10 topics; then only one of the consumers receives all the topics while the other does not start (fails). This works fine.
The problem is this: consumer A subscribes to 2 topics and consumer B subscribes to 10 topics, 2 of which overlap. Now, after the rebalanceCb, consumer A starts receiving messages from the 2 shared topics and B from the remaining 8. B is not even aware that it is missing messages from those 2 topics because of A. To avoid this, I wanted to implement my own partition assignment algorithm.
@edenhill - would you consider a PR implementing a public API for custom assignors? It looks like a similar pattern to custom Partitioners would work?
@ChadJessup I think a good start would be a PR containing only the public API (rdkafka.h), but I suggest waiting until the incremental rebalancing PRs have been merged, since there will be internal changes to the assignor APIs; see the incrreb branch.
@edenhill I am also looking for partition assignor support - to implement Active/Passive (Hot-Hot) Consumers
My use case is: I have a Kafka topic with 6 partitions. When the first consumer starts, it receives all partitions and consumes and processes the messages. If a second consumer then starts with the same group.id, I don't want any partition assigned to it, because the first (active) consumer is up and processing messages. But when the first active consumer goes down, all partitions should be assigned to the second (or next) consumer, which takes over and starts processing. If the user then restarts the first process/consumer, it should go into standby mode. I would like to achieve this by implementing a custom SingleConsumerGroup partition assignor (a use case like the one described here, Failover strategy: https://medium.com/streamthoughts/understanding-kafka-partition-assignment-strategies-and-how-to-write-your-own-custom-assignor-ebeda1fc06f3).
I see more use cases with the same or a similar requirement, where only a single consumer in a consumer group should get all partitions and other/new instances should sit idle. So I think it would be best if this were available as an out-of-the-box option, something like SingleConsumerGroupAssigner, alongside the existing three strategies RangeAssigner, RoundRobinAssigner and StickyAssigner.
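A minimal, librdkafka-independent sketch in C of the failover logic described above: the function only decides which group member should receive every partition, and all other members get an empty assignment. The names (`single_active_pick`, `previous_owner`) are hypothetical, and how the previous owner is learned (for example from the members' currently owned partitions, or via static membership with `group.instance.id`) is assumed rather than shown.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical "single active consumer" (failover) decision.
 * Returns the index of the member that should own ALL partitions,
 * or -1 if the group is empty. Every other member stays on standby. */
int single_active_pick(const char *const *member_ids, size_t member_cnt,
                       const char *previous_owner) {
    if (member_cnt == 0)
        return -1; /* nobody to assign to */

    /* Stickiness: if the member that owned everything in the previous
     * generation is still in the group, keep it active so that newly
     * joined or restarted instances remain on standby. */
    if (previous_owner != NULL) {
        for (size_t i = 0; i < member_cnt; i++)
            if (strcmp(member_ids[i], previous_owner) == 0)
                return (int)i;
    }

    /* Failover: otherwise pick the lexicographically smallest member id,
     * so the choice is deterministic for a given membership. */
    size_t best = 0;
    for (size_t i = 1; i < member_cnt; i++)
        if (strcmp(member_ids[i], member_ids[best]) < 0)
            best = i;
    return (int)best;
}
```

The stickiness check is what keeps a restarted instance on standby; without it, a restart could move all partitions back to whichever member happens to sort first.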
I'd really like to add support for pluggable assignors, the problem is it is a big and fluent API surface area and with our API and ABI guarantees we only get one shot, we can't change it once it has been shipped. So the way to make it extensible and future proof is to make all types private and provide accessor methods, which is a lot of tedious work.
Now, since this is sort of a niche feature which will initially only be used by advanced users (that are capable of writing an assignor), what we could perhaps do is expose the internal assignor API as an experimental API without API/ABI stability guarantees. This would require you as a user to rebuild your assignor when you upgrade librdkafka, but maybe that's an acceptable alternative to move things forward here. The long term plan should be to add it as a proper public API.
What do you all think?
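To illustrate the accessor-based approach described above, here is a minimal sketch in C of an opaque type whose layout stays private, so fields can be added in later releases without breaking the ABI of already-built assignors. All `my_assignor_member_*` names are hypothetical and are not part of librdkafka.

```c
#include <stdlib.h>
#include <string.h>

/* ---- "public header" part: the struct is only forward-declared ---- */
typedef struct my_assignor_member_s my_assignor_member_t;

my_assignor_member_t *my_assignor_member_new(const char *member_id,
                                             int owned_partition_cnt);
void my_assignor_member_destroy(my_assignor_member_t *m);
const char *my_assignor_member_id(const my_assignor_member_t *m);
int my_assignor_member_owned_partition_cnt(const my_assignor_member_t *m);

/* ---- private implementation: layout may change between releases ---- */
struct my_assignor_member_s {
    char *member_id;
    int owned_partition_cnt;
    /* New fields can be added later without breaking callers, because
     * callers never depend on sizeof() or field offsets. */
};

my_assignor_member_t *my_assignor_member_new(const char *member_id,
                                             int owned_partition_cnt) {
    my_assignor_member_t *m = malloc(sizeof(*m));
    if (!m)
        return NULL;
    size_t len = strlen(member_id) + 1;
    m->member_id = malloc(len);
    if (!m->member_id) {
        free(m);
        return NULL;
    }
    memcpy(m->member_id, member_id, len); /* private copy of the id */
    m->owned_partition_cnt = owned_partition_cnt;
    return m;
}

void my_assignor_member_destroy(my_assignor_member_t *m) {
    if (!m)
        return;
    free(m->member_id);
    free(m);
}

/* Accessors: the only way user code reads the fields. */
const char *my_assignor_member_id(const my_assignor_member_t *m) {
    return m->member_id;
}

int my_assignor_member_owned_partition_cnt(const my_assignor_member_t *m) {
    return m->owned_partition_cnt;
}
```

Every field needs its own accessor (and usually a constructor/destructor pair), which is where the tedium comes from across a large assignor API.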
Hi @edenhill
I have attempted to export minimal API needed to support custom assignors in https://github.com/edenhill/librdkafka/pull/3812 Can you please take a look?
My team is also interested in this feature (we use librdkafka via rdkafka in Rust). We would appreciate a review of @niamster's PR. Thank you!
Hi @edenhill - we're hoping to get another look at this; the PR was updated to be in sync with the current master and passes all tests. Based on the likes on the original issue (which is now 3+ years old), at least 20 people are interested in this feature. Can you let us know what needs to happen to get this considered for merging? Thanks so much
bump
We are also interested in this.
I am also interested in implementing a custom strategy. My use case: I have a consumer group consuming from more than one topic, and I want the partition assignment to be fair per topic. In more detail, I have 3 consumer instances consuming from 3 topics, T1, T2 and T3, each with 3 partitions P11, P12, ... P21, P22, ... P31, P32, .... I don't want to end up with each consumer instance consuming from just one topic: C1(P11,P12,P13), C2(P21,P22,P23), C3(P31,P32,P33). I want Cx(P1x, P2x, P3x). The alternative is to create a consumer group per topic, but I think that uses more resources.
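A minimal sketch in C of the per-topic split requested above: partition p of every topic goes to member p mod N, with members assumed to be sorted (e.g. by member ID), which yields Cx(P1x, P2x, P3x) in the 3-topic, 3-partition example. The function name and the flat `owners` output layout are hypothetical.

```c
#include <stddef.h>

/* Hypothetical per-topic round-robin: partition p of EVERY topic is given
 * to member p % member_cnt, so each member gets a slice of each topic
 * rather than whole topics. owners[] receives one member index per
 * partition, topic by topic, in partition order.
 * Returns the number of entries written (0 if there are no members). */
size_t per_topic_round_robin(const int *partition_cnts, size_t topic_cnt,
                             size_t member_cnt, int *owners) {
    size_t n = 0;
    if (member_cnt == 0)
        return 0;
    for (size_t t = 0; t < topic_cnt; t++)
        for (int p = 0; p < partition_cnts[t]; p++)
            owners[n++] = (int)((size_t)p % member_cnt);
    return n;
}
```

One trade-off of this simple rule: when topics have fewer partitions than there are consumers, low-index members receive more partitions; rotating the starting member per topic would be one way to even that out.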
+1
We already use a custom CooperativeStickyAssignor for our Java based consumers to support our deployment model. When a new version of a consuming service is released, it joins the same consumer group as the existing version. As traffic is ramped up for the new version, partitions are assigned based on the percentage of traffic handled by each version.
We would like to do the same for our Python consumers.
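A minimal sketch in C of the proportional split described above, assuming each version's traffic share is known as a non-negative integer weight (e.g. a percentage). The largest-remainder method turns the shares into whole partition counts that add up exactly. The function name is hypothetical; distributing each version's count across its individual consumers, and keeping assignments sticky the way a CooperativeStickyAssignor does, are not shown.

```c
#include <stddef.h>

/* Hypothetical weighted split using the largest-remainder method.
 * counts[i] receives roughly partition_cnt * weights[i] / sum(weights),
 * and the counts always sum to partition_cnt.
 * Weights must be non-negative. Returns 0 on success, -1 if the weights
 * sum to zero (including when member_cnt is 0). */
int weighted_partition_counts(int partition_cnt, const int *weights,
                              size_t member_cnt, int *counts) {
    long total_w = 0;
    for (size_t i = 0; i < member_cnt; i++)
        total_w += weights[i];
    if (total_w <= 0)
        return -1;

    /* Step 1: give every member the floor of its exact share. */
    int assigned = 0;
    for (size_t i = 0; i < member_cnt; i++) {
        counts[i] = (int)((long)partition_cnt * weights[i] / total_w);
        assigned += counts[i];
    }

    /* Step 2: hand out the leftovers one at a time to the member whose
     * exact share exceeds its current count the most (ties go to the
     * lowest index). Once served, a member's deficit turns negative, so
     * it is not picked again while others still have a remainder. */
    for (int left = partition_cnt - assigned; left > 0; left--) {
        size_t best = 0;
        long best_d = (long)partition_cnt * weights[0] - (long)counts[0] * total_w;
        for (size_t i = 1; i < member_cnt; i++) {
            long d = (long)partition_cnt * weights[i] - (long)counts[i] * total_w;
            if (d > best_d) {
                best = i;
                best_d = d;
            }
        }
        counts[best]++;
    }
    return 0;
}
```

For example, with 12 partitions and a 90/10 traffic split, the counts come out as 11 and 1.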