
[QUESTION] Restarting `AIOKafkaConsumer` after `AIOKafkaConsumer.stop()`

Open dom-gunstone opened this issue 1 month ago • 3 comments

Hi,

I have a use case where I'm injecting an `AIOKafkaConsumer` instance into a class. The class has its own `start` and `stop` methods that, among other things, call the consumer's `start` and `stop` methods. The problem is that when the class is started again after being stopped, `AIOKafkaConsumer.start()` fails its `assert self._fetcher is None` check.

Am I right in thinking this should work?

import asyncio

from aiokafka import AIOKafkaConsumer


async def main() -> None:
    consumer = AIOKafkaConsumer("test_topic", bootstrap_servers="localhost:9092")

    await consumer.start()
    await consumer.stop()
    await consumer.start()


if __name__ == "__main__":
    asyncio.run(main())

I suppose I could pass the class a function that creates the consumer instance, but it feels a bit less neat.

import asyncio

from aiokafka import AIOKafkaConsumer


async def main() -> None:
    def get_new_consumer() -> AIOKafkaConsumer:
        return AIOKafkaConsumer("test_topic", bootstrap_servers="localhost:9092")

    consumer = get_new_consumer()

    await consumer.start()
    await consumer.stop()

    consumer = get_new_consumer()

    await consumer.start()


if __name__ == "__main__":
    asyncio.run(main())
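
For context, here is a rough sketch of what the factory-injection version of my class could look like. The names `ConsumerService`, `consumer_factory`, `StartStop` and `OneShotConsumer` are hypothetical and only for illustration; `OneShotConsumer` is a stand-in that mimics a consumer which cannot be started twice, so the sketch runs without a broker. It is not aiokafka code.

```python
import asyncio
from typing import Callable, List, Optional, Protocol


class StartStop(Protocol):
    """Anything with async start()/stop(), e.g. an AIOKafkaConsumer."""

    async def start(self) -> None: ...

    async def stop(self) -> None: ...


class ConsumerService:
    """Hypothetical wrapper that builds a fresh consumer on every start()."""

    def __init__(self, consumer_factory: Callable[[], StartStop]) -> None:
        self._consumer_factory = consumer_factory
        self._consumer: Optional[StartStop] = None

    async def start(self) -> None:
        if self._consumer is not None:
            raise RuntimeError("service is already started")
        # Create a new instance instead of reusing a stopped one.
        consumer = self._consumer_factory()
        await consumer.start()
        self._consumer = consumer

    async def stop(self) -> None:
        if self._consumer is None:
            return
        # Drop the reference first so a later start() gets a new instance.
        consumer, self._consumer = self._consumer, None
        await consumer.stop()


class OneShotConsumer:
    """Stand-in for a consumer that cannot be restarted (assumption)."""

    def __init__(self) -> None:
        self._used = False

    async def start(self) -> None:
        assert not self._used, "this instance was already started once"
        self._used = True

    async def stop(self) -> None:
        pass


async def demo() -> int:
    created: List[OneShotConsumer] = []

    def factory() -> OneShotConsumer:
        consumer = OneShotConsumer()
        created.append(consumer)
        return consumer

    service = ConsumerService(factory)
    await service.start()
    await service.stop()
    await service.start()  # a second instance is created here
    await service.stop()
    return len(created)
```

With the real consumer, the factory would presumably be something like `lambda: AIOKafkaConsumer("test_topic", bootstrap_servers="localhost:9092")`. It works, but the class now owns the consumer's construction rather than just receiving an instance, which is the part that feels less neat to me.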

dom-gunstone, May 22 '24 10:05