confluent-kafka-python icon indicating copy to clipboard operation
confluent-kafka-python copied to clipboard

Confluent-Kafka-Python producing SSL handshake error

Open arjun180 opened this issue 4 years ago • 4 comments

Description

I'm trying to run a Kafka consumer using the confluent-kafka python package. I've been using this example : https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/oauth_producer.py as reference. We have oauth setup. And this setup works using the command line.

%3|1626297590.465|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://<kakfa-server>.com:9093/bootstrap]: sasl_ssl://<kakfa-server>.com:9093/bootstrap: SSL handshake failed: error:14090086:SSL routines:ssl3_get_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl) (after 40ms in state SSL_HANDSHAKE, 1 identical error(s) suppressed)

How to reproduce

def _get_token(config):
    """Note here value of config comes from sasl.oauthbearer.config below.
    It is not used in this example but you can put arbitrary values to
    configure how you can get the token (e.g. which token URL to use)
    """
    payload = {
        'grant_type': 'client_credentials',
        'scope': scopes
    }
    resp = requests.post(token_url,
                         auth=(client_id, client_secret),
                         data=payload)
    token = resp.json()
    return token['access_token'], time.time() + float(token['expires_in'])

def producer_config():
    logger = logging.getLogger(__name__)
    return {
        'bootstrap.servers': bootstrap_servers,
        'key.serializer': StringSerializer('utf_8'),
        'value.serializer': StringSerializer('utf_8'),
        'security.protocol': 'sasl_ssl',
        'sasl.mechanisms': 'OAUTHBEARER',
        # sasl.oauthbearer.config can be used to pass argument to your oauth_cb
        # It is not used in this example since we are passing all the arguments
        # from command line
        # 'sasl.oauthbearer.config': 'not-used',
        'oauth_cb': functools.partial(_get_token),
        'logger': logger,
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '<CA-CERT>',
        'ssl.certificate.location': '<CERT-LOCATION>',
        'ssl.key.location': '<KEY-LOCATION>'
    }

