aiokafka
Connection should be closed if there is a failure during start
Describe the bug
After calling await consumer.start(), AIOKafkaConnection.__del__ calls self._loop.call_exception_handler after failing to connect to a Kafka node. If there is a failure during startup, but after the client is "connected", aiokafka should close or clean up the connection properly, without warning about an unclosed connection and without calling the loop exception handler. For example, if there was a failure during version_lookup, I believe AIOKafkaConnection would be "connected", but the user code does not have a chance to close the connection.
Unable connect to node with id 2:
Unclosed AIOKafkaConnection
conn: <AIOKafkaConnection host=kubeprod.aws.aurotfp.com port=31092>
Exception ignored in: <function AIOKafkaConnection.__del__ at 0x7fa2da444700>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/aiokafka/conn.py", line 202, in __del__
    self._loop.call_exception_handler(context)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1744, in call_exception_handler
    self._exception_handler(self, context)
  File "/usr/local/lib/python3.8/dist-packages/featuremine/system/select_events.py", line 162, in exception_handler
    raise SystemExit("Exception occured.")
SystemExit: Exception occured.
Environment: aiokafka 0.7.0, kafka-python 2.0.2
We've hit exactly this situation with AWS MSK. It happens when aiokafka is able to connect to a broker, but then a timeout occurs in AIOKafkaConnection.connect(), which triggers "Unclosed AIOKafkaConnection" from __del__. I've prepared a patch for this.
Though I'm curious: why is AIOKafkaConnection.__del__() calling the loop exception handler manually (self._loop.call_exception_handler(context))? I think the loop exception handler should only get real exceptions which weren't handled anywhere. Also, it passes context['source_traceback'], and this source_traceback is not a standard field from https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_exception_handler. Can somebody from the maintainers explain this? @tvoinarovskyi
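To illustrate the concern above, here is a minimal sketch using only the standard asyncio API. It shows that a custom loop exception handler is invoked for a message-only context (no 'exception' key), which is why a handler that escalates on every call, like the one in the traceback, ends up exiting on what is effectively a warning. The handler name, the lists, and the context values are hypothetical and are not taken from aiokafka.

```python
import asyncio

warnings_seen = []   # message-only contexts (no real exception attached)
escalated = []       # contexts that carry an actual exception object


def tolerant_handler(loop, context):
    # Hypothetical handler: only treat the call as an error when a real
    # exception is attached; otherwise record the message as a warning.
    if "exception" in context:
        escalated.append(context["exception"])
    else:
        warnings_seen.append(context["message"])


async def main():
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(tolerant_handler)

    # Message-only report, similar in shape to an "unclosed resource" notice.
    loop.call_exception_handler(
        {"message": "Unclosed connection (sketch)", "conn": "<placeholder>"}
    )

    # Report that carries a real exception.
    loop.call_exception_handler(
        {"message": "Unhandled error (sketch)", "exception": RuntimeError("boom")}
    )


asyncio.run(main())
```

A handler written this way can serve as a stopgap in user code, since it distinguishes unclosed-resource notices from genuine unhandled exceptions, but it does not address the unclosed connection itself.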
hitting this on aiokafka-7.1.0, has anyone found a workaround?
@hellocoldworld Could you provide more info? The original problem stated here should have been fixed in https://github.com/aio-libs/aiokafka/pull/810
@ods I've checked the code in #810; the fix is a bit different from what I did in https://github.com/bitphage/aiokafka/commit/b1904e22c17f5593ad98649250d2a183dab29a8c
Specifically, in #810, in aiokafka/conn.py:
    async def connect(self):
        # skipped
        async with async_timeout.timeout(self._request_timeout):
This block could still raise an exception (a timeout error) that is not handled, so the connection would be left open.
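The idea of closing the connection when the timed block fails could be sketched roughly as follows. This is not aiokafka's code and not the actual patch: SketchConnection and its helper methods are hypothetical stand-ins, and asyncio.wait_for from the standard library is used in place of async_timeout.timeout so the example runs on its own.

```python
import asyncio


class SketchConnection:
    """Hypothetical stand-in for a connection object; not aiokafka's class."""

    def __init__(self, request_timeout):
        self._request_timeout = request_timeout
        self.closed = True

    async def _open_transport(self):
        # Pretend the TCP connection is established immediately.
        self.closed = False

    async def _post_connect_handshake(self):
        # Simulates a step after "connected" that never completes in time.
        await asyncio.sleep(3600)

    def close(self):
        self.closed = True

    async def connect(self):
        await self._open_transport()
        try:
            await asyncio.wait_for(
                self._post_connect_handshake(), self._request_timeout
            )
        except BaseException:
            # Covers timeouts and cancellation: release the transport
            # before propagating, so nothing is left open for __del__.
            self.close()
            raise


async def demo():
    conn = SketchConnection(request_timeout=0.05)
    try:
        await conn.connect()
    except asyncio.TimeoutError:
        return conn.closed, True
    return conn.closed, False


closed_after_failure, timed_out = asyncio.run(demo())
```

Catching BaseException rather than Exception is a deliberate choice in this sketch, because asyncio.CancelledError derives from BaseException on Python 3.8 and later, and a cancelled connect() should also release the transport.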