librdkafka
assignor: API for a custom consumer group assignment strategy
This addresses https://github.com/edenhill/librdkafka/issues/2284
The exported API is sufficient to implement the existing assignment strategies and to allow users to define their own.
This PR does not change how rebalancing works; it reworks the existing API so that it is accessible via `<librdkafka/rdkafka.h>`.
Some structures are split into "internal" and "public" variants to minimise the number of changes. The conversion from one to the other is done just before the callbacks are called.
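To illustrate what an internal-to-public conversion right before a callback might look like, here is a minimal sketch. The struct and function names (`internal_str_t`, `public_member_t`, `member_to_public`) are hypothetical and not taken from the PR; the sketch assumes the internal form is a length-prefixed string where a length of -1 means "null", and the public form is a plain NUL-terminated C string that is `NULL` when absent.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical internal string: length-prefixed, not NUL-terminated,
 * with len == -1 meaning "null" (similar in spirit to Kafka protocol strings). */
typedef struct {
    int len;
    const char *str;
} internal_str_t;

/* Hypothetical public member view handed to user callbacks. */
typedef struct {
    char *member_id;
    char *group_instance_id; /* NULL when the member has no instance id */
} public_member_t;

/* Copy an internal string into a newly allocated NUL-terminated C string.
 * Returns NULL for a null internal string or on allocation failure. */
static char *internal_str_to_c(const internal_str_t *s) {
    assert(s != NULL);
    if (s->len < 0)
        return NULL;
    char *out = malloc((size_t)s->len + 1);
    if (!out)
        return NULL;
    if (s->len > 0)
        memcpy(out, s->str, (size_t)s->len);
    out[s->len] = '\0';
    return out;
}

/* Build the public view right before a user callback is invoked. */
static void member_to_public(const internal_str_t *member_id,
                             const internal_str_t *instance_id,
                             public_member_t *out) {
    out->member_id = internal_str_to_c(member_id);
    out->group_instance_id = internal_str_to_c(instance_id);
}

/* Release the copies once the callback has returned. */
static void public_member_destroy(public_member_t *m) {
    free(m->member_id);
    free(m->group_instance_id);
    m->member_id = m->group_instance_id = NULL;
}
```

Note that an empty internal string (length 0) becomes `""`, while a null one becomes `NULL`, so the two cases stay distinguishable on the public side.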
Open questions:
- Should this new API be marked as "experimental" by adding a `_v0` or `_exp` suffix to functions and structures?
- Should callbacks receive a single structure as an argument, so that it can be extended in the future without breaking compatibility?
- I'm not fond of passing an "opaque" object to the `rd_kafka_assignor_register` function. I would prefer to pass it via `rd_kafka_t`, but I don't have a strong opinion.
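For the second question, one common C pattern is a single argument struct whose first field records its own size, so fields can be appended later and a callback can check whether a field is present before reading it. This is a minimal sketch under that assumption; `assign_args_t`, `assign_cb_t`, `ARGS_HAS_FIELD` and the field names are hypothetical and not part of the PR or of librdkafka.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical callback argument struct. `size` is set by the library to the
 * sizeof() it was compiled with, so new fields can be appended later without
 * changing the callback signature. */
typedef struct {
    size_t size;
    const char *protocol_name;
    int member_cnt;
    int generation_id; /* imagine this field was appended in a later version */
} assign_args_t;

/* Single-struct callback signature, plus the opaque pointer discussed above. */
typedef int (*assign_cb_t)(const assign_args_t *args, void *opaque);

/* Non-zero if the struct the caller passed is large enough to hold `field`. */
#define ARGS_HAS_FIELD(args, type, field) \
    ((args)->size >= offsetof(type, field) + sizeof(((type *)0)->field))

/* Example user callback: reads the newer field only when it is present. */
static int my_assign_cb(const assign_args_t *args, void *opaque) {
    (void)opaque;
    assert(args != NULL);
    if (!ARGS_HAS_FIELD(args, assign_args_t, generation_id))
        return -1; /* older caller: field not available */
    return args->generation_id;
}
```

Compared with a `_v0`/`_exp` suffix, this keeps one stable function signature and moves versioning into the data; the two approaches can also be combined.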
NOTE: there are some formatting-related changes. I have set up my editor to automatically run clang-format on save with the repository's default configuration.
Interesting: 0120_asymmetric_subscription fails on Windows in AppVeyor, while it does not fail on Linux (Travis CI). It passes fine on my local Linux setup:
```
TEST 20220412145306 (bare, scenario default) SUMMARY
#==================================================================#
| <MAIN> | PASSED | 13.003s |
| 0120_asymmetric_subscription | PASSED | 12.020s |
#==================================================================#
[<MAIN> / 13.003s] 0 thread(s) in use by librdkafka
[<MAIN> / 13.003s]
============== ALL TESTS PASSED ==============
###
### ./test-runner in bare mode PASSED! ###
###
uname -a
Linux 5.13.0-23-generic #23-Ubuntu SMP Fri Nov 26 11:41:15 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
```
And in the macOS environment:
```
TEST 20220412145911 (bare, scenario default) SUMMARY
#==================================================================#
| <MAIN> | PASSED | 11.032s |
| 0120_asymmetric_subscription | PASSED | 10.090s |
#==================================================================#
[<MAIN> / 11.034s] 0 thread(s) in use by librdkafka
[<MAIN> / 11.034s]
============== ALL TESTS PASSED ==============
###
### ./test-runner in bare mode PASSED! ###
###
Darwin 20.6.0 Darwin Kernel Version 20.6.0: Tue Feb 22 21:10:41 PST 2022; root:xnu-7195.141.26~1/RELEASE_X86_64 x86_64 i386 MacBookPro15,2 Darwin
```
EDIT: it fails only in AppVeyor; the Windows build in Travis CI passes.
This is fixed and the commit is squashed. The issue was in this loop, where I relied on the element order even though the caller may manipulate the array (sort it, etc.).
Why was it failing only in AppVeyor? 🤔 (I will try to dig deeper.) I'm wondering how we can add fuzziness to such tests to make issues like this easily reproducible.
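One possible way to add such fuzziness is to shuffle the input array with a seeded, deterministic PRNG before handing it to the code under test, and to report the seed of the first permutation that changes the result so the failure can be replayed. This is a minimal sketch, not librdkafka test-suite code; `fuzz_order`, `leader_sorted` and `leader_first` are hypothetical stand-ins for an assignor step that should (or, in the buggy case, does not) ignore input order.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define MAX_MEMBERS 16

typedef const char *(*leader_fn)(const char **members, size_t n);

/* xorshift32: tiny deterministic PRNG, so a failing seed can be replayed. */
static uint32_t xorshift32(uint32_t *state) {
    uint32_t x = *state;
    x ^= x << 13;
    x ^= x >> 17;
    x ^= x << 5;
    return *state = x;
}

/* Fisher-Yates shuffle driven by `seed` (must be non-zero). */
static void shuffle(const char **arr, size_t n, uint32_t seed) {
    uint32_t st = seed;
    for (size_t i = n; i > 1; i--) {
        size_t j = xorshift32(&st) % i; /* small modulo bias is fine here */
        const char *tmp = arr[i - 1];
        arr[i - 1] = arr[j];
        arr[j] = tmp;
    }
}

static int cmp_str(const void *a, const void *b) {
    return strcmp(*(const char *const *)a, *(const char *const *)b);
}

/* Order-independent: sorts its own copy before choosing. */
static const char *leader_sorted(const char **members, size_t n) {
    const char *copy[MAX_MEMBERS];
    memcpy(copy, members, n * sizeof(*copy));
    qsort(copy, n, sizeof(*copy), cmp_str);
    return copy[0];
}

/* Order-dependent: trusts whatever order the caller passed in. */
static const char *leader_first(const char **members, size_t n) {
    (void)n;
    return members[0];
}

/* Returns 0 if `fn` gives the same result for `rounds` shuffles of
 * `members`, otherwise the first seed that produced a different result. */
static uint32_t fuzz_order(leader_fn fn, const char **members, size_t n,
                           int rounds) {
    assert(n > 0 && n <= MAX_MEMBERS);
    const char *baseline = fn(members, n);
    for (int r = 0; r < rounds; r++) {
        uint32_t seed = (uint32_t)r + 1; /* xorshift32 needs a non-zero seed */
        const char *perm[MAX_MEMBERS];
        memcpy(perm, members, n * sizeof(*perm));
        shuffle(perm, n, seed);
        if (strcmp(fn(perm, n), baseline) != 0)
            return seed;
    }
    return 0;
}
```

Using a fixed seed sequence keeps CI runs deterministic while still exercising many orderings; a randomized starting seed that is printed on failure would be a reasonable variation.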
EDIT: I've found the root cause. In AppVeyor (which uses MSVC), `strndup("", 0)` returns `NULL`, while in the other environments it returns `""`.
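To avoid depending on how a particular C runtime handles this edge case, a small helper can guarantee that an empty input always yields an empty string rather than `NULL`. This is a sketch; `portable_strndup` is a hypothetical name, not an existing librdkafka function.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Copy at most n bytes of s and always NUL-terminate. Returns an empty
 * string (not NULL) for portable_strndup("", 0); NULL only means the
 * allocation failed. */
static char *portable_strndup(const char *s, size_t n) {
    assert(s != NULL);
    size_t len = 0;
    while (len < n && s[len] != '\0') /* bounded length, like strnlen() */
        len++;
    char *out = malloc(len + 1);
    if (!out)
        return NULL;
    memcpy(out, s, len);
    out[len] = '\0';
    return out;
}
```

With this, a `NULL` result can be treated unambiguously as an out-of-memory error instead of possibly meaning "empty string".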
The fix (committed, squashed) is to check `rkgm_group_instance_id` for `NULL` using `RD_KAFKAP_STR_IS_NULL()`, so that `rd_kafka_assignor_topic_cmp` properly sorts by member id (and ignores `group_instance_id`).
This raises a question: should `rd_kafka_assignor_topic_cmp` be fixed to fall back to comparing `member_id` when the `group_instance_id`s are equal? 🤔
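A comparator that answers "yes" to this question could look like the sketch below. It is not the actual `rd_kafka_assignor_topic_cmp`; `member_t` and `member_cmp` are hypothetical. It assumes both `NULL` and `""` mean "no instance id" (normalizing the platform difference above), places members without an instance id first, and uses `member_id` as the final tie-break. Comparing `member_id` only when one side lacks an instance id would not be transitive, which is undefined behaviour for `qsort`, so the absent/present split is part of the key.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical member record used only for this sketch. */
typedef struct {
    const char *member_id;
    const char *group_instance_id; /* NULL or "" when static membership is off */
} member_t;

/* Treat NULL and "" the same, regardless of the C runtime. */
static int instance_id_absent(const char *id) {
    return id == NULL || id[0] == '\0';
}

/* Total order on the key (has instance id, group_instance_id, member_id). */
static int member_cmp(const void *a, const void *b) {
    const member_t *ma = a, *mb = b;
    int a_abs = instance_id_absent(ma->group_instance_id);
    int b_abs = instance_id_absent(mb->group_instance_id);
    if (a_abs != b_abs)
        return a_abs ? -1 : 1; /* members without an instance id sort first */
    if (!a_abs) {
        int r = strcmp(ma->group_instance_id, mb->group_instance_id);
        if (r != 0)
            return r;
    }
    return strcmp(ma->member_id, mb->member_id); /* tie-break */
}
```

For example, sorting `{m3,i1}, {m2,NULL}, {m1,i1}, {m0,""}` with `qsort` yields `m0, m2, m1, m3`.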
@edenhill are you interested in this PR?
I can't reproduce the style-check failure with clang-format 14.0.6.
@edenhill @emasab is there any chance of getting feedback on this PR?
Hello @scanterog @niamster, sorry for the long wait. I want to point out that with KIP-881 the range assignor is being changed to be co-partitioned (the same partition numbers go to the same members, given topics with the same partition count) and rack-aware. With KIP-848, the range assignor will also be co-partitioned, rack-aware and sticky, and the assignment will be done by the group coordinator.
Together, the co-partitioned and sticky properties make it possible to perform joins in a way similar to what Kafka Streams does, with a local cache.
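To see why a range-style split gives co-partitioning, it helps to write down which member owns a given partition. The sketch below assumes the classic range split (each member in the sorted member list gets a contiguous block, the first `partition_cnt % member_cnt` members one extra partition); `range_owner` is a hypothetical helper and does not model the rack-awareness or stickiness added by KIP-881 and KIP-848.

```c
#include <assert.h>

/* Index, in the sorted member list, of the member owning `partition` when
 * `partition_cnt` partitions are split into contiguous ranges over
 * `member_cnt` members: each member gets partition_cnt / member_cnt
 * partitions and the first partition_cnt % member_cnt members get one extra. */
static int range_owner(int partition, int partition_cnt, int member_cnt) {
    assert(member_cnt > 0);
    assert(partition >= 0 && partition < partition_cnt);
    int per = partition_cnt / member_cnt;
    int extra = partition_cnt % member_cnt;
    int boundary = extra * (per + 1); /* partitions held by the "extra" members */
    if (partition < boundary)
        return partition / (per + 1);
    return extra + (partition - boundary) / per; /* per > 0 whenever we get here */
}
```

The owner depends only on the partition number, the partition count and the member list, never on the topic, so two subscribed topics with the same partition count have partition `p` assigned to the same member, which is what local-cache joins rely on.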
Given this roadmap, would it be enough for Datadog's needs? If there's something that cannot be done without a custom assignor, please comment.
Also, I'd like to hear from other people about the kinds of assignors they're thinking of, other than the cases already listed in #2284.
Hi @emasab! Thanks for checking this.
We need a way to compute the assignment on the client side, and as far as I understand, KIP-848 supports that:
The group coordinator will either directly compute a new assignment with its server side assignor or delegate the assignment to a member of the group if a client-side assignor must be used.
Co-partitioning is great when traffic is balanced across all partitions. In our case, we want to compute the assignment ourselves in order to address imbalanced partitions. In a client-side assignor, we can measure the load on each partition (using external signals) and, based on that, distribute the load evenly across the group members, which does not necessarily translate into the same number of partitions per member.
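The thread does not describe the algorithm Datadog actually uses, so the following swaps in a well-known greedy heuristic (largest load first) purely to illustrate load-based balancing: partitions are sorted by measured load, heaviest first, and each one goes to the member with the smallest total load so far. `part_load_t` and `assign_by_load` are hypothetical names, and how the per-partition loads are obtained is left open.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical (partition, measured load) pair. */
typedef struct {
    int partition;
    double load;
} part_load_t;

/* Heaviest partition first; ties broken by partition number. */
static int by_load_desc(const void *a, const void *b) {
    const part_load_t *pa = a, *pb = b;
    if (pa->load != pb->load)
        return pa->load < pb->load ? 1 : -1;
    return (pa->partition > pb->partition) - (pa->partition < pb->partition);
}

/* Greedy "largest load first" heuristic: give each partition, heaviest first,
 * to the member with the smallest total load so far. Writes owner[p] for
 * every partition p; returns 0 on success, -1 on bad input or OOM. */
static int assign_by_load(const double *loads, int part_cnt, int member_cnt,
                          int *owner) {
    if (part_cnt < 0 || member_cnt <= 0)
        return -1;
    if (part_cnt == 0)
        return 0;
    assert(loads != NULL && owner != NULL);
    part_load_t *parts = malloc(sizeof(*parts) * (size_t)part_cnt);
    double *member_load = malloc(sizeof(*member_load) * (size_t)member_cnt);
    if (!parts || !member_load) {
        free(parts);
        free(member_load);
        return -1;
    }
    for (int m = 0; m < member_cnt; m++)
        member_load[m] = 0.0;
    for (int p = 0; p < part_cnt; p++) {
        parts[p].partition = p;
        parts[p].load = loads[p];
    }
    qsort(parts, (size_t)part_cnt, sizeof(*parts), by_load_desc);
    for (int i = 0; i < part_cnt; i++) {
        int best = 0; /* member with the smallest accumulated load */
        for (int m = 1; m < member_cnt; m++)
            if (member_load[m] < member_load[best])
                best = m;
        owner[parts[i].partition] = best;
        member_load[best] += parts[i].load;
    }
    free(parts);
    free(member_load);
    return 0;
}
```

With loads `{10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}` and two members, partition 0 goes to member 0 and the ten light partitions go to member 1: both members carry a load of 10 even though they own 1 and 10 partitions respectively.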
Thanks, that's an interesting use case, and you can't handle it with manual assignment because it's not static; that would mean losing the "transactional" property of consumer group assignments. KIP-848 supports custom assignors; we were asking about the use case to estimate its priority. Explicitly triggering a rebalance isn't supported in librdkafka (KIP-568), so you'd have to unsubscribe and subscribe again, without static membership, in one of your consumers, unless that is implemented too, together with a custom assignor.
Please let us know if you need further information. If you can also share the estimate you end up with, that would be useful for us. Thanks again, @emasab.