confluent-kafka-python

[HELP] - Kafka Producer IAM Role Authentication Fails After Idle Period with MSK

Open Sivakajan-Galaxy opened this issue 8 months ago • 5 comments

I need to create a Kafka producer using the confluent_kafka.Producer that works with AWS MSK using IAM role-based authentication. To achieve this, I have developed a working solution as outlined below.

import json
import logging
from confluent_kafka import Producer
from config.settings import KAFKA_BOOTSTRAP_SERVERS, REGION
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import socket

logger = logging.getLogger(__name__)


class KafkaProducerService:
    def __init__(self):
        self.producer = Producer(
            {
                "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
                "acks": "all",
                "security.protocol": "SASL_SSL",
                "sasl.mechanism": "OAUTHBEARER",
                "oauth_cb": self.oauth_cb,
                "client.id": socket.gethostname()
            }
        )

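    def oauth_cb(self, oauth_config):
        # generate_auth_token returns the expiry in milliseconds;
        # confluent_kafka expects (token, expiry in seconds since epoch).
        auth_token, exp_ms = MSKAuthTokenProvider.generate_auth_token(REGION)
        return auth_token, exp_ms / 1000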

    def produce_message(self, topic, message):
        try:
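            self.producer.produce(topic=topic, value=json.dumps(message).encode("utf-8"))
            # flush() blocks until delivery and serves pending callbacks, so the
            # poll(10) that followed it could only add up to 10 s of waiting.
            self.producer.flush()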
            logger.info(f"Produced message to {topic}: {message}")
        except Exception as e:
            logger.error(f"Failed to send message to {topic}: {str(e)}")
            raise


kafka_producer_service = KafkaProducerService()

The producer is able to establish a healthy connection and successfully send messages to the consumer. However, if the connection remains idle for a few minutes, it starts logging MSK-related connection errors repeatedly.

|1743698633.362|FAIL|api-service-29457134-mr5tm#producer-2| [thrd:sasl_ssl://b-1.*******devmsk.dc43-1948-12.kafka.us-east-1.a]: sasl_ssl://b-1.********devmsk.dc256562362.kafka.us-east-1.amazonaws.com:9098/1: SASL authentication error: [20234d-243-49fb-92e8-4]2324234: Access denied (after 235ms in state AUTH_REQ)

The error logs keep appearing repeatedly while the connection is idle. However, as soon as a new message is sent, it reaches the consumer successfully and the error logs stop. Once the connection becomes idle again, the same errors start appearing after a short timeout.

Can anyone help me solve this and suggest the best solution for this case?
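
One possible direction, offered as a sketch and not a confirmed fix: confluent_kafka serves the `oauth_cb` token refresh from `poll()` (or `flush()`), and the code above only calls those inside `produce_message`. If nothing polls while the service is idle, the short-lived MSK IAM token may expire before it is refreshed, which would match the "Access denied" errors that stop as soon as the next message is sent. The helper below keeps polling on a background thread. `BackgroundPoller` and `interval_s` are hypothetical names introduced for illustration, and the helper assumes the client's `poll()` is safe to call from a second thread.

```python
import threading


class BackgroundPoller:
    """Periodically call client.poll(0) on a daemon thread.

    `client` is any object with a poll(timeout) method, for example a
    confluent_kafka.Producer. This class is a hypothetical helper, not
    part of confluent_kafka.
    """

    def __init__(self, client, interval_s=1.0):
        self._client = client
        self._interval_s = interval_s
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()
        return self

    def stop(self):
        # Wake the loop immediately and wait for the thread to exit.
        self._stop.set()
        self._thread.join()

    def _run(self):
        while not self._stop.is_set():
            # Non-blocking poll: serves any queued callbacks
            # (delivery reports, token refresh) and returns at once.
            self._client.poll(0)
            # Sleep until the next round, or return early on stop().
            self._stop.wait(self._interval_s)
```

In the service above, this could be wired in at the end of `__init__` with `self._poller = BackgroundPoller(self.producer).start()`, with `self._poller.stop()` called on shutdown before a final `flush()`.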

Sivakajan-Galaxy, Apr 04 '25 03:04