KafkaConnectionError for Azure Event Hub during connect phase

Open TribuneX opened this issue 2 years ago • 5 comments

We are using an Azure Event Hub as our Kafka-compatible broker. We noticed that the connection process occasionally does not complete successfully and instead throws the following error:

DEBUG:aiokafka:Attempting to bootstrap via node at xxx..servicebus.windows.net:9093
ERROR:aiokafka:Unable connect to "xxx.servicebus.windows.net:9093": 
[...]
KafkaConnectionError: Unable to bootstrap from [('xxx.servicebus.windows.net', 9093, <AddressFamily.AF_UNSPEC: 0>)]

The error happens here.

This error is reproducible when starting and stopping a connection via aiokafka in a loop. Around the 14th attempt, the connection fails with the above error. We analyzed the traffic during these attempts and saw that the Azure Event Hub sends a TCP reset packet for the unsuccessful connection attempt. Therefore, this is not necessarily an issue with aiokafka, but could be related to some limits applied by Azure Event Hub.

Since we expect many people are using this library together with Azure Event Hub, we are wondering whether other people experience the same issue. We do not yet understand whether Azure has rate limiting in place, or whether there is another reason the hub rejects this connection attempt after the previous ones succeeded.

Currently, the only workaround we have is implementing a retry mechanism for this specific error, since another connection attempt after this error usually works.
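
For anyone who wants to try the same workaround, here is a minimal sketch of such a retry wrapper. It is not our exact code: the name connect_with_retry, its parameters, and the exponential backoff are illustrative assumptions, and the helper is deliberately generic so it runs without a broker.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 5,
    base_delay: float = 0.5,
) -> T:
    """Await `connect()` until it succeeds or `attempts` tries are used up.

    Hypothetical helper: `connect` is any coroutine function that opens
    a connection and returns the connected client.
    """
    for attempt in range(1, attempts + 1):
        try:
            return await connect()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    # Only reached when the loop body never ran.
    raise ValueError("attempts must be at least 1")
```

With aiokafka, `connect` could be a coroutine that builds a fresh client, awaits its start(), and returns it, with retry_on=(KafkaConnectionError,) imported from aiokafka.errors. Creating a new client per attempt is a design choice made here so the sketch does not depend on whether a client whose start() failed can be started again.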

Environment

  • aiokafka version: 0.8.0
  • kafka-python version: 2.0.2
  • Azure Event Hub broker

TribuneX avatar Jun 06 '23 05:06 TribuneX

I'm getting the same issue and I can't for the life of me figure out what's happening here. Did you end up getting to the bottom of it?

sf-chris avatar Feb 29 '24 19:02 sf-chris

Okay so in my specific use case I ended up finding the issue.

Firstly here's my configuration:

from aiokafka import AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context

ssl_context = create_ssl_context() if "SSL" in kafka_settings.SECURITY_PROTOCOL else None
consumer = AIOKafkaConsumer(
    "events",
    bootstrap_servers=kafka_settings.BOOTSTRAP_SERVERS,
    group_id=kafka_settings.GROUP_ID,
    auto_offset_reset=kafka_settings.AUTO_OFFSET_RESET,
    security_protocol=kafka_settings.SECURITY_PROTOCOL,
    ssl_context=ssl_context,
    sasl_mechanism=kafka_settings.SASL_MECHANISM,
    sasl_plain_password=kafka_settings.SASL_PLAIN_PASSWORD,
    sasl_plain_username=kafka_settings.SASL_PLAIN_USERNAME,
)

And the corresponding environment variables:

KAFKA_BOOTSTRAP_SERVERS="your-ns.servicebus.windows.net:9093"
KAFKA_GROUP_ID=group-id
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_PLAIN_PASSWORD="Endpoint=sb://your-ns-kafka.servicebus.windows.net/;SharedAccessKeyName=default;SharedAccessKey=123412341234;EntityPath=events"
KAFKA_SASL_PLAIN_USERNAME="$$ConnectionString"
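
The kafka_settings object itself isn't shown above. As a rough sketch only, assuming it simply maps each KAFKA_-prefixed variable to an attribute of the same name, it could look like the following. The KafkaSettings class and load_kafka_settings function are hypothetical names, and the "latest" default for AUTO_OFFSET_RESET is an assumption since that variable is not in the list above.

```python
import os
from dataclasses import dataclass, fields
from typing import Mapping


@dataclass(frozen=True)
class KafkaSettings:
    """Hypothetical stand-in for the kafka_settings object used above."""

    BOOTSTRAP_SERVERS: str
    GROUP_ID: str
    SECURITY_PROTOCOL: str
    SASL_MECHANISM: str
    SASL_PLAIN_USERNAME: str
    SASL_PLAIN_PASSWORD: str
    AUTO_OFFSET_RESET: str = "latest"  # assumed default, not set in the env list


def load_kafka_settings(
    env: Mapping[str, str] = os.environ, prefix: str = "KAFKA_"
) -> KafkaSettings:
    """Build KafkaSettings from variables named prefix + field name."""
    values = {}
    for field in fields(KafkaSettings):
        key = prefix + field.name
        if key in env:
            values[field.name] = env[key]
    # A missing required variable raises TypeError here.
    return KafkaSettings(**values)
```

Values are passed through verbatim, so any escaping in the file (such as the doubled dollar sign in the username) is assumed to be resolved by whatever loads the variables into the environment.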

Now the issue: I can't find ANYWHERE in the Azure docs that this setting is critically important, but you need to set Local Authentication to enabled.

Hope this helps someone!

sf-chris avatar Feb 29 '24 20:02 sf-chris