aiokafka
aiokafka copied to clipboard
Stop consumer raises CancelledError
Hello. I am using kafka consumer only for getting partitions for topic. And after that, I try to close consumer, but got CancelledError. It seems, that it might be some time between start()
and stop()
, because adding await asyncio.sleep(1)
doesn't raise an error.
Expected behaviour Consumer stops and doesn't raise an error
Environment:
- aiokafka version: 0.5.2
- kafka-python version: 1.4.6
- Kafka Broker version: 5.3.1
Reproducible example
import aiokafka
import asyncio
consumer = aiokafka.AIOKafkaConsumer(bootstrap_servers=bootstrap_servers, loop=asyncio.get_event_loop())
await consumer.start()
my_partitions = consumer.partitions_for_topic(my_topic)
await consumer.stop()
/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py in stop(self)
472 self._closed = True
473 if self._coordinator:
--> 474 yield from self._coordinator.close()
475 if self._fetcher:
476 yield from self._fetcher.close()
/usr/local/lib/python3.7/site-packages/aiokafka/consumer/group_coordinator.py in close(self)
160 def close(self):
161 self._reset_committed_task.cancel()
--> 162 yield from self._reset_committed_task
163 self._reset_committed_task = None
164
CancelledError:
This happens due to the fact that we're trying to run a closed asyncio.Task, which is not allowed.
This is actually correct asyncio functionality: asyncio.Task
I changed the method in aiokafka to
async def close(self): while not self._reset_committed_task.cancel(): await asyncio.sleep(0.1) self._reset_committed_task = None
and it works fine. However, this can get into a loop forever if your asyncio loop is unable to close the task for whatever reason. Another option is to raise an exception if the task.cancel() doesn't return true, which might be more safe?
No, this happens because self._coordinator.close()
cancels a task and then immediately awaits the task, thus raising a CancelledError
. This means that the rest of the consumer's finalization process is not run.