
Stopping multiple consumers in a short window freezes on GroupCoordinator._heartbeat_routine()

Open RyanSept opened this issue 4 years ago • 6 comments

Describe the bug I have several hundred consumers, all subscribed to the same topic, with autocommit disabled. Each consumer belongs to its own consumer group. When I call AIOKafkaConsumer.stop() on each of the consumers in a for loop, the task doing the stopping eventually halts completely. When I probe the stuck task I find that it is waiting on GroupCoordinator._heartbeat_routine(), and it remains that way indefinitely.

 <Task pending name='Task-22' coro=<consumer_stopper() running at /bar/foo.py: 692> wait_for=<Task pending name='Task-54392' coro=<GroupCoordinator._heartbeat_routine() running at /usr/local/lib/python3.8/dist-packages/aiokafka/consumer/group_coordinator.py: 724> 
 
 # It also seems to happen with GroupCoordinator._commit_refresh_routine(), GroupCoordinator._commit_refresh_task, and AIOKafkaClient._sync_task.
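
To make the pattern concrete, the shutdown loop in question is essentially the following (a simplified sketch; consumers here stands for the several hundred started AIOKafkaConsumer instances):

for consumer in consumers:
    # one of these calls eventually never returns, blocked waiting for the
    # coordinator's _heartbeat_routine() task to finish
    await consumer.stop()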

Expected behaviour GroupCoordinator._heartbeat_routine() should not prevent AIOKafkaConsumer.stop() from completing.

Environment (please complete the following information):

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): 0.7.1
  • kafka-python version (python -c "import kafka; print(kafka.__version__)"): 2.0.2
  • Kafka Broker version (kafka-topics.sh --version): 2.6.0
  • Other information (Confluent Cloud version, etc.): Python 3.8.10

Reproducible example

RyanSept avatar Aug 08 '21 21:08 RyanSept

I think it may be happening when stop() is called after a rebalance, when that consumer may no longer be part of the group, but I'm not sure. I'm trying to write a script that reproduces it reliably.

RyanSept avatar Aug 11 '21 18:08 RyanSept

This code reproduces the issue. I've been able to make it happen on Python 3.8.10 and 3.9.6 but it does not occur for Python 3.8.5. This was run in an Ubuntu 20.04 Docker container.

import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

import json
import random

EVENTS_PER_SECOND = 25


async def producer_task():
    producer = AIOKafkaProducer(
        loop=asyncio.get_running_loop(),
        bootstrap_servers="<bootstrap-servers>",
    )
    data = json.dumps(
        {
            "foobar": "test",
            "barfoo": "testing",
            "raboof": "1388383131",
            "fraboo": "paaonsasj",
            "39u13913u31": "931uf0ih91f3",
            "9fh-131f3f13f": {"timestamp": "2021-08-22T22:58:18.654101Z"},
        },
        indent=2,
    ).encode("utf-8")

    await producer.start()
    count = 0
    # The more events produced, the more likely we'll hang
    while True:
        await producer.send_and_wait("<topic>", data)
        await asyncio.sleep(1.0 / EVENTS_PER_SECOND)
        count += 1


def display_task_statuses(ltasks):
    d = {"pending": 0, "stopped": 0}
    for _, task in ltasks:
        if not task.done():
            d["pending"] += 1
        else:
            d["stopped"] += 1
    print("CONSUMER TASKS STATUSES", d)


async def consumer_task(consumer: AIOKafkaConsumer, all_consumers: list):
    count = 0
    while True:
        try:
            msg = await consumer.getone()
            # print(f"{consumer._group_id} Got message #{msg.offset}")
            await consumer.commit()
            display_task_statuses(all_consumers)
            count += 1
        except asyncio.CancelledError:
            raise


async def start_and_stop_consumers():
    consumers = []
    loop = asyncio.get_running_loop()
    loop.create_task(producer_task())
    # allow for some events to be produced
    await asyncio.sleep(5.0)

    # the more consumers there are, the more likely consumer.stop() is to hang
    for i in range(200):
        consumer = AIOKafkaConsumer(
            loop=loop,
            bootstrap_servers="<bootstrap-servers>",
            group_id=f"our-consumer-group-{i}",
            enable_auto_commit=False,
            max_poll_interval_ms=2590000,
        )
        consumer.subscribe(["<topic>"])
        await consumer.start()
        await consumer.seek_to_end()
        consumers.append(
            (
                consumer,
                loop.create_task(
                    consumer_task(consumer, consumers), name=f"our-consumer-{i}-task"
                ),
            )
        )
        print(f"Started consumer {i}")

    for consumer, task in consumers:
        # We expect stop() to hang indefinitely for a random consumer after a few minutes.
        # Keep watch of the pending vs stopped consumer task counts being printed;
        # `stopped` will stop increasing once that happens.
        await consumer.stop()
        task.cancel()
        print(f"Stopped consumer {consumer._group_id}")
    # If everything works as on CPython 3.8.5, the last printed line should read
    # "Stopped consumer our-consumer-group-199"

if __name__ == "__main__":
    asyncio.run(start_and_stop_consumers())

RyanSept avatar Aug 22 '21 23:08 RyanSept

I've been able to make it happen on Python 3.8.10 and 3.9.6 but it does not occur for Python 3.8.5.

This behaviour suggests that it could be related to the bug in wait_for introduced in Python 3.8.6.

ods avatar Aug 23 '21 10:08 ods
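
For reference, here is a conceptual sketch of that race, with purely illustrative names (this is not aiokafka's code): on the affected interpreter versions, a routine that loops over asyncio.wait_for can have a cancellation arrive just as the inner awaitable finishes, in which case wait_for returns the result and swallows the CancelledError, so anything awaiting the routine after cancelling it never returns.

import asyncio

async def poll_once():
    # stand-in for whatever the coroutine awaits internally
    await asyncio.sleep(0.1)
    return "ok"

async def routine():
    while True:
        # On the affected versions, if this task is cancelled at the instant
        # poll_once() completes, wait_for may return "ok" and swallow the
        # CancelledError, so the loop never exits and the task never finishes.
        await asyncio.wait_for(poll_once(), timeout=5)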

Wow. That sounds consistent with the behaviour I've been seeing with other wait_for calls I've tried. Besides downgrading the Python version, is there any other patch you'd recommend in the meantime?

RyanSept avatar Aug 23 '21 12:08 RyanSept

It's possible to monkey-patch wait_for with the implementation from before this bug, which had a different bug of its own. But that means you just have to choose which of the two bugs hurts you less. The problem is that with the current architecture of asyncio there is no obvious way to fix this without breaking something else. And there isn't even enough interest from the community to start a discussion on what can be done. Leaving a comment there might raise the chances of getting it on the agenda. Until then I'd suggest not relying on tasks in Python being cancellable at all. Doesn't stop in reasonable time? Just kill it.

ods avatar Aug 23 '21 13:08 ods
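
Along the lines of that suggestion, shutdown can be bounded with a deadline instead of awaiting stop() unconditionally. This is only a sketch of the "don't rely on cancellation" idea, not an aiokafka API; the stop_with_deadline helper and the timeout values are made up for illustration:

import asyncio

async def stop_with_deadline(consumer, timeout=10.0):
    # Give consumer.stop() a bounded amount of time to complete.
    stop_task = asyncio.get_running_loop().create_task(consumer.stop())
    done, pending = await asyncio.wait({stop_task}, timeout=timeout)
    if pending:
        # stop() appears stuck (e.g. on _heartbeat_routine); cancel it and wait
        # only briefly, since on the affected versions the cancellation itself
        # may be swallowed.
        stop_task.cancel()
        await asyncio.wait({stop_task}, timeout=1.0)

In the reproduction script above, await consumer.stop() in the shutdown loop could be replaced with await stop_with_deadline(consumer) to keep shutdown from blocking forever.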

Thanks. The biggest blocker is actually the AIOKafkaConsumer.stop() call and the unreleased resources it leaves behind. I've mentioned that here: https://github.com/python/cpython/pull/26097#issuecomment-903792466. Thank you for your suggestion about monkey-patching!

RyanSept avatar Aug 23 '21 13:08 RyanSept