aiokafka icon indicating copy to clipboard operation
aiokafka copied to clipboard

Implement KIP-345 in aiokafka (rebase of #682)

Open ntextreme3 opened this issue 2 years ago • 4 comments

Changes

Fixes #680

This is a rebase of https://github.com/aio-libs/aiokafka/pull/682 -- see https://github.com/aio-libs/aiokafka/pull/682#issuecomment-1070337629

Checklist

  • [ ] I think the code is well written
  • [ ] Unit tests for the changes exist
  • [ ] Documentation reflects the changes
  • [ ] Add a new news fragment into the CHANGES folder
    • name it <issue_id>.<type> (e.g. 588.bugfix)
    • if you don't have an issue_id change it to the pr id after creating the PR
    • ensure type is one of the following:
      • .feature: Signifying a new feature.
      • .bugfix: Signifying a bug fix.
      • .doc: Signifying a documentation improvement.
      • .removal: Signifying a deprecation or removal of public API.
      • .misc: A ticket has been closed, but it is not of interest to users.
    • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.

ntextreme3 avatar Mar 17 '22 16:03 ntextreme3

@ods Do you have suggestions on the types of tests you'd like to see here. I haven't played with this yet, just rebased it. But I'll need this functionality in the coming months so at some point I'll be going through it more thoroughly anyway.

ntextreme3 avatar Mar 17 '22 16:03 ntextreme3

This pull request introduces 1 alert when merging 83e882a847e7d7089cb757065a89107d5d843e5d into 2c54e10c57760f779961a8c2f5df8ad609ef6983 - view on LGTM.com

new alerts:

  • 1 for Signature mismatch in overriding method

lgtm-com[bot] avatar Mar 17 '22 16:03 lgtm-com[bot]

Codecov Report

Merging #827 (83e882a) into master (2c54e10) will decrease coverage by 47.35%. The diff coverage is 47.67%.

@@             Coverage Diff             @@
##           master     #827       +/-   ##
===========================================
- Coverage   96.96%   49.60%   -47.36%     
===========================================
  Files          30       32        +2     
  Lines        5434     5497       +63     
===========================================
- Hits         5269     2727     -2542     
- Misses        165     2770     +2605     
Flag Coverage Δ
cext 40.60% <47.67%> (-47.14%) :arrow_down:
integration ?
purepy 49.15% <47.67%> (-47.35%) :arrow_down:
unit 49.60% <47.67%> (+0.12%) :arrow_up:

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
aiokafka/consumer/consumer.py 16.36% <0.00%> (-80.35%) :arrow_down:
aiokafka/consumer/subscription_state.py 81.36% <ø> (-18.64%) :arrow_down:
aiokafka/consumer/group_coordinator.py 9.70% <8.51%> (-89.20%) :arrow_down:
aiokafka/consumer/assignors.py 83.33% <83.33%> (ø)
aiokafka/client.py 27.65% <100.00%> (-68.99%) :arrow_down:
aiokafka/errors.py 99.32% <100.00%> (-0.68%) :arrow_down:
aiokafka/protocol/group.py 100.00% <100.00%> (ø)
aiokafka/producer/sender.py 14.44% <0.00%> (-81.84%) :arrow_down:
... and 13 more

Continue to review full report at Codecov.

Legend - Click here to learn more Δ = absolute <relative> (impact), ø = not affected, ? = missing data Powered by Codecov. Last update 2c54e10...83e882a. Read the comment docs.

codecov[bot] avatar Mar 18 '22 05:03 codecov[bot]

Do you have suggestions on the types of tests you'd like to see here.

We need to proof, that this new functionality works. There must be some code with static partition assignor.

ods avatar Mar 18 '22 06:03 ods

I played around with testing this feature last year. I didn't make it very far, but I thought an important part would be the querying of consumers' group instance IDs – and for that, a v4 DescribeGroups req/res is necessary. kafka-python still does not have this record version. I started building it out in my branch:

# aiokafka/protocol/admin.py

from kafka.protocol.admin import DescribeGroupsRequest_v3, DescribeGroupsResponse_v4
from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String


class DescribeGroupsResponse_v4(Response):
    API_KEY = 15
    API_VERSION = 4
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        ('groups', Array(
            ('error_code', Int16),
            ('group', String('utf-8')),
            ('state', String('utf-8')),
            ('protocol_type', String('utf-8')),
            ('protocol', String('utf-8')),
            ('members', Array(
                ('member_id', String('utf-8')),
                ('group_instance_id', String('utf-8')),
                ('client_id', String('utf-8')),
                ('client_host', String('utf-8')),
                ('member_metadata', Bytes),
                ('member_assignment', Bytes)))),
            ('authorized_operations', Int32))
    )


class DescribeGroupsRequest_v4(Request):
    API_KEY = 15
    API_VERSION = 4
    RESPONSE_TYPE = DescribeGroupsResponse_v4
    SCHEMA = DescribeGroupsRequest_v3.SCHEMA

theY4Kman avatar Mar 13 '23 16:03 theY4Kman

We may consider removing kafka-python from dependencies, as even issues with PRs are not merged into master.

ods avatar Mar 14 '23 11:03 ods

Hello @ods , Is there a plan for implementing this change? I am using aiokafka 0.8.0, and i would like to see this feature as i am seeing issues with consumer re-balancing and the KIP-345 would be helpful in resolving that.

rujutashinde avatar Jun 06 '23 14:06 rujutashinde

Hello @rujutashinde, thank you for reminding. Unfortunately I don't have enough expertise to make this PR production ready. Would you like to participate?

ods avatar Jun 06 '23 18:06 ods

Hello @rujutashinde, thank you for reminding. Unfortunately I don't have enough expertise to make this PR production ready. Would you like to participate?

I see. Unfortunately i dont have the expertise either, i am more a user of the project. But i will give it a go when i have some time cycles. Please do update the ticket if there are any new developments to this . appreciate the quick response!

rujutashinde avatar Jun 06 '23 18:06 rujutashinde

Hi, I am wondering if there is anything we could do to finalize and merge this PR, as this is a super useful feature? In case you haven't noticed and if it could be useful, the Robinhood folks implemented this in their fork of aiokafka: https://github.com/robinhood/aiokafka/pull/21/files Let me know if any help is required

tartieret avatar Oct 10 '23 21:10 tartieret

Hi @tartieret, Could you please provide tests for it?

ods avatar Oct 15 '23 14:10 ods

Hello! I'd love to help get this change merged into the project, my company would benefit greatly from KIP-345 mode.

I'll work on the requested tests. Are tests all that is preventing this from getting merged?

joshuaherrera avatar Oct 27 '23 21:10 joshuaherrera

I'll work on the requested tests. Are tests all that is preventing this from getting merged?

Yes, tests for it is the most important part. There is also some work to be done to resolve conflicts with merge of kafka-python's code, but this part I could probably do myself.

ods avatar Oct 28 '23 12:10 ods

Replaced with https://github.com/aio-libs/aiokafka/pull/941

ods avatar Dec 07 '23 20:12 ods