confluent-kafka-python
confluent-kafka-python copied to clipboard
Don't understand ssl.ca.pem format
Description
I'm working on using confluent kafka python client to consume using ssl. I have no issue with setting the path of cert to ssl.ca.location. However, if I tried to use 'ssl.ca.pem', it will report error like 'SSL handshake failed: error:1416F086:SSL routines:tls_process_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl)'.
How to reproduce
There is a cert file in my project my_cert.crt
with such structure:
-----BEGIN CERTIFICATE-----
first cert
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
another one
-----END CERTIFICATE-----
So, I'm trying to connect to Kafka with SASL_SSL. If I use ssl.ca.location way like:
conf["ssl.ca.location"] = os.path.join(cur_path, "my_cert.crt")
p = Producer(**conf)
everything goes without a doubt, but I can't find a way how to pass cert to ssl.ca.pem. I've tried things like:
with open("my_cert.crt", "b") as f:
b_cert = f.read()
conf["ssl.ca.pem"] = b_cert
p = Producer(**conf)
####
#OR#
####
with open("my_cert.crt", "r", encoding='utf-8') as f:
encoded_cert = base64.b64encode(f.read().encode("UTF-8"))
decoded_cert = base64.b64decode(encoded_cert).decode('utf-8')
conf["ssl.ca.pem"] = decoded_cert
p = Producer(**conf)
Feels like I'm missing something and need an assistance. Thank you!
Checklist
Please provide the following information:
- [confluent-kafka-python:('1.9.2', 17367552); librdkafka: ('1.9.2', 17367807) ]
- [x] Apache Kafka broker version:
- [ ] Client configuration:
{...}
- [ ] Operating system:
- [ ] Provide client logs (with
'debug': '..'
as necessary): -
1673960934.271|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://url:port/bootstrap]: sasl_ssl://url:port/bootstrap: SSL handshake failed: error:1416F086:SSL routines:tls_process_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (install ca-certificates package) (after 3ms in state SSL_HANDSHAKE)
- [ ] Provide broker log excerpts
- [ ] Critical issue
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", None)
KAFKA_BROKER = os.getenv("KAFKA_BROKER", "localhost:9092")
SCHEMA_REGISTRY = os.getenv("SCHEMA_REGISTRY", "http://localhost:9081")
KAFKA_SECURITY_PROTOCOL = os.getenv("KAFKA_SECURITY_PROTOCOL", None) # SASL_SSL
KAFKA_SASL_MECHANISM = os.getenv("KAFKA_SASL_MECHANISM", None) # SCRAM-SHA-256
KAFKA_SASL_USERNAME = os.getenv("KAFKA_SASL_USERNAME", None)
KAFKA_SASL_PASSWORD = os.getenv("KAFKA_SASL_PASSWORD", None)
KAFKA_SSL_CA_LOCATION = os.getenv("KAFKA_SSL_CA_LOCATION", None) # "../conf/ca.pem"
AVRO_VALUE_SCHEMA_PATH = "avro/aaaaa.avsc"
key_schema_str = """{"type": "string"}"""
value_schema_str = open(AVRO_VALUE_SCHEMA_PATH, "r").read()
schema_registry_conf = {"url": SCHEMA_REGISTRY}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
avro_conf = {"auto.register.schemas": False}
key_avro_serializer = AvroSerializer(schema_str=key_schema_str,
schema_registry_client=schema_registry_client,
conf=avro_conf)
value_avro_serializer = AvroSerializer(schema_str=value_schema_str,
schema_registry_client=schema_registry_client,
conf=avro_conf)
producer_conf = {"bootstrap.servers": KAFKA_BROKER,
"key.serializer": key_avro_serializer,
"value.serializer": value_avro_serializer,
"partitioner": "murmur2_random" # equivalent of the default java partitioner
}
if KAFKA_SECURITY_PROTOCOL:
producer_conf["security.protocol"] = KAFKA_SECURITY_PROTOCOL
producer_conf["sasl.mechanism"] = KAFKA_SASL_MECHANISM
producer_conf["sasl.username"] = KAFKA_SASL_USERNAME
producer_conf["sasl.password"] = KAFKA_SASL_PASSWORD
producer_conf["ssl.ca.location"] = KAFKA_SSL_CA_LOCATION
client = SerializingProducer(producer_conf)
@raphaelauv i get that. My issue is more about passing CERT as string. So, I've achieved connecting SASL_SSL kafka passing cert location, but I would prefer to pass whole certificate as a string