aiokafka
Implement KIP-345 in aiokafka (rebase of #682)
Changes
Fixes #680
This is a rebase of https://github.com/aio-libs/aiokafka/pull/682 -- see https://github.com/aio-libs/aiokafka/pull/682#issuecomment-1070337629
Checklist
- [ ] I think the code is well written
- [ ] Unit tests for the changes exist
- [ ] Documentation reflects the changes
- [ ] Add a new news fragment into the `CHANGES` folder
  - name it `<issue_id>.<type>` (e.g. `588.bugfix`)
  - if you don't have an `issue_id` change it to the pr id after creating the PR
  - ensure type is one of the following:
    - `.feature`: Signifying a new feature.
    - `.bugfix`: Signifying a bug fix.
    - `.doc`: Signifying a documentation improvement.
    - `.removal`: Signifying a deprecation or removal of public API.
    - `.misc`: A ticket has been closed, but it is not of interest to users.
  - Make sure to use full sentences with correct case and punctuation, for example: `Fix issue with non-ascii contents in doctest text files.`
@ods Do you have suggestions on the types of tests you'd like to see here? I haven't played with this yet, just rebased it. But I'll need this functionality in the coming months, so at some point I'll be going through it more thoroughly anyway.
This pull request introduces 1 alert when merging 83e882a847e7d7089cb757065a89107d5d843e5d into 2c54e10c57760f779961a8c2f5df8ad609ef6983 - view on LGTM.com
new alerts:
- 1 for Signature mismatch in overriding method
Codecov Report
Merging #827 (83e882a) into master (2c54e10) will decrease coverage by 47.35%. The diff coverage is 47.67%.
@@ Coverage Diff @@
## master #827 +/- ##
===========================================
- Coverage 96.96% 49.60% -47.36%
===========================================
Files 30 32 +2
Lines 5434 5497 +63
===========================================
- Hits 5269 2727 -2542
- Misses 165 2770 +2605
Flag | Coverage Δ | |
---|---|---|
cext | 40.60% <47.67%> (-47.14%) | :arrow_down: |
integration | ? | |
purepy | 49.15% <47.67%> (-47.35%) | :arrow_down: |
unit | 49.60% <47.67%> (+0.12%) | :arrow_up: |
Flags with carried forward coverage won't be shown.
Impacted Files | Coverage Δ | |
---|---|---|
aiokafka/consumer/consumer.py | 16.36% <0.00%> (-80.35%) | :arrow_down: |
aiokafka/consumer/subscription_state.py | 81.36% <ø> (-18.64%) | :arrow_down: |
aiokafka/consumer/group_coordinator.py | 9.70% <8.51%> (-89.20%) | :arrow_down: |
aiokafka/consumer/assignors.py | 83.33% <83.33%> (ø) | |
aiokafka/client.py | 27.65% <100.00%> (-68.99%) | :arrow_down: |
aiokafka/errors.py | 99.32% <100.00%> (-0.68%) | :arrow_down: |
aiokafka/protocol/group.py | 100.00% <100.00%> (ø) | |
aiokafka/producer/sender.py | 14.44% <0.00%> (-81.84%) | :arrow_down: |
... and 13 more | | |
Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 2c54e10...83e882a.
> Do you have suggestions on the types of tests you'd like to see here?
We need to prove that this new functionality works. There must be some code that uses a static partition assignor.
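One property a test for this could assert is that, under static membership, a consumer that restarts with the same group instance ID gets the same partitions back even though its member ID has changed. A minimal sketch in plain Python, assuming a non-empty group; `static_assign` and `by_instance` are hypothetical helpers for illustration and are not part of aiokafka's or kafka-python's assignor API:

```python
from typing import Dict, List


def static_assign(members: Dict[str, str], partitions: List[int]) -> Dict[str, List[int]]:
    """Assign partitions round-robin over members ordered by group instance ID.

    `members` maps member_id -> group_instance_id and must not be empty.
    Hypothetical helper, not an aiokafka or kafka-python API.
    """
    # Order by the stable instance ID, not by the per-session member ID.
    ordered = sorted(members, key=lambda member_id: members[member_id])
    assignment: Dict[str, List[int]] = {member_id: [] for member_id in ordered}
    for index, partition in enumerate(sorted(partitions)):
        assignment[ordered[index % len(ordered)]].append(partition)
    return assignment


def by_instance(members: Dict[str, str], assignment: Dict[str, List[int]]) -> Dict[str, List[int]]:
    """Re-key an assignment by group instance ID so two runs can be compared."""
    return {members[member_id]: parts for member_id, parts in assignment.items()}


# The same two instances before and after a restart; only one member ID differs.
before = {"member-a1": "instance-1", "member-b1": "instance-2"}
after = {"member-z9": "instance-1", "member-b1": "instance-2"}
partitions = [0, 1, 2, 3]

first = by_instance(before, static_assign(before, partitions))
second = by_instance(after, static_assign(after, partitions))
```

Comparing `first` and `second` checks the stability property without a broker; an integration test would still be needed to show that the coordinator actually sends the group instance ID in its join requests.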
I played around with testing this feature last year. I didn't make it very far, but I thought an important part would be the querying of consumers' group instance IDs – and for that, a v4 DescribeGroups req/res is necessary. kafka-python still does not have this record version. I started building it out in my branch:
    # aiokafka/protocol/admin.py
    # kafka-python does not define DescribeGroups v4, so it is declared here.
    from kafka.protocol.admin import DescribeGroupsRequest_v3
    from kafka.protocol.api import Request, Response
    from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String


    class DescribeGroupsResponse_v4(Response):
        API_KEY = 15
        API_VERSION = 4
        SCHEMA = Schema(
            ('throttle_time_ms', Int32),
            ('groups', Array(
                ('error_code', Int16),
                ('group', String('utf-8')),
                ('state', String('utf-8')),
                ('protocol_type', String('utf-8')),
                ('protocol', String('utf-8')),
                ('members', Array(
                    ('member_id', String('utf-8')),
                    # Added for KIP-345; null for members without static membership.
                    ('group_instance_id', String('utf-8')),
                    ('client_id', String('utf-8')),
                    ('client_host', String('utf-8')),
                    ('member_metadata', Bytes),
                    ('member_assignment', Bytes))),
                # Per-group field, so it is nested inside the 'groups' array.
                ('authorized_operations', Int32)))
        )


    class DescribeGroupsRequest_v4(Request):
        API_KEY = 15
        API_VERSION = 4
        RESPONSE_TYPE = DescribeGroupsResponse_v4
        # The request body is unchanged from v3.
        SCHEMA = DescribeGroupsRequest_v3.SCHEMA
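The field of interest in this response is `group_instance_id`, which is null for members that do not use static membership. In this (non-flexible) version of the protocol a nullable string is a big-endian int16 length followed by that many UTF-8 bytes, with a length of -1 meaning null. A minimal standard-library sketch of that encoding, independent of kafka-python; the two helper names are hypothetical:

```python
import struct
from typing import Optional, Tuple


def encode_nullable_string(value: Optional[str]) -> bytes:
    """Encode a Kafka nullable string: int16 length prefix, -1 for null."""
    if value is None:
        return struct.pack(">h", -1)
    raw = value.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


def decode_nullable_string(data: bytes, offset: int = 0) -> Tuple[Optional[str], int]:
    """Decode a nullable string and return (value, offset after the field)."""
    (length,) = struct.unpack_from(">h", data, offset)
    offset += 2
    if length < 0:
        # A negative length marks a null string; no payload bytes follow.
        return None, offset
    end = offset + length
    return data[offset:end].decode("utf-8"), end


static_member = encode_nullable_string("instance-1")  # member with a group instance ID
dynamic_member = encode_nullable_string(None)         # member without one
```

A test that queries group instance IDs would therefore need to handle both a string and `None` for each member returned by the response.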
We may consider removing kafka-python from the dependencies, since even PRs that fix reported issues are not being merged into its master.
Hello @ods, is there a plan for implementing this change? I am using aiokafka 0.8.0 and would like to see this feature, as I am seeing issues with consumer rebalancing and KIP-345 would help resolve them.
Hello @rujutashinde, thank you for the reminder. Unfortunately I don't have enough expertise to make this PR production ready. Would you like to participate?
> Hello @rujutashinde, thank you for the reminder. Unfortunately I don't have enough expertise to make this PR production ready. Would you like to participate?
I see. Unfortunately I don't have the expertise either; I am more of a user of the project. But I will give it a go when I have some spare cycles. Please do update the ticket if there are any new developments. I appreciate the quick response!
Hi, I am wondering if there is anything we could do to finalize and merge this PR, as this is a super useful feature. In case you haven't noticed, and if it could be useful, the Robinhood folks implemented this in their fork of aiokafka: https://github.com/robinhood/aiokafka/pull/21/files Let me know if any help is required.
Hi @tartieret, Could you please provide tests for it?
Hello! I'd love to help get this change merged into the project; my company would benefit greatly from KIP-345 mode.
I'll work on the requested tests. Are tests all that is preventing this from getting merged?
> I'll work on the requested tests. Are tests all that is preventing this from getting merged?
Yes, tests are the most important part. There is also some work to be done to resolve conflicts with the merge of kafka-python's code, but I could probably do that part myself.
Replaced with https://github.com/aio-libs/aiokafka/pull/941