aiokafka
aiokafka copied to clipboard
Trying to consume with oauthbear but issues with AbstractTokenProvider maybee
Hi,
python-kafka simply works with oauthbearer. With aiokafka there are zero examples I am flying blind.
Did I impelent CustomTokenProvider correct? the create_ssl_context() function is what i use in python-kafka
Below is the error I get:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 376, in _on_read_task_error
read_task.result()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 512, in _read
resp = await reader.readexactly(4)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/streams.py", line 677, in readexactly
raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 4 expected bytes
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/Documents/workspace/hedge-project/nasdaqstreamer/ai.py", line 109, in <module>
loop.run_until_complete(consume())
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "/Users//Documents/workspace/hedge-project/nasdaqstreamer/ai.py", line 102, in consume
await consumer.start()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 341, in start
await self._client.bootstrap()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/client.py", line 215, in bootstrap
version_hint=version_hint)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 97, in create_conn
await conn.connect()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 235, in connect
await self._do_sasl_handshake()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 316, in _do_sasl_handshake
payload, expect_response
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
return fut.result()
kafka.errors.KafkaConnectionError: KafkaConnectionError: Connection at test.com:9094 closed
{"access_token":"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJtNDV5a1RZRXhXZzdfZmUybHdBcVRDSGV2bnQtMWMwNjRiTTl2UlN3NVY0In0.eyJleHAiOjE2MDcxMjQzNTYsImlhdCI6MTYwNjUxOTU1NiwianRpIjoiN2M3OGU0ZmItNmQ0ZC00YTAyLWI1YzYtZGVmOTA1OGY0MmY0IiwiaXNzIjoiaHR0cHM6Ly9jbG91ZGRhdGFzZXJ2aWNlLmF1dGgubmFzZGFxLmNvbS9hdXRoL3JlYWxtcy9wcm8tcmVhbG0iLCJzdWIiOiI0YzIzOTNhOC0yM2FmLTQ1MDc
0NDU5ZDJhM2E5IiwiaXNzIjoiaHR0cHM6Ly9jbG91ZGRhdGFzZXJ2aWNlLmF1dGgubmFzZGFxLmNvbS9hdXRoL3JlYWxtcy9wcm8tcmVhbG0iLCJhdWQiOiJodHRwczovL2Nsb3VkZGF0YXNlcnZpY2UuYXV0aC5uYXNkYXEuY29tL2F1dGgvcmVhbG1zL3Byby1yZWFsbSIsInN1YiI6IjRjMjM5M2E4LTIzYWYtNDUwNy1iYmI2LTFlZTkwODhmZDBiMiIsInR5cCI6IlJlZnJlc2giLCJhenAiOiJzeW5lcmdpc2NhcGl0YWwtZGF2aWQtc2Nob29sZXkiLCJzZXNzaW9uX3N0YXRlIjoiZmQ2NDkyZTMtMzRmMi00ZjEzLWI5MDgtM2M1NmU1OWQwYWYzIiwic2NvcGUiOiJlbWFpbCBwcm9maWxlIn0.5zZw0OZCuDUKzjWVZ-23s-DagXhOtzsvefJ9q8Q7vXA","token_type":"bearer","not-before-policy":0,"session_state":"fd6492e3-34f2-4f13-b908-3c56e59d0af3","scope":"email profile"}
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f9c183e7c88>
Below is my code:
def create_ssl_context():
_ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
_ssl_context.options |= ssl.OP_NO_SSLv2
_ssl_context.options |= ssl.OP_NO_SSLv3
_ssl_context.verify_mode = ssl.CERT_NONE
_ssl_context.check_hostname = False
_ssl_context.load_default_certs()
return _ssl_context
class CustomTokenProvider(AbstractTokenProvider):
async def token(self):
return asyncio.get_running_loop().run_in_executor(
None, self._token)
def _token(self):
token_url = 'https://test.com/auth/realms/pro-realm/protocol/openid-connect/token'
client = BackendApplicationClient(client_id='adfafd')
oauth = OAuth2Session(client=client)
token_json = oauth.fetch_token(token_url=token_url, client_id='adfasdf', client_secret='adfadf')
token = token_json['access_token']
return token
loop = asyncio.get_event_loop()
async def consume():
consumer = AIOKafkaConsumer(
"test-4.stream",
loop=loop,
sasl_oauth_token_provider=CustomTokenProvider(),
security_protocol="SASL_SSL",
sasl_mechanism="OAUTHBEARER",
enable_auto_commit=False,
ssl_context=create_ssl_context(),
bootstrap_servers='test.com:9094')
# Get cluster layout and topic/partition allocation
await consumer.start()
try:
async for msg in consumer:
print(msg.value)
finally:
await consumer.stop()
loop.run_until_complete(consume())
Any updates on this? Having the same issue
@davidmontgom & @krlx - I had a similar problem and resolved it by awaiting the future in CustomTokenProvider. Try this change
async def token(self):
return await asyncio.get_running_loop().run_in_executor(
None, self._token)