librdkafka
Consumer hangs without ever calling oauth_cb
Description
The consumer hangs without ever calling oauth_cb if, after creating a consumer, one immediately calls list_topics. Inserting poll(0) seems to fix it. This might be a duplicate of #3753.
How to reproduce
Sample code here:
#!/usr/bin/env python
from uuid import uuid4

import confluent_kafka as ck
from authlib.integrations.requests_client import OAuth2Session


def set_oauth_cb(config):
    """Implement client support for KIP-768 OpenID Connect.

    Apache Kafka 3.1.0 supports authentication using OpenID Client Credentials.
    Native support for Python is coming in the next release of librdkafka
    (version 1.9.0). Meanwhile, this is a pure Python implementation of the
    refresh token callback.
    """
    # Only install the callback when the OIDC method was requested.
    if config.pop('sasl.oauthbearer.method', None) != 'oidc':
        return

    # Remove the OIDC settings from the config; they are handled here in Python.
    client_id = config.pop('sasl.oauthbearer.client.id')
    client_secret = config.pop('sasl.oauthbearer.client.secret')
    scope = config.pop('sasl.oauthbearer.scope', None)
    token_endpoint = config.pop('sasl.oauthbearer.token.endpoint.url')

    session = OAuth2Session(client_id, client_secret, scope=scope)

    def oauth_cb(*_, **__):
        token = session.fetch_token(
            token_endpoint, grant_type='client_credentials')
        return token['access_token'], token['expires_at']

    config['oauth_cb'] = oauth_cb


config = {
    'bootstrap.servers': '...',
    'security.protocol': 'sasl_ssl',
    'sasl.mechanisms': 'OAUTHBEARER',
    'sasl.oauthbearer.method': 'oidc',
    'sasl.oauthbearer.client.id': '...',
    'sasl.oauthbearer.client.secret': '...',
    'sasl.oauthbearer.token.endpoint.url': '...',
    'group.id': str(uuid4()),
}

set_oauth_cb(config)
consumer = ck.Consumer(config)
consumer.poll(0)  # hangs without ever running oauth_cb unless this is here
for topic in consumer.list_topics().topics:
    print(topic)
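The callback in the sample returns the pair (access token, expiry), reading the expiry from the 'expires_at' field of the token response. A minimal sketch of that conversion as a standalone function follows. It assumes the callback is expected to return the expiry as seconds since the epoch, and it assumes a fallback to the relative 'expires_in' field when 'expires_at' is absent. The function name token_to_oauth_result is hypothetical and is not part of confluent_kafka or authlib.

```python
import time


def token_to_oauth_result(token, now=None):
    """Turn a token response mapping into (token_str, expiry_epoch_seconds).

    Hypothetical helper for illustration only.
    """
    now = time.time() if now is None else now
    if 'expires_at' in token:
        # Absolute expiry, as used by the sample code.
        expiry = float(token['expires_at'])
    else:
        # Assumption: only a relative lifetime in seconds is available.
        expiry = now + float(token['expires_in'])
    return token['access_token'], expiry
```

Inside oauth_cb this would be used as `return token_to_oauth_result(token)`.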
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- [x] librdkafka version (release number or git tag): d50099f9d8f0c8a62edca91bebc107cf9a3ec8a8
- [x] Apache Kafka version: 3.1.0
- [x] librdkafka client configuration: (see sample code above)
- [x] Operating system: macOS Monterey
- [ ] Provide logs (with debug=.. as necessary) from librdkafka
- [ ] Provide broker log excerpts
- [ ] Critical issue
Yes, you need to explicitly call poll() once after creating the client to trigger the oauth callback - it can't be triggered from client instantiation since the caller might not have a reasonable context set up yet.
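One way to make the "poll once after creating the client" step hard to forget is to wrap client creation in a small helper. This is only a sketch: the name make_primed_client is hypothetical, and the helper assumes nothing about the client beyond a poll() method that accepts a timeout.

```python
def make_primed_client(client_factory, config, prime_timeout=0):
    """Create a client and serve its callbacks once before returning it.

    client_factory is any callable that takes a config dict and returns an
    object with a poll(timeout) method. Hypothetical helper for illustration.
    """
    client = client_factory(config)
    # poll() gives the client a chance to invoke oauth_cb, so call it once
    # before any blocking request such as list_topics().
    client.poll(prime_timeout)
    return client
```

With the sample code from the issue, this would be called as `consumer = make_primed_client(ck.Consumer, config)` in place of the separate `ck.Consumer(config)` and `consumer.poll(0)` lines.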
v1.9.0 introduced a SASL background callback that can be used for this purpose, but it is not yet exposed in the Python client: https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L3308
If the auth fails during the first call to poll, will it be automatically retried?
Yes, a new token refresh callback will be triggered in 10 seconds.
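Since a failed first attempt is retried later, an application may want to keep polling until a token has actually been obtained before it issues blocking calls. The sketch below wraps the token callback to record success and polls until that flag is set or a deadline passes. TrackedCallback and poll_until_token are hypothetical names. The sketch assumes poll() returns normally when the callback fails; how the client reports a callback exception is not covered in this thread. It is meant to run before subscribing, because any message returned by poll() here would be discarded.

```python
import time


class TrackedCallback:
    """Wrap a token callback and remember whether it has ever succeeded."""

    def __init__(self, callback):
        self._callback = callback
        self.attempts = 0
        self.succeeded = False

    def __call__(self, *args, **kwargs):
        self.attempts += 1
        # If the wrapped callback raises, succeeded stays False.
        result = self._callback(*args, **kwargs)
        self.succeeded = True
        return result


def poll_until_token(client, tracked, deadline_s=30.0, poll_s=1.0):
    """Poll until the tracked callback has succeeded or the deadline passes.

    Returns True if a token was obtained in time, False otherwise.
    """
    deadline = time.monotonic() + deadline_s
    while not tracked.succeeded and time.monotonic() < deadline:
        client.poll(poll_s)
    return tracked.succeeded
```

To wire this into the sample code, one would set `config['oauth_cb'] = TrackedCallback(oauth_cb)`, keep a reference to the wrapper, and call `poll_until_token(consumer, wrapper)` right after creating the consumer. A deadline of at least a few retry intervals seems sensible given the 10 second retry mentioned above.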
In what commit was this fixed?
My two previous comments tried to describe what needs to be done by the application, either:
- call poll(0) after instantiating the client, or
- set up a background SASL callback so that it's triggered from the background.
We can't trigger the refresh callback from inside rd_kafka_new() because the caller might not have been fully set up, and we can't trigger one from a background thread after rd_kafka_new() has returned for that same reason.
So, the application will need to do one of the two things mentioned above.
Is there a Python wrapper method for that rd_kafka_conf_enable_sasl_queue function?
I don't believe there is, no.