def delivery_report(err, msg):
    """
    Reports the failure or success of a message delivery.
    Args:
        err (KafkaError): The error that occurred on None on success.
        msg (Message): The message that was produced or failed.
    Note:
        In the delivery report callback the Message.key() and Message.value()
        will be the binary format as encoded by any configured Serializers and
        not the same object that was passed to produce().
        If you wish to pass the original object(s) for key and value to delivery
        report callback we recommend a bound callback or lambda where you pass
        the objects along.
    """
    if err is not None:
        print('Delivery failed for User record {}: {}'.format(msg.key(), err))
        return
    print('User record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))

def main():
    producer_conf = producer_config()
    delimiter = "|"
    producer = SerializingProducer(producer_conf)
    print('Producing records to topic {}. ^C to exit.'.format(topic))
    while True:
        # Serve on_delivery callbacks from previous calls to produce()
        producer.poll(0.0)
        try:
            msg_data = input(">")
            msg = msg_data.split(delimiter)
            if len(msg) == 2:
                producer.produce(topic=topic, key=msg[0], value=msg[1],
                                 on_delivery=delivery_report)
            else:
                producer.produce(topic=topic, value=msg[0],
                                 on_delivery=delivery_report)
        except KeyboardInterrupt:
            break
    print('\nFlushing {} records...'.format(len(producer)))
    producer.flush()

main()

Checklist

Please provide the following information:

  • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • [ ] Apache Kafka broker version:
  • [ ] Client configuration: {...}
  • [ ] Operating system:
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

confluent-kafka-version : 1.7.0 librdkafka : 1.7.0

arjun180 avatar Jul 14 '21 22:07 arjun180

Hi @arjun180 , thanks for reporting the issue.

May I know which command did you use when the setup works?

jliunyu avatar Jul 15 '21 23:07 jliunyu

Hi @jliunyu - Thanks for getting back to me. So, quick update on this - the producer now works. I can send messages and there are no problems. However, I'm seeing a problem with my consumer - I don't receive any of the messages.

7|1626378712.862|BROKER|rdkafka#consumer-1| [thrd:app]: GroupCoordinator: Added new broker with NodeId -1 %7|1626378712.862|BRKMAIN|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread %7|1626378712.862|BRKMAIN|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Enter main broker thread %7|1626378712.862|BROKER|rdkafka#consumer-1| [thrd:app]: sasl_ssl://kafka0dev.abc.com:9093/bootstrap: Added new broker with NodeId -1 %7|1626378712.862|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.7.0 (0x10700ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STRIP STATIC_LINKING CC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2) %7|1626378712.862|BRKMAIN|rdkafka#consumer-1| [thrd:sasl_ssl:///kafka0dev.abc.com:9093/bootstrap]: sasl_ssl:///kafka0dev.abc.com:9093/bootstrap: Enter main broker thread %7|1626378712.862|CONNECT|rdkafka#consumer-1| [thrd:main]: sasl_ssl:///kafka0dev.abc.com:9093/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s)) %7|1626378712.863|CONNECT|rdkafka#consumer-1| [thrd:sasl_ssl:///kafka0dev.abc.com:9093/bootstrap]: sasl_ssl:///kafka0dev.abc.com:9093/bootstrap: Received CONNECT op %7|1626378712.863|STATE|rdkafka#consumer-1| [thrd:sasl_ssl:///kafka0dev.abc.com:9093/bootstrap]: sasl_ssl:///kafka0dev.abc.com:9093/bootstrap: Broker changed state INIT -> TRY_CONNECT %7|1626378712.863|CONNECT|rdkafka#consumer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: coordinator query Subscription complete %7|1626378713.550|CONNECT|rdkafka#consumer-1| [thrd:sasl_ssl:///kafka0dev.abc.com:9093/bootstrap]: sasl_ssl:///kafka0dev.abc.com:9093/bootstrap: broker in state TRY_CONNECT connecting %7|1626378713.550|STATE|rdkafka#consumer-1| [thrd:sasl_ssl:///kafka0dev.abc.com:9093/bootstrap]: sasl_ssl://kafka0dev.abc.com:9093/bootstrap: Broker changed state TRY_CONNECT -> CONNECT %7|1626378713.569|CONNECT|rdkafka#consumer-1| [thrd:sasl_ssl://kafka0dev.abc.com:9093/bootstrap]: sasl_ssl:///kafka0dev.abc.com:9093/bootstrap: Connecting to ipv4#52.38.99.178:9093 (sasl_ssl) with socket 13 %7|1626378713.864|CONNECT|rdkafka#consumer-1| [thrd:main]: Cluster connection already in progress: coordinator query %7|1626378713.864|CONNECT|rdkafka#consumer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection

The code for the consumer looks like this :

def print_assignment(consumer, partitions):
  print('Assignment:', partitions)


def _get_token(config):
    """Note here value of config comes from sasl.oauthbearer.config below.
    It is not used in this example but you can put arbitrary values to
    configure how you can get the token (e.g. which token URL to use)
    """
    payload = {
        'grant_type': 'client_credentials',
        'scope': scopes
    }
    resp = requests.post(token_url,
                         auth=(client_id, client_secret),
                         data=payload)
    token = resp.json()
    return token['access_token'], time.time() + float(token['expires_in'])


def main():
    conf = {
        'bootstrap.servers': broker, 'group.id': group,
        'session.timeout.ms': 6000,
        'debug': 'topic',
        'auto.offset.reset': 'earliest',
        'oauth_cb': functools.partial(_get_token),
        'sasl.mechanisms': 'OAUTHBEARER',
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '<CA-CERT>',
        'ssl.certificate.location': '<CERT-LOCATION>',
        'ssl.key.location': '<KEY-LOCATION>'
    }
    }
    c = Consumer(conf)
    c.subscribe([topic], on_assign=print_assignment)
    print("Subscription complete")
    try:
        while True:
            msg = c.poll(timeout=1.0)
            print("Polling messages")
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')
    finally:
        # Close down consumer to commit final offsets.
        c.close()

I was trying to follow the issue on this git link : https://github.com/edenhill/librdkafka/issues/2725 but there was no resolution.

For the command line, I used this :

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka0.abc.com:9093 --topic $TOPIC_NAME --consumer.config client-ssl.properties --from-beginning

with properties file being

ssl.truststore.location=<TRUSTSTORE>
ssl.truststore.password=<PASSWORD>
ssl.endpoint.identification.algorithm=
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.client.id="<CLIENT_ID>" \
oauth.client.secret="<CLIENT_SECRET>" \
oauth.scope="<SCOPE>" \
oauth.token.endpoint.uri="<TOKEN_URL>" ;

arjun180 avatar Jul 15 '21 23:07 arjun180

From the above logs, looks the consumer doesn't connect to the broker successfully.

For your consumer config, the clients side config and server side config are mixed up.

You mentioned that the producer works already, the authentication part config in consumer side would be the same with producer.

jliunyu avatar Mar 25 '22 22:03 jliunyu

@arjun180 can this be closed?

nhaq-confluent avatar Mar 05 '24 22:03 nhaq-confluent