confluent-kafka-python icon indicating copy to clipboard operation
confluent-kafka-python copied to clipboard

Consumer via Oauth and AAD Service Principle

Open Apoptose93 opened this issue 3 years ago • 1 comments

Description

I want to consume messages from eventhub using oauth authentification. I configured a service principal and get the access token via Azure Identity. However, I get following error when trying to consume:

%3|1647343502.496|FAIL|myservice#consumer-1| [thrd:sasl_ssl://evh-fnmz-dev-dlz.servicebus.windows.net:9093/bootstr]: sasl_ssl://evh-fnmz-dev-dlz.servicebus.windows.net:9093/bootstrap: SASL authentication error: Invalid tenant name (after 40ms in state AUTH_REQ)

I used the Tenant ID, Secret and ClientId referenced from the service principal in AAD and used them as environment variables AZURE_TENANT_ID=<Tenant Id> AZURE_CLIENT_ID=<Client Id> AZURE_CLIENT_SECRET=<Secret>

I successfully get the JWT Token via Azure Identity.

How to reproduce

The code looks like that:

from confluent_kafka import Consumer


from azure.identity import DefaultAzureCredential, EnvironmentCredential
import functools

credential = EnvironmentCredential()

def getToken(self):
        credential = EnvironmentCredential()
        token = credential.get_token("https://eventhubs.azure.net/.default" )
        return token

consumer = Consumer({
    "group.id":"$Default",
    "bootstrap.servers":'evh-<name>-dev-dlz.servicebus.windows.net:9093',
    "security.protocol":'SASL_SSL',
    'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt',
    "sasl.mechanism":'OAUTHBEARER',
    'auto.offset.reset': 'earliest',
    'client.id': '<client_id>',
    'oauth_cb': functools.partial(getToken)
}
    )
consumer.subscribe(topics=['<topic>'])
while True:
    msg = consumer.poll()
    print(msg)

Checklist

Please provide the following information:

  • [X ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): confluent_kafka.version=('1.8.2', 17302016) , confluent_kafka.libversion=('1.8.2', 17302271)
  • [X ] Apache Kafka broker version: Azure Eventhub
  • [ X] Client configuration: {...}
  • [ X] Operating system: Docker python:3.7-slim-buster
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

Apoptose93 avatar Mar 15 '22 11:03 Apoptose93

Hi @Apoptose93 , thanks for asking.

Looks this is not client related issue. The error is related to TENANT_ID for me, it can't be identified.

jliunyu avatar Mar 15 '22 21:03 jliunyu

I think there is no change required from our side on this. Closing this as there is no update for a long time from the user. Feel free to open it again if required.

pranavrth avatar Feb 27 '24 12:02 pranavrth