confluent-kafka-python

Trying to connect to MSK With SASL_SSL

Open Vipinsatti opened this issue 2 years ago • 4 comments

Description

When trying to connect to AWS MSK, I get the error below:

%3|1686828089.194|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://kafka.us-east-2]: sasl_ssl://.amazonaws.com:9196/bootstrap: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker handle destroyed (after 0ms in state DOWN)

How to reproduce

from confluent_kafka import Consumer

bootstrap_servers = ".amazonaws.com:9196,8.amazonaws.com:9196,b.amazonaws.com:9196"

# SASL username and password for authentication
sasl_username = 'test'
sasl_password = 'test'

# SASL mechanism (SCRAM-SHA-256 or SCRAM-SHA-512) and security protocol
sasl_mechanism = 'SCRAM-SHA-512'
security_protocol = 'SASL_SSL'

# Additional Kafka configuration parameters
kafka_config = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': security_protocol,
    'sasl.mechanism': sasl_mechanism,
    'sasl.username': sasl_username,
    'sasl.password': sasl_password,
    'group.id': '1',  # Specify a unique consumer group ID
    # 'auto.offset.reset': 'earliest',  # Start consuming from the beginning of the topic
    # 'sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required',
}

# Create a Kafka consumer with the provided configuration
consumer = Consumer(kafka_config)

Checklist

Please provide the following information:

  • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • [ ] 2.8.1:
  • [ ] Client configuration: {...}
  • [ ] Operating system:
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue
  • [x] confluent-kafka 2.1.1
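
The version and client-log items in the checklist can be collected with a short script. What follows is a minimal sketch rather than anything from the original report: the helper names `client_versions` and `with_debug` are hypothetical, the config values are placeholders, and the `security,broker` debug contexts are an assumed starting point for a SASL failure.

```python
try:
    import confluent_kafka  # optional, so the helpers still load without the client installed
except ImportError:
    confluent_kafka = None


def client_versions():
    """Return (client version, librdkafka version) strings, or None if the client is missing."""
    if confluent_kafka is None:
        return None
    # Both calls return a (version_string, version_int) tuple; keep the string part.
    return confluent_kafka.version()[0], confluent_kafka.libversion()[0]


def with_debug(config, contexts="security,broker"):
    """Return a copy of a client config with librdkafka debug logging switched on."""
    debug_config = dict(config)        # shallow copy; the caller's dict is left unchanged
    debug_config["debug"] = contexts   # comma-separated librdkafka debug contexts
    return debug_config


# Placeholder config (assumption): only the keys relevant to the illustration.
base_config = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
}
debug_config = with_debug(base_config)

print(client_versions())
print(debug_config)
```

Passing `debug_config` (plus the real connection settings) to `Consumer(...)` should make librdkafka write the handshake and authentication steps to stderr, which is the output the checklist asks for.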

Vipinsatti avatar Jun 15 '23 13:06 Vipinsatti

The bootstrap.servers value doesn't look correct. Please check the broker addresses.
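
One way to check the value before handing it to the client is a small format check on each `host:port` entry. This is only a sketch under stated assumptions: `check_bootstrap_servers` is a hypothetical helper, it does plain string checks (no DNS lookup, no IPv6 handling), and it cannot tell whether a well-formed name is the right broker.

```python
def check_bootstrap_servers(servers):
    """Return (entry, problem) pairs for entries that do not look like host:port."""
    problems = []
    for entry in servers.split(","):
        entry = entry.strip()
        # Split on the last colon so the port is whatever follows it.
        host, sep, port = entry.rpartition(":")
        if not sep:
            problems.append((entry, "missing port"))
        elif not host or host.startswith("."):
            problems.append((entry, "missing or truncated host name"))
        elif not port.isdigit():
            problems.append((entry, "port is not a number"))
    return problems


# The value from the report above (host names appear to have been redacted there).
issue_value = ".amazonaws.com:9196,8.amazonaws.com:9196,b.amazonaws.com:9196"
for entry, problem in check_bootstrap_servers(issue_value):
    print(f"{entry}: {problem}")
```

An empty result only means the entries are well formed; the host names still have to match the broker endpoints shown for the cluster in the MSK console.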

pranavrth avatar Jun 30 '23 11:06 pranavrth

I was able to connect with the script below. Note that you will need to change these values: bootstrap_servers, sasl.username and sasl.password.

#!/usr/bin/env python3

from confluent_kafka import Producer
from datetime import datetime
import json

bootstrap_servers = 'b-1.msktestcluster.sjanui.c14.kafka.us-west-2.amazonaws.com:9096,b-2.msktestcluster.sjanui.c14.kafka.us-west-2.amazonaws.com:9096'
producer = Producer({
  'bootstrap.servers': bootstrap_servers,
  'security.protocol': 'SASL_SSL',
  'sasl.username': 'alice',
  'sasl.password': 'alice',
  'sasl.mechanism': 'SCRAM-SHA-512'
})

data = {
    'message': 'hello world',
    'timestamp': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
}

producer.produce('test_topic', json.dumps(data).encode('utf-8'))

producer.flush()

colinbjohnson avatar Jul 25 '23 21:07 colinbjohnson

@Vipinsatti did the discussion in this thread help solve your issue? If so, we can mark this as closed.

nhaq-confluent avatar Feb 12 '24 23:02 nhaq-confluent