confluent-kafka-python
Confluent-Kafka-Python producing SSL handshake error
Description
I'm trying to run a Kafka consumer using the confluent-kafka Python package, using this example as a reference: https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/oauth_producer.py. We have OAuth set up, and the same setup works from the command line.
%3|1626297590.465|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://<kakfa-server>.com:9093/bootstrap]: sasl_ssl://<kakfa-server>.com:9093/bootstrap: SSL handshake failed: error:14090086:SSL routines:ssl3_get_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl) (after 40ms in state SSL_HANDSHAKE, 1 identical error(s) suppressed)
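The "certificate verify failed" message means librdkafka could not validate the broker's certificate chain against the CA file it was given. One way to test the TLS trust path outside of Kafka is a plain Python handshake; this is a hypothetical diagnostic sketch (the host name is a placeholder, and certifi's bundle is just one candidate CA source, not necessarily the right one for a broker signed by an internal CA):

import socket
import ssl

import certifi  # third-party: pip install certifi

BROKER = '<kafka-server>.com'   # placeholder host, as in the error above
PORT = 9093

# Build a context from certifi's CA bundle; swap in the CA file you pass
# as ssl.ca.location if the broker uses an internal CA.
ctx = ssl.create_default_context(cafile=certifi.where())

with socket.create_connection((BROKER, PORT)) as sock:
    with ctx.wrap_socket(sock, server_hostname=BROKER) as tls:
        # If this also raises CERTIFICATE_VERIFY_FAILED, the CA file
        # does not match the broker's certificate chain.
        print('Handshake OK:', tls.version())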
How to reproduce
import functools
import logging
import time

import requests

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer


def _get_token(config):
    """Note here the value of config comes from sasl.oauthbearer.config below.
    It is not used in this example, but you can put arbitrary values there to
    configure how you get the token (e.g. which token URL to use).
    """
    payload = {
        'grant_type': 'client_credentials',
        'scope': scopes
    }
    resp = requests.post(token_url,
                         auth=(client_id, client_secret),
                         data=payload)
    token = resp.json()
    return token['access_token'], time.time() + float(token['expires_in'])
def producer_config():
    logger = logging.getLogger(__name__)
    return {
        'bootstrap.servers': bootstrap_servers,
        'key.serializer': StringSerializer('utf_8'),
        'value.serializer': StringSerializer('utf_8'),
        'sasl.mechanisms': 'OAUTHBEARER',
        # sasl.oauthbearer.config can be used to pass arguments to your oauth_cb.
        # It is not used in this example since we are passing all the arguments
        # from the command line.
        # 'sasl.oauthbearer.config': 'not-used',
        'oauth_cb': functools.partial(_get_token),
        'logger': logger,
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '<CA-CERT>',
        'ssl.certificate.location': '<CERT-LOCATION>',
        'ssl.key.location': '<KEY-LOCATION>'
    }
def delivery_report(err, msg):
    """
    Reports the failure or success of a message delivery.

    Args:
        err (KafkaError): The error that occurred, or None on success.
        msg (Message): The message that was produced or failed.

    Note:
        In the delivery report callback the Message.key() and Message.value()
        will be the binary format as encoded by any configured Serializers and
        not the same object that was passed to produce().
        If you wish to pass the original object(s) for key and value to the
        delivery report callback we recommend a bound callback or lambda where
        you pass the objects along.
    """
    if err is not None:
        print('Delivery failed for User record {}: {}'.format(msg.key(), err))
        return
    print('User record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))
def main():
    producer_conf = producer_config()
    delimiter = "|"
    producer = SerializingProducer(producer_conf)
    print('Producing records to topic {}. ^C to exit.'.format(topic))
    while True:
        # Serve on_delivery callbacks from previous calls to produce()
        producer.poll(0.0)
        try:
            msg_data = input(">")
            msg = msg_data.split(delimiter)
            if len(msg) == 2:
                producer.produce(topic=topic, key=msg[0], value=msg[1],
                                 on_delivery=delivery_report)
            else:
                producer.produce(topic=topic, value=msg[0],
                                 on_delivery=delivery_report)
        except KeyboardInterrupt:
            break
    print('\nFlushing {} records...'.format(len(producer)))
    producer.flush()


main()
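As a sanity check independent of Kafka, the token callback above can also be invoked directly; this is a hypothetical snippet assuming the same placeholder token_url, client_id, client_secret, and scopes used in the code:

# Hypothetical smoke test for the OAuth leg on its own:
token, expires_at = _get_token(None)
print('token length:', len(token), 'expires at:', expires_at)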
Checklist
Please provide the following information:
- [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
- [ ] Apache Kafka broker version:
- [ ] Client configuration: {...}
- [ ] Operating system:
- [ ] Provide client logs (with 'debug': '..' as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
confluent-kafka-python: 1.7.0, librdkafka: 1.7.0
Hi @arjun180, thanks for reporting the issue.
May I know which command you used for the setup that works?
Hi @jliunyu - Thanks for getting back to me. So, quick update on this - the producer now works. I can send messages and there are no problems. However, I'm seeing a problem with my consumer - I don't receive any of the messages.
%7|1626378712.862|BROKER|rdkafka#consumer-1| [thrd:app]: GroupCoordinator: Added new broker with NodeId -1
%7|1626378712.862|BRKMAIN|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread
%7|1626378712.862|BRKMAIN|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1626378712.862|BROKER|rdkafka#consumer-1| [thrd:app]: sasl_ssl://kafka0dev.abc.com:9093/bootstrap: Added new broker with NodeId -1
%7|1626378712.862|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.7.0 (0x10700ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STRIP STATIC_LINKING CC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2)
%7|1626378712.862|BRKMAIN|rdkafka#consumer-1| [thrd:sasl_ssl://kafka0dev.abc.com:9093/bootstrap]: sasl_ssl://kafka0dev.abc.com:9093/bootstrap: Enter main broker thread
%7|1626378712.862|CONNECT|rdkafka#consumer-1| [thrd:main]: sasl_ssl://kafka0dev.abc.com:9093/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
%7|1626378712.863|CONNECT|rdkafka#consumer-1| [thrd:sasl_ssl://kafka0dev.abc.com:9093/bootstrap]: sasl_ssl://kafka0dev.abc.com:9093/bootstrap: Received CONNECT op
%7|1626378712.863|STATE|rdkafka#consumer-1| [thrd:sasl_ssl://kafka0dev.abc.com:9093/bootstrap]: sasl_ssl://kafka0dev.abc.com:9093/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1626378712.863|CONNECT|rdkafka#consumer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: coordinator query
Subscription complete
%7|1626378713.550|CONNECT|rdkafka#consumer-1| [thrd:sasl_ssl://kafka0dev.abc.com:9093/bootstrap]: sasl_ssl://kafka0dev.abc.com:9093/bootstrap: broker in state TRY_CONNECT connecting
%7|1626378713.550|STATE|rdkafka#consumer-1| [thrd:sasl_ssl://kafka0dev.abc.com:9093/bootstrap]: sasl_ssl://kafka0dev.abc.com:9093/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1626378713.569|CONNECT|rdkafka#consumer-1| [thrd:sasl_ssl://kafka0dev.abc.com:9093/bootstrap]: sasl_ssl://kafka0dev.abc.com:9093/bootstrap: Connecting to ipv4#52.38.99.178:9093 (sasl_ssl) with socket 13
%7|1626378713.864|CONNECT|rdkafka#consumer-1| [thrd:main]: Cluster connection already in progress: coordinator query
%7|1626378713.864|CONNECT|rdkafka#consumer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
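One hedged observation on these logs: the consumer config below only enables 'debug': 'topic', so the TLS/SASL handshake detail is not visible. Broadening the debug contexts ('broker', 'security', and 'protocol' are standard librdkafka debug flags) would show whether the consumer stalls in the handshake the same way the producer initially did:

# Widen librdkafka debug output to cover connection setup and auth;
# 'broker', 'security' and 'protocol' are built-in librdkafka debug contexts.
conf['debug'] = 'broker,security,protocol'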
The code for the consumer looks like this :
import functools
import sys
import time

import requests

from confluent_kafka import Consumer, KafkaException


def print_assignment(consumer, partitions):
    print('Assignment:', partitions)


def _get_token(config):
    """Note here the value of config comes from sasl.oauthbearer.config below.
    It is not used in this example, but you can put arbitrary values there to
    configure how you get the token (e.g. which token URL to use).
    """
    payload = {
        'grant_type': 'client_credentials',
        'scope': scopes
    }
    resp = requests.post(token_url,
                         auth=(client_id, client_secret),
                         data=payload)
    token = resp.json()
    return token['access_token'], time.time() + float(token['expires_in'])
def main():
    conf = {
        'bootstrap.servers': broker,
        'group.id': group,
        'session.timeout.ms': 6000,
        'debug': 'topic',
        'auto.offset.reset': 'earliest',
        'oauth_cb': functools.partial(_get_token),
        'sasl.mechanisms': 'OAUTHBEARER',
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '<CA-CERT>',
        'ssl.certificate.location': '<CERT-LOCATION>',
        'ssl.key.location': '<KEY-LOCATION>'
    }
    c = Consumer(conf)
    c.subscribe([topic], on_assign=print_assignment)
    print("Subscription complete")
    try:
        while True:
            msg = c.poll(timeout=1.0)
            print("Polling messages")
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())
    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')
    finally:
        # Close down consumer to commit final offsets.
        c.close()


main()
I tried to follow this related issue: https://github.com/edenhill/librdkafka/issues/2725, but it had no resolution.
For the command line, I used this :
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka0.abc.com:9093 --topic $TOPIC_NAME --consumer.config client-ssl.properties --from-beginning
with the properties file being:
ssl.truststore.location=<TRUSTSTORE>
ssl.truststore.password=<PASSWORD>
ssl.endpoint.identification.algorithm=
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.client.id="<CLIENT_ID>" \
oauth.client.secret="<CLIENT_SECRET>" \
oauth.scope="<SCOPE>" \
oauth.token.endpoint.uri="<TOKEN_URL>" ;
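Worth noting about this properties file: ssl.endpoint.identification.algorithm= (empty) disables hostname verification in the Java client, and the truststore supplies the CA chain. A rough sketch of the librdkafka-side equivalents (the property names are real librdkafka settings, but the path is a placeholder and converting a JKS truststore to PEM is an assumed extra step):

# Sketch: librdkafka analogues of the Java client-ssl.properties above.
ssl_equiv = {
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    # The JKS truststore must be exported to a PEM CA bundle for librdkafka,
    # e.g. via keytool -exportcert plus openssl x509 (placeholder path).
    'ssl.ca.location': '<CA-BUNDLE-PEM>',
    # Empty algorithm on the Java side disables hostname verification;
    # 'none' is the librdkafka counterpart.
    'ssl.endpoint.identification.algorithm': 'none',
}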
From the above logs, it looks like the consumer doesn't connect to the broker successfully.
In your consumer config, the client-side and server-side settings are mixed up.
You mentioned that the producer already works; the authentication-related configuration on the consumer side should be the same as on the producer side.
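For illustration, a minimal sketch of that advice, reusing the auth/SSL block from the working producer config (this is an assumed refactoring; broker, group, and the placeholder paths are taken from the snippets above):

# Share one auth/SSL block so the consumer authenticates exactly
# like the already-working producer (assumed fix, not confirmed).
auth_conf = {
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': functools.partial(_get_token),
    'ssl.ca.location': '<CA-CERT>',
    'ssl.certificate.location': '<CERT-LOCATION>',
    'ssl.key.location': '<KEY-LOCATION>',
}

consumer_conf = {'bootstrap.servers': broker,
                 'group.id': group,
                 'auto.offset.reset': 'earliest',
                 **auth_conf}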
@arjun180 can this be closed?