
Can I use Azure Event Hubs with `aiokafka`?

Open: junyeong-huray opened this issue 1 year ago • 1 comment

Hello

First of all, thanks for this nice project!

I have been using aiokafka in my project for a couple of months, and it has been doing a great job.

I am now preparing to use Azure Event Hubs. I assumed aiokafka would work with Azure Event Hubs out of the box, because Azure Event Hubs says it is compatible with Kafka. It states: "Since it supports Apache Kafka, you bring Kafka workloads to Azure Event Hubs without doing any code change." Wow.

However, when I try to produce a message to Azure Event Hubs with AIOKafkaProducer, it fails to find the coordinator and raises KafkaError.

Code

Here is a minimal reproducible example of the problem.

import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)


async def test_kafka():
    import aiokafka.helpers
    import certifi
    from aiokafka import AIOKafkaProducer

    ssl_context = aiokafka.helpers.create_ssl_context(cafile=certifi.where())
    producer = AIOKafkaProducer(
        bootstrap_servers='my-kafka-ns.servicebus.windows.net:9093',
        enable_idempotence=True,
        transactional_id='some-id',
        transaction_timeout_ms=20000,
        security_protocol='SASL_SSL',
        sasl_plain_username='$ConnectionString',
        sasl_plain_password=
        'Endpoint=sb://my-kafka-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=my-shared-access-key',
        sasl_mechanism='PLAIN',
        ssl_context=ssl_context)
    await producer.start()
    try:
        async with producer.transaction():
            await producer.send_and_wait('aztest',
                                         b'aztest-data-1234',
                                         key=b'message-key')
    finally:
        # Release the producer's connections even if sending fails.
        await producer.stop()


asyncio.run(test_kafka())

Log

And here is the debug console log.

root@610947149cd0:/head# poetry run python head/aztest.py 
DEBUG:asyncio:Using selector: EpollSelector
INFO:aiokafka.helpers:Loading SSL CA from /root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/certifi/cacert.pem
DEBUG:aiokafka.producer.producer:Starting the Kafka producer
DEBUG:aiokafka:Attempting to bootstrap via node at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 1: SaslHandShakeRequest_v0(mechanism='PLAIN')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 1: SaslHandShakeResponse_v0(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
INFO:aiokafka.conn:Authenticated as $ConnectionString via PLAIN
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 2: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 2: MetadataResponse_v0(brokers=[(node_id=0, host='my-kafka-ns.servicebus.windows.net', port=9093)], topics=[(error_code=0, topic='aztest', partitions=[(error_code=0, partition=0, leader=0, replicas=[], isr=[])])])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
DEBUG:aiokafka.conn:Closing connection at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka:Received cluster metadata: ClusterMetadata(brokers: 1, topics: 1, groups: 0)
DEBUG:aiokafka:Initiating connection to node 0 at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 1: SaslHandShakeRequest_v0(mechanism='PLAIN')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 1: SaslHandShakeResponse_v0(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
INFO:aiokafka.conn:Authenticated as $ConnectionString via PLAIN
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 2: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 2: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=3, max_version=7), (api_key=1, min_version=4, max_version=6), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=5), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=7), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=6), (api_key=20, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=1), (api_key=23, min_version=0, max_version=0), (api_key=32, min_version=0, max_version=2), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0)])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 3: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 3: MetadataResponse_v0(brokers=[(node_id=0, host='my-kafka-ns.servicebus.windows.net', port=9093)], topics=[(error_code=0, topic='aztest', partitions=[(error_code=0, partition=0, leader=0, replicas=[], isr=[])])])
DEBUG:aiokafka.conn:Closing connection at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka:Sending FindCoordinator request for key 309a1400-c988-4446-b8ff-73d9cc1abf6d to broker 0
DEBUG:aiokafka:Initiating connection to node 0 at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=3, max_version=7), (api_key=1, min_version=4, max_version=6), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=5), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=7), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=6), (api_key=20, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=1), (api_key=23, min_version=0, max_version=0), (api_key=32, min_version=0, max_version=2), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0)])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 2: SaslHandShakeRequest_v1(mechanism='PLAIN')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 2: SaslHandShakeResponse_v1(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 3: SaslAuthenticateRequest_v1(sasl_auth_bytes=b'$ConnectionString\x00$ConnectionString\x00Endpoint=sb://my-kafka-ns.servicebus.windows.net/;SharedAccessKeyNam...')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 3: SaslAuthenticateResponse_v1(error_code=0, error_message=None, sasl_auth_bytes=b'', session_lifetime_ms=0)
INFO:aiokafka.conn:Authenticated as $ConnectionString via PLAIN
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 4: FindCoordinatorRequest_v1(coordinator_key='309a1400-c988-4446-b8ff-73d9cc1abf6d', coordinator_type=1)
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 4: FindCoordinatorResponse_v1(throttle_time_ms=0, error_code=42, error_message='FindCoordinator asked for coordinator with type code 1 which is not supported.', coordinator_id=0, host='', port=0)
DEBUG:aiokafka:Received group coordinator response FindCoordinatorResponse_v1(throttle_time_ms=0, error_code=42, error_message='FindCoordinator asked for coordinator with type code 1 which is not supported.', coordinator_id=0, host='', port=0)
ERROR:aiokafka.producer.sender:FindCoordinator Request failed: [Error 42] InvalidRequestError
Traceback (most recent call last):
  File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 198, in _find_coordinator
    coordinator_id = await self.client.coordinator_lookup(
  File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/client.py", line 682, in coordinator_lookup
    raise err
kafka.errors.InvalidRequestError: [Error 42] InvalidRequestError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/head/head/aztest.py", line 33, in <module>
    asyncio.run(test_kafka())
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/head/head/aztest.py", line 26, in test_kafka
    await producer.start()
  File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/producer.py", line 321, in start
    await self._sender.start()
  File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 55, in start
    await self._maybe_wait_for_pid()
  File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 178, in _maybe_wait_for_pid
    node_id = await self._find_coordinator(
  File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 213, in _find_coordinator
    raise KafkaError(repr(err))
kafka.errors.KafkaError: KafkaError: InvalidRequestError()
ERROR:asyncio:Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0xffffb6acb190>
root@610947149cd0:/head# 
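For reference when reading the log: in the Kafka protocol, the key type of a FindCoordinator request is 0 for a consumer-group coordinator and 1 for a transaction coordinator, so Request 4 asks for a transaction coordinator. The traceback shows this lookup happens in `_maybe_wait_for_pid` during `producer.start()`. The sketch below models which lookup a producer configuration triggers at start-up. `CoordinatorType` and `producer_coordinator_lookup` are hypothetical names, not aiokafka APIs, and the rule they encode is an assumption based on this traceback.

```python
from enum import IntEnum
from typing import Any, Mapping, Optional


class CoordinatorType(IntEnum):
    """Key types carried by a Kafka FindCoordinator request."""
    GROUP = 0        # consumer-group coordinator
    TRANSACTION = 1  # transaction coordinator


def producer_coordinator_lookup(config: Mapping[str, Any]) -> Optional[CoordinatorType]:
    """Return the coordinator type a producer with this config looks up at start.

    Hypothetical helper. Assumption: only a transactional producer
    (transactional_id set) sends FindCoordinator during start-up; an
    idempotent producer without transactional_id asks an arbitrary
    broker for its producer id instead.
    """
    if config.get("transactional_id") is not None:
        return CoordinatorType.TRANSACTION
    return None
```

For example, `CoordinatorType(1).name` is `"TRANSACTION"`, matching `coordinator_type=1` in Request 4 of the log, while the reproducer's settings without `transactional_id` yield `None`.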

Can anyone give me some ideas on how to solve this problem?

Thanks,

junyeong-huray, Nov 23 '23 05:11

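I found that it works if I don't pass transactional_id to AIOKafkaProducer. I guess the error occurs because I am using the Kafka transactions feature, which Azure Event Hubs does not provide. The log is consistent with this: the request that fails is FindCoordinatorRequest_v1 with coordinator_type=1, a transaction coordinator lookup, and the broker answers that type code 1 "is not supported."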
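A minimal sketch of the workaround, assuming the reproducer's other settings are fine for Event Hubs: the same connection options with `transactional_id` and `transaction_timeout_ms` removed, the send done outside `producer.transaction()`, and the producer stopped in a `finally` block. `event_hubs_producer_kwargs` and `send_one` are hypothetical helper names, and using `ssl.create_default_context()` (the system CA store) instead of certifi's bundle is an assumption.

```python
import ssl
from typing import Any, Dict


def event_hubs_producer_kwargs(namespace: str, connection_string: str) -> Dict[str, Any]:
    """Build AIOKafkaProducer keyword arguments for an Event Hubs namespace.

    Hypothetical helper: mirrors the reproducer's settings, minus the
    transaction options (transactional_id, transaction_timeout_ms).
    """
    return {
        "bootstrap_servers": f"{namespace}.servicebus.windows.net:9093",
        "enable_idempotence": True,  # kept from the reproducer
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        # Event Hubs takes the literal username "$ConnectionString".
        "sasl_plain_username": "$ConnectionString",
        "sasl_plain_password": connection_string,
        # Assumption: the system CA store is sufficient; the reproducer used
        # aiokafka.helpers.create_ssl_context(cafile=certifi.where()) instead.
        "ssl_context": ssl.create_default_context(),
    }


async def send_one(namespace: str, connection_string: str,
                   topic: str, value: bytes, key: bytes) -> None:
    """Send a single message without a Kafka transaction."""
    from aiokafka import AIOKafkaProducer  # imported lazily, as in the reproducer

    producer = AIOKafkaProducer(**event_hubs_producer_kwargs(namespace, connection_string))
    await producer.start()
    try:
        await producer.send_and_wait(topic, value, key=key)
    finally:
        # Stop the producer so it is not reported as unclosed.
        await producer.stop()
```

Usage, with the placeholder values from the reproducer: `asyncio.run(send_one("my-kafka-ns", connection_string, "aztest", b"aztest-data-1234", b"message-key"))`.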

Here is the Azure Event Hubs documentation. It says Event Hubs lacks some Kafka features, such as transactions, compression, and Kafka Streams.

https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-kafka-overview#feature-differences-with-apache-kafka
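Since the document lists Kafka features that Event Hubs does not support, one option is to reject such settings before the producer starts, so the failure is a clear local error rather than an InvalidRequestError from the broker. A minimal sketch, assuming the unsupported features map onto aiokafka's `transactional_id` and `compression_type` options as summarized above; the option-to-feature table and both function names are hypothetical, and the table should be checked against the current version of the document.

```python
from typing import Any, List, Mapping

# Hypothetical table. Assumption: these producer options correspond to the
# features the Event Hubs feature-differences document lists as unsupported.
UNSUPPORTED_OPTIONS = {
    "transactional_id": "Kafka transactions",
    "compression_type": "client-side compression",
}


def unsupported_for_event_hubs(config: Mapping[str, Any]) -> List[str]:
    """List the options in `config` that enable a feature Event Hubs lacks."""
    return [
        f"{option}: {feature}"
        for option, feature in UNSUPPORTED_OPTIONS.items()
        if config.get(option) is not None
    ]


def check_event_hubs_config(config: Mapping[str, Any]) -> None:
    """Raise ValueError before connecting if `config` uses unsupported features."""
    problems = unsupported_for_event_hubs(config)
    if problems:
        raise ValueError("Not supported by Azure Event Hubs: " + ", ".join(problems))
```

Calling `check_event_hubs_config(kwargs)` right before `AIOKafkaProducer(**kwargs)` would have flagged `transactional_id='some-id'` in the reproducer without contacting the broker.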

junyeong-huray, Nov 23 '23 07:11