
Access denied error for IAM role based AWS MSK cluster while using confluent-kafka python library

Open balajibreddi opened this issue 1 year ago • 15 comments

Description

Here is an overview of the application: it consumes from an upstream Kafka cluster, processes each message, and produces it to a downstream Kafka cluster using the confluent-kafka Python library (2.3.0).

We changed the authentication type from SASL/SCRAM to IAM role-based. To do that, we added a trust relationship and granted full access (MSK and Apache Kafka APIs) to the application in the policy attached to the ECS service, but we still see Access denied errors.

The odd behaviour in the application logs: no errors are thrown while the application is processing Kafka messages, but if it sits idle for around 5 hours it starts throwing these errors. If a message then comes in, it runs error-free for the next 5 hours. This behaviour is strange.

Error: %3|1714955097.400|FAIL|8b68559c-dbf7-401b-ac6c-807523ee37ee#producer-1| [thrd:sasl_ssl://b-3.clusteranme.stinjb.c7.kafka.region.]: sasl_ssl://b-3.clusteranme.stinjb.c7.region.amazonaws.com:9098/3: SASL authentication error: [6ad8e7d6-f5f0-41c9-930f-26cc577779ed]: Access denied (after 346ms in state AUTH_REQ)

FYI: to generate an auth token we use the aws_msk_iam_sasl_signer library from AWS, which produces a token for the given region; the token is returned from the callback passed to the oauth_cb config parameter of the Producer.

How to reproduce

Checklist

Please provide the following information:

  • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 2.3.0
  • [ ] MSK Apache Kafka broker version: 3.5.1
  • [ ] Client configuration: `{'security.protocol': 'SASL_SSL', 'client.id': str(uuid.uuid4()), 'bootstrap.servers': 'b-1.clustername.stinjb.c7.kafka.us-east-2.amazonaws.com:9098,b-2.clustername.stinjb.c7.kafka.us-east-2.amazonaws.com:9098,b-3.clustername.stinjb.c7.kafka.us-east-2.amazonaws.com:9098', 'sasl.mechanism': 'OAUTHBEARER', 'acks': 1, 'oauth_cb': <token from MSKAuthTokenProvider.generate_auth_token>, 'compression.type': 'gzip', 'reconnect.backoff.max.ms': 3000, 'retries': 3, 'request.timeout.ms': 15000}`
  • [ ] Operating system: Linux/X86_64

balajibreddi avatar May 07 '24 08:05 balajibreddi

Same problem here, trying to connect to an MSK Serverless cluster. Did you find any solution?

loigiorgio avatar Jul 25 '24 13:07 loigiorgio

Have you tried using the .poll() method to keep the connection alive?

liuliuOD avatar Aug 05 '24 14:08 liuliuOD

Did you find a solution? I'm having a similar problem using MSK.

fklezin avatar Aug 27 '24 11:08 fklezin

I was able to connect to an MSK Serverless cluster from my Lambda function; my policies were wrong. I fixed it by using

  "Resource": [
      "*"
  ]

in the policy document instead of writing the cluster ARN directly.
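For reference, a hedged sketch of what such a policy document might look like (the kafka-cluster action names are the ones documented for MSK IAM access control; once it works, scope Resource back down — note that topic and group ARNs must be listed in addition to the cluster ARN, which is a common reason a cluster-ARN-only policy is denied):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:DescribeCluster",
        "kafka-cluster:CreateTopic",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData",
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DescribeGroup"
      ],
      "Resource": ["*"]
    }
  ]
}
```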

My Python Lambda function with confluent-kafka:

...
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
from confluent_kafka.cimpl import NewTopic, KafkaException, KafkaError
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
...

def oauth_cb(oauth_config):
    auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(
        "us-east-1")
    # Note that this library expects oauth_cb to return expiry time in seconds since epoch, while the token generator
    # returns expiry in ms
    return auth_token, expiry_ms / 1000


kafka_producer = Producer({
    # "debug": "all",
    'bootstrap.servers': os.environ.get('KAFKA_BROKER_STR'),
    'client.id': socket.gethostname(),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb
})

admin_client = AdminClient({
    # "debug": "all",
    'bootstrap.servers': os.environ.get('KAFKA_BROKER_STR'),
    'client.id': socket.gethostname(),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb
})

def create_topic(topic_name=None, num_partitions=1, replication_factor=2):
    log.info(f"Trying to create topic {topic_name}")
    topic_list = [NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)]
    admin_client.poll(3)
    fs = admin_client.create_topics(topic_list)
    ...

It doesn't work without admin_client.poll(3).

loigiorgio avatar Aug 27 '24 16:08 loigiorgio

Hi @balajibreddi @fklezin, did you find any solution for this? We are facing a similar issue. It looks like the token isn't getting refreshed.

ShashidharC avatar Sep 19 '24 07:09 ShashidharC

I did what @loigiorgio suggested.

You need to call poll to authenticate: admin_client.poll(3)

It's described here: https://github.com/confluentinc/confluent-kafka-python/issues/1713#issuecomment-2268673955
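As a minimal sketch (`ensure_authenticated` is an illustrative helper name, not part of the library): poll() serves queued callbacks, including the oauth_cb token callback, which gives librdkafka a chance to finish the SASL handshake before the first admin request.

```python
def ensure_authenticated(client, timeout_s=3):
    # poll() serves pending callbacks (e.g. the OAUTHBEARER token callback),
    # letting the client complete authentication before real work starts.
    client.poll(timeout_s)
```

Usage would be `ensure_authenticated(admin_client)` followed by `admin_client.create_topics(...)`, mirroring the snippet earlier in the thread.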

fklezin avatar Sep 19 '24 07:09 fklezin

@fklezin We are able to authenticate to the MSK cluster and produce messages. But after the application has been idle for a while, it reports an authentication failure, similar to what @balajibreddi described in the post.

ShashidharC avatar Sep 19 '24 07:09 ShashidharC

@edenhill apologies for tagging you directly. Have you come across an error like this?

ShashidharC avatar Sep 19 '24 07:09 ShashidharC

Can you share the logs? But you're probably looking for: "socket.keepalive.enable": True
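A hedged sketch of a config combining the two (`build_producer_config` is an illustrative helper; `socket.keepalive.enable` and `connections.max.idle.ms` are standard librdkafka properties, but tune the values for your own network path):

```python
def build_producer_config(bootstrap_servers, oauth_cb):
    # oauth_cb is the token callback shown earlier in the thread.
    # TCP keepalives stop idle connections from being silently dropped
    # (e.g. by a load balancer or NAT) before the client notices.
    return {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'OAUTHBEARER',
        'oauth_cb': oauth_cb,
        'socket.keepalive.enable': True,
        # Optionally recycle connections before an intermediary times them out.
        'connections.max.idle.ms': 180000,
    }
```

With this dict you would construct the producer as `Producer(build_producer_config(..., oauth_cb))`.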

fklezin avatar Sep 19 '24 07:09 fklezin