aiokafka icon indicating copy to clipboard operation
aiokafka copied to clipboard

Stop consumer raises CancelledError

Open easysugar opened this issue 3 years ago • 2 comments

Hello. I am using kafka consumer only for getting partitions for topic. And after that, I try to close consumer, but got CancelledError. It seems, that it might be some time between start() and stop(), because adding await asyncio.sleep(1) doesn't raise an error.

Expected behaviour Consumer stops and doesn't raise an error

Environment:

  • aiokafka version: 0.5.2
  • kafka-python version: 1.4.6
  • Kafka Broker version: 5.3.1

Reproducible example

import aiokafka
import asyncio
consumer = aiokafka.AIOKafkaConsumer(bootstrap_servers=bootstrap_servers, loop=asyncio.get_event_loop())
await consumer.start()
my_partitions = consumer.partitions_for_topic(my_topic) 
await consumer.stop()
/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py in stop(self)
    472         self._closed = True
    473         if self._coordinator:
--> 474             yield from self._coordinator.close()
    475         if self._fetcher:
    476             yield from self._fetcher.close()

/usr/local/lib/python3.7/site-packages/aiokafka/consumer/group_coordinator.py in close(self)
    160     def close(self):
    161         self._reset_committed_task.cancel()
--> 162         yield from self._reset_committed_task
    163         self._reset_committed_task = None
    164 

CancelledError:

easysugar avatar Jul 30 '20 10:07 easysugar

This happens due to the fact that we're trying to run a closed asyncio.Task, which is not allowed.

This is actually correct asyncio functionality: asyncio.Task

I changed the method in aiokafka to async def close(self): while not self._reset_committed_task.cancel(): await asyncio.sleep(0.1) self._reset_committed_task = None

and it works fine. However, this can get into a loop forever if your asyncio loop is unable to close the task for whatever reason. Another option is to raise an exception if the task.cancel() doesn't return true, which might be more safe?

Skyel14 avatar Aug 11 '21 03:08 Skyel14

No, this happens because self._coordinator.close() cancels a task and then immediately awaits the task, thus raising a CancelledError. This means that the rest of the consumer's finalization process is not run.

agronholm avatar Apr 07 '24 09:04 agronholm