kafka-python

Support AWS_MSK_IAM authentication

Open • mattoberle opened this issue 4 years ago • 36 comments

This pull request addresses issue #2232 by adding an AWS_MSK_IAM authentication mechanism. A detailed description of the authentication scheme is available here:

  • https://github.com/aws/aws-msk-iam-auth#uriencode
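The authentication payload described at the link above can be sketched with the standard library alone. This is a minimal sketch based on my reading of the aws-msk-iam-auth README (SigV4 presigned-request style, service `kafka-cluster`, action `kafka-cluster:Connect`), not the code in this PR. It assumes `host` is the bare broker hostname without the port, uses a placeholder `user-agent` value, and takes credentials and region as explicit arguments instead of resolving them through botocore. `uriencode` and `build_payload` are hypothetical names.

```python
import datetime
import hashlib
import hmac
import json
from typing import Optional
from urllib.parse import quote

SERVICE = "kafka-cluster"
ALGORITHM = "AWS4-HMAC-SHA256"


def uriencode(value: str) -> str:
    # AWS URI-encode: keep only A-Z a-z 0-9 - _ . ~ and percent-encode
    # everything else (including "/") as uppercase %XX over the UTF-8 bytes.
    return quote(value, safe="")


def _hmac(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def build_payload(host: str, access_key: str, secret_key: str, region: str,
                  session_token: Optional[str] = None,
                  now: Optional[datetime.datetime] = None,
                  expires: int = 900) -> bytes:
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    datestamp = now.strftime("%Y%m%d")
    timestamp = now.strftime("%Y%m%dT%H%M%SZ")
    scope = f"{datestamp}/{region}/{SERVICE}/aws4_request"

    # Signed query parameters (SigV4 presigned-request style).
    params = {
        "Action": f"{SERVICE}:Connect",
        "X-Amz-Algorithm": ALGORITHM,
        "X-Amz-Credential": f"{access_key}/{scope}",
        "X-Amz-Date": timestamp,
        "X-Amz-Expires": str(expires),
        "X-Amz-SignedHeaders": "host",
    }
    if session_token:
        params["X-Amz-Security-Token"] = session_token
    query = "&".join(f"{uriencode(k)}={uriencode(v)}"
                     for k, v in sorted(params.items()))

    # Canonical request: method, path, query, canonical headers (each ending
    # in a newline), signed header names, SHA-256 of the empty payload.
    canonical_request = "\n".join([
        "GET", "/", query, f"host:{host}\n", "host",
        hashlib.sha256(b"").hexdigest(),
    ])
    string_to_sign = "\n".join([
        ALGORITHM, timestamp, scope,
        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
    ])

    # Signing key: "AWS4"+secret -> date -> region -> service -> "aws4_request".
    key = _hmac(("AWS4" + secret_key).encode("utf-8"), datestamp)
    for part in (region, SERVICE, "aws4_request"):
        key = _hmac(key, part)
    signature = hmac.new(key, string_to_sign.encode("utf-8"),
                         hashlib.sha256).hexdigest()

    payload = {
        "version": "2020_10_22",
        "host": host,
        "user-agent": "kafka-python",  # placeholder client identifier
        "action": params["Action"],
        "x-amz-algorithm": ALGORITHM,
        "x-amz-credential": params["X-Amz-Credential"],
        "x-amz-date": timestamp,
        "x-amz-signedheaders": "host",
        "x-amz-expires": params["X-Amz-Expires"],
        "x-amz-signature": signature,
    }
    if session_token:
        payload["x-amz-security-token"] = session_token
    return json.dumps(payload).encode("utf-8")
```

The returned bytes would be what the client sends during the SASL exchange. `uriencode` relies on `urllib.parse.quote(..., safe="")`, which on Python 3.7+ leaves exactly `A-Z a-z 0-9 - _ . ~` unencoded and uses uppercase hex, matching the URI-encode rule linked above.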

I understand that kafka-python may not be the appropriate place for a vendor-specific authentication mechanism. If that's the case, maybe it's better suited as a plug-in? The library doesn't support auth extensions at the moment, but it doesn't look like a huge lift to get there.

To use the mechanism, pass the following keyword arguments when initializing a client (e.g. KafkaProducer or KafkaConsumer):

security_protocol='SASL_SSL',
sasl_mechanism='AWS_MSK_IAM',
bootstrap_servers=[
    'b-1.cluster.x.y.kafka.region.amazonaws.com:9098',
    ...
],

The credentials and region are pulled using botocore.session.Session. The mechanism requires the botocore library, which can be installed with:

pip install botocore
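A small helper that assembles the keyword arguments above can make the setup harder to get wrong. This is a hypothetical sketch, not part of the PR: `msk_iam_client_config` and `botocore_installed` are illustrative names, and the config only works with a kafka-python build that includes this PR (a build without it rejects `AWS_MSK_IAM` as an unknown `sasl_mechanism`).

```python
import importlib.util


def msk_iam_client_config(bootstrap_servers, **extra):
    """Keyword arguments for a kafka-python client using AWS_MSK_IAM."""
    servers = list(bootstrap_servers)
    if not servers:
        raise ValueError("at least one bootstrap server is required")
    config = {
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "AWS_MSK_IAM",
        "bootstrap_servers": servers,
    }
    config.update(extra)  # other client options, e.g. client_id
    return config


def botocore_installed() -> bool:
    # The mechanism resolves credentials and region through botocore,
    # so check for it up front instead of failing during authentication.
    return importlib.util.find_spec("botocore") is not None
```

Usage would then look like `KafkaProducer(**msk_iam_client_config(['b-1.cluster.x.y.kafka.region.amazonaws.com:9098']))`, with the same dictionary usable for KafkaConsumer.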

TODO:

  • [ ] Documentation
  • [x] Test authentication payload generation
  • [ ] Test config verification / auth method
  • [x] Refresh mechanism for temporary credentials?


mattoberle Aug 18 '21 20:08

It looks like temporary credentials will be refreshed through the existing error handling. I'm not sure that's ideal, but it appears to provide an uncomplicated initial solution. The following output was clipped from a test producer running this code:

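import datetime
import time

from kafka import KafkaProducer

TOPIC = 'kafka-python-test'
# CONFIG holds the AWS_MSK_IAM keyword arguments shown above.
producer = KafkaProducer(**CONFIG)

i = 0
while True:
    i += 1
    ts = datetime.datetime.utcnow().isoformat()
    msg = f'[{i}] {ts} Test Message'
    print(msg)
    future = producer.send(TOPIC, msg.encode('utf-8'))
    meta = future.get(timeout=10)  # block until the broker acknowledges
    print(meta.topic)
    print(meta.partition)
    print(meta.offset)
    time.sleep(10)

[358] 2021-08-19T16:35:59.526059 Test Message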
kafka-python-test
0
740

[359] 2021-08-19T16:36:09.569503 Test Message
kafka-python-test
0
741

ERROR:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <connected> [IPv4 ('10.10.10.10', 9098)]>: socket disconnected
INFO:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <connected> [IPv4 ('10.10.10.10', 9098)]>: Closing connection. KafkaConnectionError: socket disconnected
WARNING:kafka.client:Node 3 connection failed -- refreshing metadata
INFO:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <connecting> [IPv4 ('10.10.10.10', 9098)]>: connecting to b-3.cluster.x.y.kafka.region.amazonaws.com:9098 [('10.10.10.10', 9098) IPv4]
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <authenticating> [IPv4 ('10.10.10.10', 9098)]>: Authenticated via AWS_MSK_IAM {"version":"2020_10_22","request-id":"f8e468a0-3d8c-4999-bd52-6252855c5387"}
INFO:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <authenticating> [IPv4 ('10.10.10.10', 9098)]>: Connection complete.

[360] 2021-08-19T16:36:19.608427 Test Message
kafka-python-test
0
742

mattoberle Aug 19 '21 16:08

Hi! I am really interested in AWS MSK IAM authentication. Could you elaborate on the current state here (also in relation to https://github.com/dpkp/kafka-python/pull/2256)? Is the error shown above critical, and what exactly is meant by "It looks like temporary credentials will be refreshed through existing error handling."?

Thanks in advance!

dnks23 Oct 05 '21 15:10

@dnks23 This PR provides a working implementation of AWS MSK authentication. If you use role-based authentication, credential refresh works on a try/except basis (there is no preemptive credential refresh). In other words, if the credentials expire, a producer/consumer will:

  1. Refresh the credentials.
  2. Re-authenticate.
  3. Retry the action.
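The refresh / re-authenticate / retry cycle can be sketched generically as a retry wrapper. This is not the PR's code: in the PR the cycle happens implicitly when kafka-python reconnects after a disconnect and botocore resolves credentials again (as in the log above). `AuthExpiredError`, `call_with_reauth`, and the callback parameters are hypothetical names used only to make the control flow visible.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class AuthExpiredError(Exception):
    """Hypothetical error raised when the broker rejects expired credentials."""


def call_with_reauth(action: Callable[[], T],
                     refresh_credentials: Callable[[], None],
                     reauthenticate: Callable[[], None],
                     max_attempts: int = 2) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except AuthExpiredError:
            if attempt == max_attempts:
                raise              # give up: surface the original error
            refresh_credentials()  # 1. refresh the credentials
            reauthenticate()       # 2. re-authenticate
            # 3. loop around and retry the action
    raise RuntimeError("unreachable")
```

The trade-off versus a preemptive refresh is that the first request after expiry always pays for one failure and a reconnect.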

#2256 is something I put together as an alternative to this PR. It doesn't provide the MSK auth but exposes a mechanism for plugging in custom auth.

mattoberle Oct 05 '21 16:10

Thanks for your reply @mattoberle! That's actually great news. Regarding the preemptive credential refresh:

It might be possible to just use botocore.credentials.RefreshableCredentials, which can be used to initialize the botocore session. I haven't tried that, but the preemptive refresh could then be left to boto, which might be a good solution here. What's your opinion on that? Have you already tried it?
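To illustrate what "preemptive" means here, a minimal stdlib sketch: credentials are refetched once the clock enters a margin before expiry, instead of after a failure. botocore's RefreshableCredentials implements this idea internally; the class below is a hypothetical stand-in (`PreemptiveCredentialCache`, `fetch`, and `refresh_margin` are illustrative names), with an injectable clock so the behaviour can be tested.

```python
import threading
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class Credentials:
    access_key: str
    secret_key: str
    token: Optional[str]
    expiry: float  # epoch seconds


class PreemptiveCredentialCache:
    """Refetch credentials once they are within refresh_margin of expiry."""

    def __init__(self, fetch: Callable[[], Credentials],
                 refresh_margin: float = 300.0,
                 clock: Callable[[], float] = time.time):
        self._fetch = fetch
        self._margin = refresh_margin
        self._clock = clock
        self._lock = threading.Lock()  # one refresh at a time
        self._creds: Optional[Credentials] = None

    def get(self) -> Credentials:
        with self._lock:
            if (self._creds is None
                    or self._clock() >= self._creds.expiry - self._margin):
                self._creds = self._fetch()
            return self._creds
```

The authentication step would call `get()` each time it builds a payload, so a reconnect never signs with credentials that are about to expire.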

dnks23 Oct 06 '21 14:10

Hi, I was wondering about the next steps for getting this merged and released?

jplock Oct 14 '21 19:10

@mattoberle @dnks23 Can you please share sample code showing how to connect to MSK using IAM auth?

phoenixx1 Dec 06 '21 14:12

@phoenixx1 to use MSK auth you need to follow these steps:

  1. Provide AWS credentials via a supported boto discovery mechanism.[1]
  2. Initialize your producer or consumer with these keyword arguments:
     security_protocol='SASL_SSL',
     sasl_mechanism='AWS_MSK_IAM',

[1] Expose your IAM credentials via environment variables, set them in ~/.aws/credentials, or assign a role to the EC2 instance or container, etc. https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html


@jplock I'm not the best person to answer your question, we'd need to check with @dpkp to gauge interest in integrating this PR or #2256 which allows this to be supported independently.

The other popular Kafka client library for Python is maintained by Confluent and (understandably) is not especially interested in supporting MSK auth. The confluent library is a set of bindings to librdkafka and a little trickier to extend via plugins.

mattoberle Dec 08 '21 01:12

@mattoberle [screenshot]

I'm trying to connect using a Lambda function. The role assigned to the Lambda function already has access to MSK.

Getting this error with SASL_SSL: [screenshot]

phoenixx1 Dec 08 '21 08:12

@phoenixx1 your Lambda Function is not running the code from this branch.

That error message is generated here, and displays the list of valid sasl_mechanisms defined here.

You'll notice that AWS_MSK_IAM is missing in your list. To access the features in this PR you should install kafka-python from the following URL:

https://github.com/mattoberle/kafka-python/archive/7ff323727d99e0c33a68423300e7f88a9cf3f830.tar.gz

mattoberle Dec 08 '21 14:12

@mattoberle I tried the package URL you provided (in a new venv, to be sure) and still face the same issue. Do you have any sample code where you have used it, so I can check whether anything is missing from mine? That would be really helpful, thanks.

phoenixx1 Dec 13 '21 07:12

@mattoberle The issue I have is that my boto session needs to assume a role, since I use a cross-account AWS MSK cluster. This is not supported, because the basic boto session is created inside the library. The library could be forked and changed so that the session is passed in as an argument, but I was wondering whether there is another way to do it that I missed.

lgasperin Jan 06 '22 17:01

I did not account for this case but it seems worth supporting.
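One way this could be supported, sketched under assumptions: let callers pass a credential provider (any zero-argument callable returning credentials) instead of having the library build its own botocore session. Nothing here is part of the PR; `Credentials`, `credentials_from_assume_role`, `assume_role_provider`, and the idea of a provider argument are hypothetical. The response shape matches boto3's `sts.assume_role`, whose result holds a `Credentials` dict with `AccessKeyId`, `SecretAccessKey`, and `SessionToken`.

```python
from dataclasses import dataclass
from typing import Any, Callable, Mapping, Optional


@dataclass(frozen=True)
class Credentials:
    access_key: str
    secret_key: str
    token: Optional[str] = None


# Hypothetical provider type a client could accept instead of creating
# a botocore session internally.
CredentialProvider = Callable[[], Credentials]


def credentials_from_assume_role(response: Mapping[str, Any]) -> Credentials:
    """Map an STS AssumeRole response to Credentials."""
    c = response["Credentials"]
    return Credentials(c["AccessKeyId"], c["SecretAccessKey"], c["SessionToken"])


def assume_role_provider(assume_role: Callable[..., Mapping[str, Any]],
                         role_arn: str, session_name: str) -> CredentialProvider:
    """Return a provider that assumes role_arn each time it is called."""
    def provide() -> Credentials:
        response = assume_role(RoleArn=role_arn, RoleSessionName=session_name)
        return credentials_from_assume_role(response)
    return provide
```

With boto3, `assume_role_provider(boto3.client('sts').assume_role, role_arn, 'kafka-client')` would return a callable that fetches fresh cross-account credentials on every call; wrapping it in a cache avoids calling STS for each broker connection.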

mattoberle Jan 10 '22 21:01

@mattoberle Apologies for the long comment :)

We are using changes from this PR to run our producer/consumers.

To connect to MSK, we created an IAM user and set its credentials in the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. This setup works, but roughly every 12 hours the consumer raises the following error during poll():

  File "/home/ray/anaconda3/lib/python3.7/site-packages/kafka/consumer/group.py", line 657, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/kafka/consumer/group.py", line 679, in _poll_once
    self._coordinator.poll()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/kafka/coordinator/consumer.py", line 270, in poll
    self.ensure_coordinator_ready()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/kafka/coordinator/base.py", line 269, in ensure_coordinator_ready
    raise future.exception  # pylint: disable-msg=raising-bad-type
kafka.errors.GroupAuthorizationFailedError: [Error 30] GroupAuthorizationFailedError: some_consumer

We found the error below in kafka-authorizer.log:

denied to perform DESCRIBE on resource some_consumer of type GROUP during API call FindCoordinator because CLIENT_NOT_AUTHENTICATED (kafka.authorizer.logger)"

My guess is that the boto session token lasts for 12 hours and is then refreshed. Could you help us understand what is happening?

bkatwal Mar 03 '22 05:03

@bkatwal I think you are describing what I attempted to explain here: https://github.com/dpkp/kafka-python/pull/2255#issuecomment-902075097

The code in this PR doesn't include anything to actively refresh expiring credentials (although I'd expect that the credentials get refreshed during the exception handling).

In full transparency, I'm no longer using kafka-python or MSK auth. There is (understandably) a lack of appetite to incorporate a proprietary authentication scheme into the more active libraries backed by librdkafka.

mattoberle Mar 06 '22 19:03

@mattoberle can you share what you use as an alternative to kafka-python?

Gatsby-Lee Apr 15 '22 19:04

@bkatwal and @mattoberle I am a bit late to the topic, but I did some tests using a role that my user needs to assume in order to interact with the MSK cluster. As you know, an assumed role is only temporary, and the session token needs to be refreshed every N seconds.

If you look carefully at the boto3 documentation, you'll see that boto3 can assume a role configured in ~/.aws/config. This means that if you configure your ~/.aws/config as:

[default]
region = eu-central-1
source_profile = msk-programmatic-user
role_arn = arn:aws:iam::11111111111:role/msk-client
duration_seconds = 900

with the ~/.aws/credentials as:

[msk-programmatic-user]
aws_access_key_id = AKIA...
aws_secret_access_key = E532..

boto3 will know that you want to use the msk-client role and will take care of refreshing the credentials under the hood, without you having to write any try/except handling or refresh the token yourself with, e.g., sts.assume_role() logic.

Hope this will help!

darthale Apr 28 '22 13:04

Hello, this looks pretty solid; however, it's been almost a year since it was first opened. Is there a roadmap for getting it merged and released?

luis-nook Jun 22 '22 15:06

Hi @mattoberle , did you get any interest from @dpkp or the maintainers?
I am interested in picking this up to add IAM role support and credential refresh, but would like to check whether the maintainers are interested in merging this feature. I do have use cases for MSK IAM role authentication.

joshuachong Jul 18 '22 15:07

+1 to get this merged

vrioux Jul 19 '22 17:07

kafka.errors.NoBrokersAvailable: NoBrokersAvailable

Has anyone encountered this error? I have passed the credentials in environment variables.

azizulwahid Jul 26 '22 04:07

@azizulwahid Along with the credentials, you need to set AWS_DEFAULT_REGION. Try adding the AWS_DEFAULT_REGION environment variable, or set it in your compute environment/container :) Set it to the region where your MSK cluster runs.
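A missing region tends to surface later as a generic connection failure, so a fail-fast check before creating the client can help. This is a hypothetical sketch: `resolve_region` and `region_from_broker` are illustrative names, and the fallback assumes MSK broker hostnames end in `.kafka.<region>.amazonaws.com`, as in the placeholder addresses earlier in this thread. botocore can also read the region from ~/.aws/config, which this sketch does not check.

```python
import os
from typing import Iterable, Mapping, Optional


def region_from_broker(broker: str) -> Optional[str]:
    """Guess the region from an MSK broker address (assumed naming pattern)."""
    host = broker.rsplit(":", 1)[0]  # drop the port, if any
    labels = host.split(".")
    # Assumed pattern: <broker>.<...>.kafka.<region>.amazonaws.com
    if (len(labels) >= 4 and labels[-2:] == ["amazonaws", "com"]
            and labels[-4] == "kafka"):
        return labels[-3]
    return None


def resolve_region(brokers: Iterable[str],
                   env: Optional[Mapping[str, str]] = None) -> str:
    env = os.environ if env is None else env
    region = env.get("AWS_DEFAULT_REGION", "").strip()
    if region:
        return region
    for broker in brokers:
        guessed = region_from_broker(broker)
        if guessed:
            return guessed
    raise RuntimeError(
        "No AWS region found: set AWS_DEFAULT_REGION to the MSK cluster's region")
```

One possible use is `os.environ.setdefault("AWS_DEFAULT_REGION", resolve_region(brokers))` before constructing the producer or consumer, so botocore sees the region when it builds its session.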

bkatwal Jul 26 '22 05:07

@bkatwal let me try that real quick

azizulwahid Jul 26 '22 05:07

@bkatwal I still got the same error...

azizulwahid Jul 26 '22 05:07

Could you send the full stacktrace?

bkatwal Jul 26 '22 05:07

Hi @mattoberle @jeffwidman - would we consider this? I'm happy to contribute code to add IAM role support and credential refresh.

joshuachong Jul 29 '22 06:07

@bkatwal I got this working. The issue was assuming the role; I added another block of code to support that. Thanks!

azizulwahid Jul 29 '22 06:07

How are you guys using this change?

  • Do you build your own Python package with this code?
  • Or monkey-patching?
  • Something else?

Thank you

Gatsby-Lee Jul 29 '22 18:07

@mattoberle and @dpkp - I am curious whether, beyond the outstanding TODOs (Documentation, and Test config verification / auth method), there are any other blockers to getting this integrated.

Essentially asking the same thing as @joshuachong above :)

ScheerMT Aug 01 '22 19:08

From what I understand, this project is looking for maintainers, and acceptance of new features is unlikely at this point in time. This library was last published in 2020, which is worth considering if you foresee wanting to use newer broker features. https://github.com/dpkp/kafka-python/issues/2290#issuecomment-1009133967

I'm aware of an effort to integrate IAM into librdkafka here: https://github.com/UrbanCompass/librdkafka

A wheel of confluent-kafka-python can be built using that fork, but you'll end up in the same situation (maintaining a fork yourself). The official line is that IAM support will not be integrated into librdkafka because vendor-specific authentication mechanisms don't belong in the library.

Amazon only provides support for JVM-based clients: https://github.com/aws/aws-msk-iam-auth

I've left this PR open because it seems like some people have found it to be a helpful reference. To be clear though, I am no longer using this library or the UrbanCompass fork of librdkafka. I am also not a maintainer of this library.

Possible paths forward for those that wish to use this:

  1. Fork and comment here if you plan to publicly support this feature; we can point people to that fork from this PR discussion.
  2. Reach out about becoming a maintainer (see first link in this comment).

I know this isn't the update that anyone is looking for, but I just want to adjust the expectations for anyone looking for a timeline.

mattoberle Aug 06 '22 14:08

Any news on the merge timeline?

gmuslia Jun 28 '23 11:06