
[QUESTION] Any noticeable bugs in this code

Open · ekeric13 opened this issue · 1 comment

Are there any noticeable bugs in this code? I am running into some issues and wondering if anything jumps out.

  1. Every time I start my code I get the warning: "Heartbeat failed for group $consumer_group_name because it is rebalancing".
  2. It takes about 10 to 15 seconds before it can start consuming.
  3. It takes a while to start consuming relative to other Kafka clients I have used (though those clients were in Go, so maybe the issue is with Python rather than the client library).
  4. I am not sure of the best way to stop the consumers (one consumer per topic, each in its own un-awaited task... so they just run constantly in the background).
  5. I am not sure if running the consumers in the background within an un-awaited task is even a good pattern.
  6. I think those consumers may never exit and I may have a memory leak... more on this below.
  7. I got the error "OffsetCommit failed for group my_consumer_group due to group error ([Error 22] IllegalGenerationError: my_consumer_group), will rejoin", and then the Kafka consumer just stopped consuming.

Anyway, here is my Kafka consumer:

HandlerFunc = Callable[[dict[str, Any]], Awaitable[None]]

class KafkaConsumer:
    consumers: list[tuple[AIOKafkaConsumer, HandlerFunc]]

    def __init__(self) -> None:
        self.consumers = []
        self.loop = asyncio.get_running_loop()
        for topic, handler in topic_and_handler:
            consumer = AIOKafkaConsumer(
                topic,
                bootstrap_servers=settings.queue.BOOTSTRAP_SERVERS,
                group_id=settings.queue.CONSUMER_GROUP,
                auto_offset_reset="earliest",
                enable_auto_commit=False,
                sasl_mechanism="SCRAM-SHA-256",
                security_protocol="SASL_SSL",
                sasl_plain_username=settings.queue.USERNAME,
                sasl_plain_password=settings.queue.PASSWORD,
                ssl_context=ssl_context,
                loop=self.loop,
            )
            self.consumers.append((consumer, handler))

        self.stop_consuming = False
        self.consuming_task = None

    async def on_app_start(self) -> None:
        self.consuming_task = self.loop.create_task(self.start_worker())

    async def start_worker(self) -> None:
        tasks = []
        for consumer, handler in self.consumers:
            await consumer.start()
            task = self.loop.create_task(self.consume(consumer, handler))
            tasks.append(task)
        logger.info("starting to consume from kafka")
        await asyncio.gather(*tasks)

    async def on_app_stop(self) -> None:
        self.stop_consuming = True
        # in theory the above breaks self.consume out of the for loop, but when running the code
        # I notice that if I "await self.consuming_task" I never exit the program. Not sure if self.consume is ever garbage
        # collected... ideally I want something similar to golang's ctx.Done() and to pass a ctx into consumer.getone(),
        # but I don't think that is a pattern in python... current solution is to cancel the parent task
        if self.consuming_task is not None:
            self.consuming_task.cancel()
        for consumer, _ in self.consumers:
            await consumer.stop()

    async def consume(self, consumer: AIOKafkaConsumer, handler: HandlerFunc) -> None:
        while not self.stop_consuming:
            msg = await consumer.getone()
            if msg is None:
                continue

            headers = {k: v.decode("utf-8") for k, v in msg.headers}
            namespace = headers.get("namespace")
            logger.info(namespace)

            # if not in namespace then commit message and move on without handling it
            if namespace != settings.queue.NAMESPACE:
                await consumer.commit()
                continue

            msg_value = msg.value.decode("utf-8")
            data = json.loads(msg_value)
            await handler(data)
            await consumer.commit()
            await asyncio.sleep(1)
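(For reference, one way to get something closer to Go's ctx.Done() is to race getone() against an asyncio.Event, so shutdown does not have to wait for the next message to arrive. This is only a sketch of the idea, reusing HandlerFunc from above; consume_until_stopped and stop_event are made-up names, not part of the code in this issue:)

import asyncio
import json

from aiokafka import AIOKafkaConsumer

async def consume_until_stopped(
    consumer: AIOKafkaConsumer, handler: HandlerFunc, stop_event: asyncio.Event
) -> None:
    while not stop_event.is_set():
        get_task = asyncio.ensure_future(consumer.getone())
        stop_task = asyncio.ensure_future(stop_event.wait())
        # whichever finishes first wins; on stop, the pending fetch is cancelled
        done, pending = await asyncio.wait(
            {get_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        if get_task in done:
            msg = get_task.result()
            data = json.loads(msg.value.decode("utf-8"))
            await handler(data)
            await consumer.commit()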

Here is my Kafka producer:

class KafkaProducer:
    def __init__(self) -> None:
        self.loop = asyncio.get_running_loop()
        self.producer = AIOKafkaProducer(
            bootstrap_servers=settings.queue.BOOTSTRAP_SERVERS,
            acks="all",
            sasl_mechanism="SCRAM-SHA-256",
            security_protocol="SASL_SSL",
            sasl_plain_username=settings.queue.USERNAME,
            sasl_plain_password=settings.queue.PASSWORD,
            ssl_context=ssl_context,
            loop=self.loop,
        )

    async def on_app_start(self):
        await self.producer.start()

    async def on_app_stop(self) -> None:
        await self.producer.stop()

    async def produce(
        self, topic: str, value: dict[str, Any], optional_headers: Optional[dict[str, str]] = None
    ) -> None:
        if optional_headers is None:
            optional_headers = {}
        headers = {
            "namespace": settings.queue.NAMESPACE,
            **optional_headers,
        }
        value_bytes = json.dumps(value).encode("utf-8")
        headers_list = [(k, v.encode("utf-8")) for k, v in headers.items()]
        await self.producer.send_and_wait(topic, value=value_bytes, headers=headers_list)
        return

And then on boot I do:

kafka_producer = KafkaProducer()
kafka_consumer = KafkaConsumer()
...
await kafka_producer.on_app_start()
await kafka_consumer.on_app_start()

And then on app shutdown I do:

await kafka_consumer.on_app_stop()
await kafka_producer.on_app_stop()

I might move the Kafka consumer stuff to a completely separate thread or process with the multiprocessing or threading modules, but for now I am running it in the same process as my server and just multiplexing IO via asyncio. My understanding is that, because of the GIL, Python threads don't run Python code in parallel anyway, so a separate thread would behave much like multiplexing IO in a single-threaded event loop.
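(If the consumers ever do move to their own thread, one simple shape is to give that thread its own event loop. A rough sketch only; run_consumers_forever is a hypothetical stand-in for whatever the consumer entry point would be:)

import asyncio
import threading

def _consumer_thread() -> None:
    # a dedicated thread gets its own event loop via asyncio.run(); the GIL still
    # serializes Python bytecode, but the Kafka IO no longer shares a loop with the server
    asyncio.run(run_consumers_forever())

consumer_thread = threading.Thread(target=_consumer_thread, daemon=True)
consumer_thread.start()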

ekeric13 · Mar 16 '23

I have run into a similar situation, and after digging into the code, here are my suggestions; I hope they help:

Several of your problems are related: 1, 2, and 3 all come from the rebalance process, which takes longer than you expect. The warning in 1 is harmless: once you start the consumer, the heartbeat coroutine has already been created and started, so if the rebalance takes too long it complains about heartbeats that are meaningless during the rebalance; that can be ignored. The real question is why the rebalance takes so long; check the settings across your consumers.
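If the rebalance itself is what takes 10 to 15 seconds, it is worth comparing the group-related timeouts across every consumer in the group. A sketch of the relevant AIOKafkaConsumer kwargs, reusing the settings names from above; the values here are only illustrative, close to the library defaults:

consumer = AIOKafkaConsumer(
    topic,
    bootstrap_servers=settings.queue.BOOTSTRAP_SERVERS,
    group_id=settings.queue.CONSUMER_GROUP,
    # how long the broker waits for a heartbeat before kicking a member out of the group
    session_timeout_ms=10000,
    # how often the background heartbeat coroutine pings the group coordinator
    heartbeat_interval_ms=3000,
    # how long members get to rejoin during a rebalance (defaults to session_timeout_ms)
    rebalance_timeout_ms=30000,
    # max time between getone()/getmany() calls before the member is considered dead
    max_poll_interval_ms=300000,
)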

  4. Breaking out of the message-reading loop and then stopping the consumer is fine.
  5. Running each consumer as a background task can be a good pattern; I cannot point to a meaningful difference between the options, because asyncio treats every task equally in the event loop.
  6. The code shown above could be part of the problem. In start_worker(), as soon as a coroutine is passed to self.loop.create_task(), a Task is created and scheduled to run immediately. Since create_task() takes a coroutine and returns an already-running Task (see here), there is no need to also await them with asyncio.gather(); it is fine to just let them run.

To stop your reading loop, just check self.stop_consuming in the loop and break, then await each task. Task.done() is similar to golang's ctx.Done(), but since you can expect the while loop to break, there is no need to also check Task.done():

    async def start_worker(self) -> None:
        tasks = []
        for consumer, handler in self.consumers:
            await consumer.start()
            task = self.loop.create_task(self.consume(consumer, handler))
            tasks.append(task)
        logger.info("starting to consume from kafka")
        # keep the task handles so on_app_stop can await them instead of gathering here
        self.tasks = tasks

    async def on_app_stop(self) -> None:
        self.stop_consuming = True
        # wait for each consume loop to notice stop_consuming and exit
        for task in self.tasks:
            await task
        for consumer, _ in self.consumers:
            await consumer.stop()
  7. This is also related to rebalancing: if you manage offsets manually, see the warning here. A sketch of one way to handle it follows.
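A common pattern for manual commits is to commit from a rebalance listener before partitions are revoked, so a stale-generation commit (like the IllegalGenerationError above) is avoided. A minimal sketch, assuming aiokafka's ConsumerRebalanceListener from aiokafka.abc and that coroutine callbacks are accepted; CommitOnRevoke is a made-up name:

from aiokafka import AIOKafkaConsumer
from aiokafka.abc import ConsumerRebalanceListener

class CommitOnRevoke(ConsumerRebalanceListener):
    def __init__(self, consumer: AIOKafkaConsumer) -> None:
        self.consumer = consumer

    async def on_partitions_revoked(self, revoked) -> None:
        # flush the offsets of everything handled so far before the partitions move away
        await self.consumer.commit()

    async def on_partitions_assigned(self, assigned) -> None:
        pass

consumer = AIOKafkaConsumer(
    bootstrap_servers=settings.queue.BOOTSTRAP_SERVERS,
    group_id=settings.queue.CONSUMER_GROUP,
    enable_auto_commit=False,
)
consumer.subscribe(topics=[topic], listener=CommitOnRevoke(consumer))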

xlichao · Nov 05 '23