aiokafka
aiokafka copied to clipboard
[QUESTION] Restarting `AIOKafkaConsumer` after `AIOKafkaConsumer.stop()`
Hi,
I have a use case where I'm injecting a AIOKafkaConsumer
instance into a class. The class has its own start
and stop
methods that, amongst other things, call the start
and stop
methods of the consumer class. The issue is that I'm failing the assert self._fetcher is None
check in AIOKafkaConsumer.start()
method.
Am I right in thinking this should work?
import asyncio
from aiokafka import AIOKafkaConsumer
async def main() -> None:
consumer = AIOKafkaConsumer("test_topic", bootstrap_servers="localhost:9092")
await consumer.start()
await consumer.stop()
await consumer.start()
if __name__ == "__main__":
asyncio.run(main())
I suppose I could pass the class a function that creates the consumer instance, but it feels a bit less neat.
import asyncio
from aiokafka import AIOKafkaConsumer
async def main() -> None:
def get_new_consumer() -> AIOKafkaConsumer:
return AIOKafkaConsumer("test_topic", bootstrap_servers="localhost:9092")
consumer = get_new_consumer()
await consumer.start()
await consumer.stop()
consumer = get_new_consumer()
await consumer.start()
if __name__ == "__main__":
asyncio.run(main())