kafka-python
Support AWS_MSK_IAM authentication
This pull request addresses issue #2232 by adding an AWS_MSK_IAM authentication mechanism.
A detailed description of the authentication scheme is available here:
- https://github.com/aws/aws-msk-iam-auth#uriencode
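The `#uriencode` rules referenced above follow the AWS Signature Version 4 `UriEncode` convention: every UTF-8 byte except the unreserved characters `A-Z a-z 0-9 - . _ ~` is percent-encoded with uppercase hex, including `/` and `:`. A minimal sketch (the function name `uriencode` is illustrative, not taken from this PR), which for these rules should agree with `urllib.parse.quote(value, safe='')` on Python 3.7+:

```python
import string
from urllib.parse import quote

# Characters that SigV4-style URI encoding leaves untouched.
UNRESERVED = frozenset(string.ascii_letters + string.digits + "-._~")


def uriencode(value: str) -> str:
    """Percent-encode every UTF-8 byte that is not an unreserved character."""
    out = []
    for byte in value.encode("utf-8"):
        char = chr(byte)
        if char in UNRESERVED:
            out.append(char)
        else:
            out.append("%{:02X}".format(byte))  # uppercase hex, e.g. ':' -> %3A
    return "".join(out)


if __name__ == "__main__":
    # ':' and '/' are encoded, unlike in an ordinary URL path.
    print(uriencode("kafka-cluster:Connect"))  # kafka-cluster%3AConnect
    print(uriencode("AKIA/20210819/us-east-1/kafka-cluster/aws4_request"))
    # The standard library produces the same result with safe=''.
    assert uriencode("a b/ü~") == quote("a b/ü~", safe="")
```

Percent-encoding the UTF-8 bytes (rather than code points) is what keeps non-ASCII input consistent with `quote`.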
I understand that kafka-python may not be the appropriate place to put a vendor-specific authentication mechanism. If that's the case, maybe it's better suited as a plug-in? The library doesn't support auth extensions at the moment, but it doesn't look like a huge lift to get there.
To use the mechanism, pass the following keyword arguments when initializing a client class such as `KafkaProducer` or `KafkaConsumer`:

```python
security_protocol='SASL_SSL',
sasl_mechanism='AWS_MSK_IAM',
bootstrap_servers=[
    'b-1.cluster.x.y.kafka.region.amazonaws.com:9098',
    ...
],
```
The credentials and region will be pulled using `botocore.session.Session`.
Using the mechanism requires the `botocore` library, which can be installed with:

```
pip install botocore
```
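For reference, the authentication payload described in the linked document can be sketched with the standard library alone, given the access key, secret key, optional session token, and region that botocore resolves. This is an unofficial sketch of SigV4 query signing for the `kafka-cluster:Connect` action, not the code in this PR; the function name `build_msk_iam_payload`, the 900-second expiry, the `user-agent` value, and signing the bare broker hostname (no port) are assumptions.

```python
import datetime
import hashlib
import hmac
import json
from typing import Optional
from urllib.parse import quote

SERVICE = "kafka-cluster"
ACTION = SERVICE + ":Connect"
ALGORITHM = "AWS4-HMAC-SHA256"


def _encode(value: str) -> str:
    # SigV4 URI encoding: only A-Z a-z 0-9 - . _ ~ stay unescaped.
    return quote(value, safe="")


def _hmac(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def build_msk_iam_payload(host: str, access_key: str, secret_key: str,
                          region: str, token: Optional[str] = None,
                          now: Optional[datetime.datetime] = None,
                          expires: int = 900) -> bytes:
    """Return the JSON bytes sent as the AWS_MSK_IAM SASL client message (sketch)."""
    now = now or datetime.datetime.now(datetime.timezone.utc)
    datestamp = now.strftime("%Y%m%d")
    timestamp = now.strftime("%Y%m%dT%H%M%SZ")
    scope = f"{datestamp}/{region}/{SERVICE}/aws4_request"
    credential = f"{access_key}/{scope}"

    # Canonical query string: parameters sorted by name, names and values encoded.
    params = {
        "Action": ACTION,
        "X-Amz-Algorithm": ALGORITHM,
        "X-Amz-Credential": credential,
        "X-Amz-Date": timestamp,
        "X-Amz-Expires": str(expires),
        "X-Amz-SignedHeaders": "host",
    }
    if token:
        params["X-Amz-Security-Token"] = token
    query = "&".join(f"{_encode(k)}={_encode(v)}" for k, v in sorted(params.items()))

    # Canonical request for "GET /" with only the host header signed.
    canonical_request = "\n".join([
        "GET", "/", query, f"host:{host}\n", "host",
        hashlib.sha256(b"").hexdigest(),  # hash of the empty payload
    ])
    string_to_sign = "\n".join([
        ALGORITHM, timestamp, scope,
        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
    ])

    # Derive the signing key: date -> region -> service -> "aws4_request".
    key = _hmac(("AWS4" + secret_key).encode("utf-8"), datestamp)
    for part in (region, SERVICE, "aws4_request"):
        key = _hmac(key, part)
    signature = hmac.new(key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()

    payload = {
        "version": "2020_10_22",
        "host": host,
        "user-agent": "kafka-python",
        "action": ACTION,
        "x-amz-algorithm": ALGORITHM,
        "x-amz-credential": credential,
        "x-amz-date": timestamp,
        "x-amz-signedheaders": "host",
        "x-amz-expires": str(expires),
        "x-amz-signature": signature,
    }
    if token:
        payload["x-amz-security-token"] = token
    return json.dumps(payload, separators=(",", ":")).encode("utf-8")


if __name__ == "__main__":
    fixed = datetime.datetime(2021, 8, 19, 16, 36, 9, tzinfo=datetime.timezone.utc)
    print(build_msk_iam_payload("b-1.cluster.x.y.kafka.region.amazonaws.com",
                                "AKIDEXAMPLE", "secret", "us-east-1", now=fixed))
```

Passing `now` explicitly keeps the output deterministic, which is convenient when testing payload generation.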
TODO:
- [ ] Documentation
- [x] Test authentication payload generation
- [ ] Test config verification / auth method
- [x] Refresh mechanism for temporary credentials?
It looks like temporary credentials will be refreshed through the existing error handling. I'm not sure if that's the ideal approach, but it appears to provide an uncomplicated initial solution. The following output was clipped from a test producer running this code:
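```python
import datetime
import time
from kafka import KafkaProducer

# Placeholder values (not shown in the original); taken from settings used elsewhere in this PR.
TOPIC = 'kafka-python-test'
CONFIG = dict(bootstrap_servers=['b-1.cluster.x.y.kafka.region.amazonaws.com:9098'],
              security_protocol='SASL_SSL', sasl_mechanism='AWS_MSK_IAM')

producer = KafkaProducer(**CONFIG)
i = 0
while True:
    i += 1
    ts = datetime.datetime.utcnow().isoformat()
    msg = f'[{i}] {ts} Test Message'
    print(msg)
    future = producer.send(TOPIC, msg.encode('utf-8'))
    meta = future.get(timeout=10)  # block until the broker acknowledges the write
    print(meta.topic)
    print(meta.partition)
    print(meta.offset)
    time.sleep(10)
```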
```
[358] 2021-08-19T16:35:59.526059 Test Message
kafka-python-test
0
740
[359] 2021-08-19T16:36:09.569503 Test Message
kafka-python-test
0
741
ERROR:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <connected> [IPv4 ('10.10.10.10', 9098)]>: socket disconnected
INFO:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <connected> [IPv4 ('10.10.10.10', 9098)]>: Closing connection. KafkaConnectionError: socket disconnected
WARNING:kafka.client:Node 3 connection failed -- refreshing metadata
INFO:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <connecting> [IPv4 ('10.10.10.10', 9098)]>: connecting to b-3.cluster.x.y.kafka.region.amazonaws.com:9098 [('10.10.10.10', 9098) IPv4]
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <authenticating> [IPv4 ('10.10.10.10', 9098)]>: Authenticated via AWS_MSK_IAM {"version":"2020_10_22","request-id":"f8e468a0-3d8c-4999-bd52-6252855c5387"}
INFO:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <authenticating> [IPv4 ('10.10.10.10', 9098)]>: Connection complete.
[360] 2021-08-19T16:36:19.608427 Test Message
kafka-python-test
0
742
```
Hi! I am really interested in having AWS MSK IAM authentication. Could you elaborate on the current state of this PR (also in relation to https://github.com/dpkp/kafka-python/pull/2256)? Is the error shown above critical, and what exactly is meant by "It looks like temporary credentials will be refreshed through existing error handling."?
Thanks in advance!
@dnks23 This PR provides a working implementation of AWS MSK authentication. If you use role-based authentication, credential refresh works on a try/except basis (there is no preemptive credential refresh). In other words, if the credentials expire, a producer/consumer will:
- Refresh the credentials.
- Re-authenticate.
- Retry the action.
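The refresh, re-authenticate, retry sequence above can be illustrated with a small, library-independent sketch. `AuthError`, `CredentialProvider`, and `send_with_reauth` are hypothetical stand-ins, not kafka-python classes; in this PR the equivalent work appears to happen in the existing reconnect path, which re-authenticates with freshly resolved credentials (as the log above suggests).

```python
from typing import Callable


class AuthError(Exception):
    """Hypothetical error raised when the broker rejects the credentials."""


class CredentialProvider:
    """Hypothetical provider; `fetch` stands in for a botocore credential lookup."""

    def __init__(self, fetch: Callable[[], str]) -> None:
        self._fetch = fetch
        self.current = fetch()

    def refresh(self) -> str:
        self.current = self._fetch()
        return self.current


def send_with_reauth(send: Callable[[str], str], creds: CredentialProvider,
                     max_attempts: int = 2) -> str:
    """Run `send`; on an auth failure refresh the credentials and retry.

    Nothing happens before expiry: the refresh is triggered only by a failure.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            # Authenticate with the current credentials and perform the action.
            return send(creds.current)
        except AuthError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the original error
            creds.refresh()  # refresh, then loop to re-authenticate and retry
    raise AssertionError("unreachable")


if __name__ == "__main__":
    tokens = iter(["expired-token", "fresh-token"])
    provider = CredentialProvider(lambda: next(tokens))

    def fake_send(token: str) -> str:
        if token == "expired-token":
            raise AuthError("credentials expired")
        return "ok with " + token

    print(send_with_reauth(fake_send, provider))  # ok with fresh-token
```

The trade-off is that the first request after expiry always fails once before succeeding.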
#2256 is something I put together as an alternative to this PR. It doesn't provide the MSK auth but exposes a mechanism for plugging in custom auth.
Thanks for your reply @mattoberle! That's actually great news. Regarding the preemptive credential refresh:
It might be possible to use `botocore.credentials.RefreshableCredentials` to initialize the boto session. I haven't tried that, but the preemptive refresh could then be left to botocore, which might be a good solution here.
What's your opinion on that? Have you already tried it?
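For contrast with the try/except approach, a preemptive scheme refreshes credentials shortly before they expire instead of after a failure. botocore's `RefreshableCredentials` works along these lines, but the class below is an independent, simplified sketch rather than botocore's API; the name `PreemptiveCredentials` and the 15-minute `advisory_seconds` margin are assumptions.

```python
import time
from typing import Callable, Optional, Tuple

# fetch() returns (credentials, expiry as epoch seconds).
Fetch = Callable[[], Tuple[str, float]]


class PreemptiveCredentials:
    """Refresh credentials once they are within `advisory_seconds` of expiring."""

    def __init__(self, fetch: Fetch, advisory_seconds: float = 15 * 60,
                 clock: Callable[[], float] = time.time) -> None:
        self._fetch = fetch
        self._advisory = advisory_seconds
        self._clock = clock  # injectable so the behaviour can be tested
        self._value: Optional[str] = None
        self._expiry = float("-inf")
        self.refresh_count = 0

    def _needs_refresh(self) -> bool:
        return self._clock() >= self._expiry - self._advisory

    def get(self) -> str:
        if self._value is None or self._needs_refresh():
            self._value, self._expiry = self._fetch()
            self.refresh_count += 1
        return self._value


if __name__ == "__main__":
    now = [0.0]
    issued = iter(range(1, 100))

    def fetch() -> Tuple[str, float]:
        return f"token-{next(issued)}", now[0] + 3600  # valid for one hour

    creds = PreemptiveCredentials(fetch, clock=lambda: now[0])
    print(creds.get())  # token-1
    now[0] = 2000       # 1600 s left: outside the 900 s margin
    print(creds.get())  # token-1
    now[0] = 2800       # 800 s left: inside the margin, so refresh first
    print(creds.get())  # token-2
```

This sketch is not thread-safe; a client sharing it across connections would need a lock around the refresh.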
Hi, I was wondering about the next steps for getting this merged and released?
@mattoberle @dnks23 Can you please share sample code showing how to connect to MSK using IAM auth?
@phoenixx1 to use MSK auth you need to follow these steps:
- Provide AWS credentials via a supported `boto` discovery mechanism.¹
- Initialize your producer or consumer with these keyword arguments:

```python
security_protocol='SASL_SSL',
sasl_mechanism='AWS_MSK_IAM',
```

¹ Expose your IAM credentials via environment variables, set them in `~/.aws/credentials`, have a role assigned to the EC2 instance or container, etc.: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
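A minimal sketch of the two steps, assuming kafka-python is installed from this PR's branch (the 2.0.x release discussed in this thread rejects `AWS_MSK_IAM`) and that credentials are already discoverable by botocore. The helper name `msk_iam_config` is hypothetical, and the bootstrap address is the placeholder used earlier in this PR.

```python
from typing import Any, Dict, List


def msk_iam_config(bootstrap_servers: List[str], **overrides: Any) -> Dict[str, Any]:
    """Build KafkaProducer/KafkaConsumer keyword arguments for AWS_MSK_IAM (sketch)."""
    if not bootstrap_servers:
        raise ValueError("at least one bootstrap server is required")
    config: Dict[str, Any] = {
        "bootstrap_servers": list(bootstrap_servers),
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "AWS_MSK_IAM",
    }
    config.update(overrides)  # e.g. client_id, value_serializer
    return config


if __name__ == "__main__":
    config = msk_iam_config(["b-1.cluster.x.y.kafka.region.amazonaws.com:9098"])
    print(config)
    # With this PR's branch installed, the client is created as usual:
    # from kafka import KafkaProducer
    # producer = KafkaProducer(**config)
```

Keeping the settings in one helper makes it easy to reuse them for both producers and consumers.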
@jplock I'm not the best person to answer your question, we'd need to check with @dpkp to gauge interest in integrating this PR or #2256 which allows this to be supported independently.
The other popular Kafka client library for Python is maintained by Confluent and (understandably) is not especially interested in supporting MSK auth. The confluent library is a set of bindings to librdkafka and a little trickier to extend via plugins.
@mattoberle

I'm trying to connect using a Lambda function. The role assigned to the Lambda function already has access to MSK.
I'm getting this error for `sasl_ssl`:

@phoenixx1 your Lambda Function is not running the code from this branch.
That error message is generated here, and displays the list of valid sasl_mechanisms defined here.
You'll notice that AWS_MSK_IAM is missing in your list.
To access the features in this PR, install kafka-python from the following URL:
https://github.com/mattoberle/kafka-python/archive/7ff323727d99e0c33a68423300e7f88a9cf3f830.tar.gz
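One way to confirm which kafka-python a Lambda bundle or virtualenv actually imports is to inspect the accepted mechanisms at runtime. This sketch assumes the list lives in `BrokerConnection.SASL_MECHANISMS` in `kafka.conn` (as in kafka-python 2.0.x) and that this branch adds `AWS_MSK_IAM` to it; it reports no support if kafka-python cannot be imported.

```python
import importlib
from typing import Tuple


def supported_sasl_mechanisms() -> Tuple[str, ...]:
    """Return the SASL mechanisms the installed kafka-python accepts, or () if absent."""
    try:
        conn = importlib.import_module("kafka.conn")
    except ImportError:
        return ()
    broker_connection = getattr(conn, "BrokerConnection", None)
    return tuple(getattr(broker_connection, "SASL_MECHANISMS", ()))


def has_msk_iam_support() -> bool:
    return "AWS_MSK_IAM" in supported_sasl_mechanisms()


if __name__ == "__main__":
    try:
        import kafka
        # Shows which copy is imported (e.g. a stale layer vs. the bundled branch).
        print("kafka-python location:", kafka.__file__)
    except ImportError:
        print("kafka-python is not installed")
    print("AWS_MSK_IAM supported:", has_msk_iam_support())
```

If this prints `False` inside the Lambda, the function is still importing a release build rather than this branch.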
@mattoberle I tried using the package URL you provided, but I'm still facing the same issue (I created a new venv to check). Do you have any sample code where you have used it, so I can check whether anything is missing from mine? That would be really helpful, thanks.
@mattoberle The issue I have is that my boto session needs to assume a role, since I use a cross-account AWS MSK cluster. This is not supported, since the basic boto session is created inside the library. The library could be forked and changed so that the session is passed in as an argument, but I was wondering whether there is another way to do it that I missed.
I did not account for this case, but it seems worth supporting.
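A minimal sketch of the change described above, in which the caller supplies the session (for example one configured to assume a cross-account role) and the mechanism only reads from it. `resolve_msk_credentials` and `MskCredentials` are hypothetical names; the methods called, `get_credentials()`, `get_frozen_credentials()`, and `get_config_variable('region')`, exist on botocore's `Session` and `Credentials` objects, but how this PR would wire them in is an assumption.

```python
from types import SimpleNamespace
from typing import NamedTuple, Optional


class MskCredentials(NamedTuple):
    access_key: str
    secret_key: str
    token: Optional[str]
    region: str


def resolve_msk_credentials(session=None) -> MskCredentials:
    """Read credentials and region from a caller-supplied botocore session.

    Falls back to a default botocore session only when none is given, which
    mirrors the current behaviour of creating the session inside the library.
    """
    if session is None:
        import botocore.session  # imported lazily so botocore stays optional
        session = botocore.session.Session()
    credentials = session.get_credentials()
    if credentials is None:
        raise RuntimeError("no AWS credentials found for the session")
    frozen = credentials.get_frozen_credentials()  # consistent key/secret/token snapshot
    region = session.get_config_variable("region")
    if not region:
        raise RuntimeError("no AWS region configured for the session")
    return MskCredentials(frozen.access_key, frozen.secret_key, frozen.token, region)


if __name__ == "__main__":
    # Stand-ins exposing the same method names as botocore objects, for illustration only.
    frozen = SimpleNamespace(access_key="AKIDEXAMPLE", secret_key="secret", token="session-token")
    fake_session = SimpleNamespace(
        get_credentials=lambda: SimpleNamespace(get_frozen_credentials=lambda: frozen),
        get_config_variable=lambda name: "eu-central-1" if name == "region" else None,
    )
    print(resolve_msk_credentials(fake_session))
```

Accepting the session as a parameter keeps the default behaviour unchanged while letting cross-account users build the assumed-role session themselves.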
@mattoberle Apologies for the long comment :)
We are using changes from this PR to run our producer/consumers.
To connect to MSK, we created an IAM user and set its credentials in the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. This setup works, but almost every 12 hours we get the following error in the consumer during the `poll()` operation:
```
  File "/home/ray/anaconda3/lib/python3.7/site-packages/kafka/consumer/group.py", line 657, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/kafka/consumer/group.py", line 679, in _poll_once
    self._coordinator.poll()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/kafka/coordinator/consumer.py", line 270, in poll
    self.ensure_coordinator_ready()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/kafka/coordinator/base.py", line 269, in ensure_coordinator_ready
    raise future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.GroupAuthorizationFailedError: [Error 30] GroupAuthorizationFailedError: some_consumer
```
We found the following error in `kafka-authorizer.log`:

```
denied to perform DESCRIBE on resource some_consumer of type GROUP during API call FindCoordinator because CLIENT_NOT_AUTHENTICATED (kafka.authorizer.logger)
```

I guess the boto session token lasts for 12 hours and is then refreshed. Could you help us understand what is happening?
@bkatwal I think you are describing what I attempted to explain here: https://github.com/dpkp/kafka-python/pull/2255#issuecomment-902075097
The code in this PR doesn't include anything to actively refresh expiring credentials (although I'd expect that the credentials get refreshed during the exception handling).
Full transparency, I'm no longer using kafka-python or MSK auth.
There is (understandably) a lack of appetite to incorporate a proprietary authentication scheme into the more active libraries backed by librdkafka.
@mattoberle can you share what you use as an alternative to kafka-python?
@bkatwal and @mattoberle I am a bit late to the topic, but I did some tests using a role that my user needs to assume in order to interact with the MSK cluster. As you know, an assumed role is only temporary, and the session token needs to be refreshed every N seconds.
If you look carefully at the boto3 documentation, you'll see this. It means that if you configure your `~/.aws/config` as:

```
[default]
region = eu-central-1
source_profile = msk-programmatic-user
role_arn = arn:aws:iam::11111111111:role/msk-client
duration_seconds = 900
```

with `~/.aws/credentials` as:

```
[msk-programmatic-user]
aws_access_key_id = AKIA...
aws_secret_access_key = E532..
```

boto3 will know that you want to use the `msk-client` role and will take care of refreshing the credentials under the hood for you, without you having to handle any try/except and refresh the token yourself via, e.g., `sts.assume_role()`.
Hope this will help!
Hello, this looks pretty solid; however, it's been almost a year since it first came about. Is there a roadmap for getting this merged and released?
Hi @mattoberle , did you get any interest from @dpkp or the maintainers?
I am interested in picking this up to add IAM role and credential refresh support, but would like to check whether the maintainers are interested in getting this feature merged. I do have use cases for MSK IAM role authentication.
+1 to get this merged
```
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
```

Has anyone encountered this error? I have passed the credentials in environment variables.
@azizulwahid In addition to the credentials, you need to set `AWS_DEFAULT_REGION`. Try adding the `AWS_DEFAULT_REGION` environment variable, or setting it in your compute/container :) Use the region where your MSK cluster is located.
@bkatwal let me try that real quick
@bkatwal I still got the same error...
Could you send the full stacktrace?
Hi @mattoberle @jeffwidman - would we consider this? Happy to contribute to the code to add in IAM role and creds refresh.
@bkatwal I got this working already. The issue was assuming the role; I added another block of code to support that. Thanks!
How do you all use this change?
- Do you build your own Python package with this code?
- Or do you monkey-patch it?
- Something else?
Thank you
@mattoberle and @dpkp - I am curious whether, apart from the outstanding TODOs around docs and "Test config verification / auth method", there would be any other blockers to getting this integrated?
Asking essentially the same thing as @joshuachong above, really :)
From what I understand, this project is looking for maintainers and acceptance of new features is unlikely at this point in time. This library was last published in 2020, something to consider if you can foresee wanting to use newer broker features. https://github.com/dpkp/kafka-python/issues/2290#issuecomment-1009133967
I'm aware of an effort to integrate IAM into librdkafka here:
https://github.com/UrbanCompass/librdkafka
A wheel of confluent-kafka-python can be built using that fork, but you'll end up in the same situation (maintaining a fork yourself).
The official line is that IAM support will not be integrated into librdkafka because vendor-specific authentication mechanisms don't belong in the library.
Amazon only provides support for JVM-based clients: https://github.com/aws/aws-msk-iam-auth
I've left this PR open because it seems like some people have found it to be a helpful reference. To be clear though, I am no longer using this library or the UrbanCompass fork of librdkafka. I am also not a maintainer of this library.
Possible paths forward for those that wish to use this:
- Fork and comment if you plan to publicly support this feature; we can point people to that fork from this PR discussion.
- Reach out about becoming a maintainer (see first link in this comment).
I know this isn't the update that anyone is looking for, but I just want to adjust the expectations for anyone looking for a timeline.
Any news on the merge timeline?