aiokafka
aiokafka copied to clipboard
Can I use Azure Event Hubs with `aiokafka`?
Hello
First of all, thanks for this nice project!
I am using aiokafka
and it has been doing great job for a couple of months in
my project.
Recently I am preparing to use Azure Event Hubs. I considered aiokakfa
can
work with Azure Event Hubs out of the box, because Azure Event Hubs says it is
compatible with Kafka. It mentions "Since it supports Apache Kafka, you bring
Kafka workloads to Azure Event Hubs without doing any code change." Wow.
But the thing is... when I am trying producing a message to the Azure Event Hubs
using AIOKafkaProducer
, it fails finding coordinator and raises KafkaError
.
Code
Here is minimal reproducible example of the problem.
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
async def test_kafka():
import aiokafka.helpers
import certifi
from aiokafka import AIOKafkaProducer
ssl_context = aiokafka.helpers.create_ssl_context(cafile=certifi.where())
producer = AIOKafkaProducer(
bootstrap_servers='my-kafka-ns.servicebus.windows.net:9093',
enable_idempotence=True,
transactional_id='some-id',
transaction_timeout_ms=20000,
security_protocol='SASL_SSL',
sasl_plain_username='$ConnectionString',
sasl_plain_password=
'Endpoint=sb://my-kafka-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=my-shared-access-key',
sasl_mechanism='PLAIN',
ssl_context=ssl_context)
await producer.start()
async with producer.transaction():
await producer.send_and_wait('aztest',
b'aztest-data-1234',
key=b'message-key')
asyncio.run(test_kafka())
Log
And here is the debug console log.
root@610947149cd0:/head# poetry run python head/aztest.py
DEBUG:asyncio:Using selector: EpollSelector
INFO:aiokafka.helpers:Loading SSL CA from /root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/certifi/cacert.pem
DEBUG:aiokafka.producer.producer:Starting the Kafka producer
DEBUG:aiokafka:Attempting to bootstrap via node at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 1: SaslHandShakeRequest_v0(mechanism='PLAIN')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 1: SaslHandShakeResponse_v0(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
INFO:aiokafka.conn:Authenticated as $ConnectionString via PLAIN
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 2: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 2: MetadataResponse_v0(brokers=[(node_id=0, host='my-kafka-ns.servicebus.windows.net', port=9093)], topics=[(error_code=0, topic='aztest', partitions=[(error_code=0, partition=0, leader=0, replicas=[], isr=[])])])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
DEBUG:aiokafka.conn:Closing connection at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka:Received cluster metadata: ClusterMetadata(brokers: 1, topics: 1, groups: 0)
DEBUG:aiokafka:Initiating connection to node 0 at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 1: SaslHandShakeRequest_v0(mechanism='PLAIN')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 1: SaslHandShakeResponse_v0(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
INFO:aiokafka.conn:Authenticated as $ConnectionString via PLAIN
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 2: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 2: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=3, max_version=7), (api_key=1, min_version=4, max_version=6), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=5), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=7), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=6), (api_key=20, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=1), (api_key=23, min_version=0, max_version=0), (api_key=32, min_version=0, max_version=2), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0)])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 3: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 3: MetadataResponse_v0(brokers=[(node_id=0, host='my-kafka-ns.servicebus.windows.net', port=9093)], topics=[(error_code=0, topic='aztest', partitions=[(error_code=0, partition=0, leader=0, replicas=[], isr=[])])])
DEBUG:aiokafka.conn:Closing connection at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka:Sending FindCoordinator request for key 309a1400-c988-4446-b8ff-73d9cc1abf6d to broker 0
DEBUG:aiokafka:Initiating connection to node 0 at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=3, max_version=7), (api_key=1, min_version=4, max_version=6), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=5), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=7), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=6), (api_key=20, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=1), (api_key=23, min_version=0, max_version=0), (api_key=32, min_version=0, max_version=2), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0)])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 2: SaslHandShakeRequest_v1(mechanism='PLAIN')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 2: SaslHandShakeResponse_v1(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 3: SaslAuthenticateRequest_v1(sasl_auth_bytes=b'$ConnectionString\x00$ConnectionString\x00Endpoint=sb://my-kafka-ns.servicebus.windows.net/;SharedAccessKeyNam...')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 3: SaslAuthenticateResponse_v1(error_code=0, error_message=None, sasl_auth_bytes=b'', session_lifetime_ms=0)
INFO:aiokafka.conn:Authenticated as $ConnectionString via PLAIN
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 4: FindCoordinatorRequest_v1(coordinator_key='309a1400-c988-4446-b8ff-73d9cc1abf6d', coordinator_type=1)
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 4: FindCoordinatorResponse_v1(throttle_time_ms=0, error_code=42, error_message='FindCoordinator asked for coordinator with type code 1 which is not supported.', coordinator_id=0, host='', port=0)
DEBUG:aiokafka:Received group coordinator response FindCoordinatorResponse_v1(throttle_time_ms=0, error_code=42, error_message='FindCoordinator asked for coordinator with type code 1 which is not supported.', coordinator_id=0, host='', port=0)
ERROR:aiokafka.producer.sender:FindCoordinator Request failed: [Error 42] InvalidRequestError
Traceback (most recent call last):
File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 198, in _find_coordinator
coordinator_id = await self.client.coordinator_lookup(
File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/client.py", line 682, in coordinator_lookup
raise err
kafka.errors.InvalidRequestError: [Error 42] InvalidRequestError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/head/head/aztest.py", line 33, in <module>
asyncio.run(test_kafka())
File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/head/head/aztest.py", line 26, in test_kafka
await producer.start()
File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/producer.py", line 321, in start
await self._sender.start()
File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 55, in start
await self._maybe_wait_for_pid()
File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 178, in _maybe_wait_for_pid
node_id = await self._find_coordinator(
File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 213, in _find_coordinator
raise KafkaError(repr(err))
kafka.errors.KafkaError: KafkaError: InvalidRequestError()
ERROR:asyncio:Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0xffffb6acb190>
root@610947149cd0:/head#
Can anyone give me some idea to solve this problem?
Thanks,
I found it's okay if I don't pass transactional_id
to the AIOKafkaProducer
.
I guess the error incurs when I am using a kafka transaction feature because
Azure Event Hubs does not provide that feature.
Here is the document of Azure Event Hubs. This document says it lacks of some Kafka features like transaction, compression and Kafka streams.
https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-kafka-overview#feature-differences-with-apache-kafka