confluent-kafka-python
Python SchemaRegistryClient failed to connect to SchemaRegistry Server
Description
Hi, I have a Schema Registry server running with the following server.properties:
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The address the socket server listens on.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=https://0.0.0.0:8081
# Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the leader schema registry instance and for storing the data for
# registered schemas.
...
ssl.keystore.location=/etc/schema-registry/ssl/schema.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/etc/schema-registry/ssl/schema.truststore.jks
ssl.truststore.password=password
ssl.client.auth=false
ssl.client.authentication=NONE
# If true, API requests that fail will include extra debugging information, including stack traces
debug=true
metadata.encoder.secret=REPLACE_ME_WITH_HIGH_ENTROPY_STRING
resource.extension.class=io.confluent.dekregistry.DekRegistryResourceExtension
inter.instance.protocol=https
Here is my test Python script:
from flask import Flask, request, jsonify
from flasgger import Swagger
from flask_cors import CORS
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from utils import get_schema_from_schema_registry, delivery_report
import struct
#-------------------------------------------------
# Configuration of Kafka Producer
schema_registry_url = 'https://schemaregistryhostname:8081'
kafka_topic = 'ShipmentReadyForPacking'
schema_registry_subject = f"{kafka_topic}-value"
sr_client: SchemaRegistryClient = SchemaRegistryClient({
    'url': schema_registry_url,
    'ssl.ca.location': '/opt/converter-api/cert.pem'
})
print(sr_client.get_subjects())
#-------------------------------------------------
I get the following logs:
Traceback (most recent call last):
  File "/opt/converter-api/app.py", line 25, in <module>
    print(sr_client.get_subjects())
  File "/usr/local/lib64/python3.9/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 521, in get_subjects
    return self._rest_client.get('subjects')
  File "/usr/local/lib64/python3.9/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 125, in get
    return self.send_request(url, method='GET', query=query)
  File "/usr/local/lib64/python3.9/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 168, in send_request
    response = self.session.request(
  File "/usr/local/lib/python3.9/site-packages/requests/sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.9/site-packages/requests/sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/requests/adapters.py", line 698, in send
    raise SSLError(e, request=request)
requests.exceptions.SSLError: HTTPSConnectionPool(host='schemaregistryhostname', port=8081): Max retries exceeded with url: /subjects (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self-signed certificate in certificate chain (_ssl.c:1129)')))
The problem is that when I try the same request with plain curl, it works :/
curl --cacert ./cert.pem https://schemaregistryhostname:8081/subjects
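To narrow down whether the problem is the CA bundle or the library, here is a minimal sketch (hostname and bundle path taken from the report above) that performs the same verification step with only the Python standard library:

# Minimal sketch, assuming the same host and CA bundle as above.
# If this fails too, the chain in cert.pem is incomplete from
# Python's point of view, independent of confluent-kafka.
import socket
import ssl

context = ssl.create_default_context(cafile="/opt/converter-api/cert.pem")

with socket.create_connection(("schemaregistryhostname", 8081)) as sock:
    with context.wrap_socket(sock, server_hostname="schemaregistryhostname") as tls:
        # Prints the verified peer certificate subject on success;
        # raises ssl.SSLCertVerificationError on failure.
        print(tls.getpeercert()["subject"])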
How to reproduce
confluent-schema-registry version 7.7.1, Python package confluent-kafka version 2.5.3.
Checklist
Please provide the following information:
- [X] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
- [X] Apache Kafka broker version:
- [X] Client configuration: {...}
- [ ] Operating system:
- [X] Provide client logs (with 'debug': '..' as necessary)
- [X] Provide broker log excerpts
- [X] Critical issue
I have seen the exact same error when I try to connect to my brokers with the Confluent producer class, so I think I'm definitely overlooking something. In the broker logs I can see the following error:
javax.net.ssl|ERROR|32|data-plane-kafka-network-thread-0-ListenerName(SSL)-SSL-5|2024-11-15 16:17:35.292 CET|TransportContext.java:352|Fatal (HANDSHAKE_FAILURE): Insufficient buffer remaining for AEAD cipher fragment (2). Needs to be more than tag size (16) (
"throwable" : {
javax.crypto.BadPaddingException: Insufficient buffer remaining for AEAD cipher fragment (2). Needs to be more than tag size (16)
at java.base/sun.security.ssl.SSLCipher$T13GcmReadCipherGenerator$GcmReadCipher.decrypt(SSLCipher.java:1894)
at java.base/sun.security.ssl.SSLEngineInputRecord.decodeInputRecord(SSLEngineInputRecord.java:240)
at java.base/sun.security.ssl.SSLEngineInputRecord.decode(SSLEngineInputRecord.java:197)
at java.base/sun.security.ssl.SSLEngineInputRecord.decode(SSLEngineInputRecord.java:160)
at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:111)
at java.base/sun.security.ssl.SSLEngineImpl.decode(SSLEngineImpl.java:681)
at java.base/sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:636)
at java.base/sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:454)
at java.base/sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:433)
at java.base/javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:637)
at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:527)
at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:381)
at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:301)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
This suggests that the Python client is not handling the certificate the same way curl does, since you passed the CA to curl explicitly. The error in your logs indicates a self-signed certificate verification failure:
requests.exceptions.SSLError: HTTPSConnectionPool(host='schemaregistryhostname', port=8081):
Max retries exceeded with url: /subjects (Caused by SSLError(SSLCertVerificationError(1,
'[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self-signed certificate in certificate chain (_ssl.c:1129)')))
Could you please share the result if you disable hostname verification and client auth?
'ssl.ca.location':'/opt/converter-api/cert.pem',
'enable.ssl.certificate.verification': False
There might be a different option for disabling certificate verification listed on the Confluent producer configuration page.
This would let the Python client bypass SSL verification entirely. However, this isn't a long-term solution for production systems, as it disables certificate validation.
There is no enable.ssl.certificate.verification property in my version of the package (2.5.3); it results in the following error:
ValueError: Unrecognized properties: enable.ssl.certificate.verification
You're absolutely correct, and I apologize for the confusion with a different library; I only suggested there is probably an equivalent option, depending on how the library is written.
After reviewing the relevant documentation and code at https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/_modules/confluent_kafka/schema_registry/schema_registry_client.html, the enable.ssl.certificate.verification option is indeed not available in this library.
My understanding is that they are all built on librdkafka, which clearly has an enable.ssl.certificate.verification option:
https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html
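To illustrate the split: that option is honoured by librdkafka, i.e. by the broker connection of the Producer, not by the requests-based SchemaRegistryClient. A minimal sketch (the bootstrap address below is hypothetical):

# Hedged sketch: enable.ssl.certificate.verification is a librdkafka
# option, so it applies to the Producer's broker connection only.
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'broker:9093',                # hypothetical address
    'security.protocol': 'SSL',
    'ssl.ca.location': '/opt/converter-api/cert.pem',
    'enable.ssl.certificate.verification': False,      # librdkafka-level toggle
})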
I don't claim to be a Kafka expert, but I was trying to connect the dots.
Additionally, can you try the following and let me know if it works for you? This is based on my analysis of the client code.
kafka_topic = 'ShipmentReadyForPacking'
schema_registry_subject = f"{kafka_topic}-value"
sr_client: SchemaRegistryClient = SchemaRegistryClient({
    'url': schema_registry_url,
    'ssl.ca.location': '/opt/converter-api/cert.pem'
})
sr_client._rest_client.session.verify = False  # <<<< --- DISABLE VERIFICATION
print(sr_client.get_subjects())
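A related variant, under the assumption that this client version maps ssl.ca.location onto the underlying requests session: requests also accepts a CA bundle path in session.verify, which keeps verification enabled while forcing the bundle explicitly:

# Hedged variant: point session.verify at the CA bundle instead of
# disabling verification; requests will verify against this file.
sr_client._rest_client.session.verify = '/opt/converter-api/cert.pem'
print(sr_client.get_subjects())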
I guess a pull request for those options wouldn't hurt; it might be a good thing to have.
I would also verify (which I assume you did) that all CA certificates, including the intermediates, were imported into the truststore. In other words, also make sure your schema.keystore.jks file is set up correctly and contains the private key and the full certificate chain.
The schema.keystore.jks is properly set up. If it weren’t, I wouldn’t have been able to successfully run the curl command with the --cacert option.
While setting sr_client._rest_client.session.verify = False does fix the issue, it’s not the solution I’m aiming for. My goal is for the SchemaRegistryClient to properly verify the server using the certificate provided via the ssl.ca.location configuration.
In this case, it feels like the behavior is equivalent to providing ssl.ca.location: ''.
There are differences between how the library clients and curl handle SSL verification and certificate chains; the libraries are likely stricter. I think looking more closely at the CA and keystore files won't hurt. I just want to explore all possible angles.
Another possibility is that the SchemaRegistryClient is not passing or processing the ssl.ca.location configuration correctly, due to a bug in how the library handles the CA configuration.
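One way to check both hypotheses, as a hedged diagnostic sketch (_rest_client is a private attribute and may differ between versions of confluent-kafka):

# 1) What did the REST client actually configure for TLS verification?
print(sr_client._rest_client.session.verify)  # expect the cert.pem path

# 2) Does Python's ssl module accept the bundle, and which CA subjects
#    did it load? Intermediates and the self-signed root should appear.
import ssl

ctx = ssl.create_default_context(cafile='/opt/converter-api/cert.pem')
for cert in ctx.get_ca_certs():
    print(cert.get('subject'))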
Doing what you said, I just get an SSL error because it tries to verify the server certificate; and sr_client._rest_client.session.verify is not a good place to put the CA cert.
You're right. I initially thought the class had a way to handle this directly as well, but I was mistaken. As a workaround I submitted a pull request to add an ssl.verify option for better flexibility.
Hi @Jay-boo, I am seeing the same issue. Do you have a resolution for this?
No 😭😭😭
Apologies this wasn't resolved when posted. What version of the Python client were you using? And how were you signing the self-signed certificate? https://github.com/encode/httpx/issues/507 has some examples of self-signing that work with httpx (which the schema registry client is using), in case that is helpful in resolving this.
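For anyone hitting this on a newer client, a minimal sketch of the same verification path through httpx directly (host and bundle path as in the original report):

# Hedged sketch: reproduce the request with httpx, which newer
# schema registry clients use under the hood.
import httpx

response = httpx.get(
    "https://schemaregistryhostname:8081/subjects",
    verify="/opt/converter-api/cert.pem",  # CA bundle, same file given to curl
)
print(response.json())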