aiokafka
Stopping multiple consumers in a short window freezes on GroupCoordinator._heartbeat_routine()
Describe the bug
I have several hundred consumers, all subscribed to the same single topic with auto-commit disabled. Each consumer belongs to its own unique group. When I call AIOKafkaConsumer.stop() on each consumer in a for loop, the task doing the stopping eventually halts completely. When I inspect that task, I find it is stuck waiting on GroupCoordinator._heartbeat_routine(), and it stays that way indefinitely.
```
<Task pending name='Task-22' coro=<consumer_stopper() running at /bar/foo.py: 692> wait_for=<Task pending name='Task-54392' coro=<GroupCoordinator._heartbeat_routine() running at /usr/local/lib/python3.8/dist-packages/aiokafka/consumer/group_coordinator.py: 724>
# It also seems to happen with GroupCoordinator._commit_refresh_routine(), GroupCoordinator._commit_refresh_task and AIOKafkaClient._sync_task,
```
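The report doesn't say how the stuck task was inspected. One way to get output like the repr above, sketched here under the assumption that you can add a diagnostic task to the same event loop, is to print every unfinished task returned by asyncio.all_tasks(). A Task's repr includes its name, where its coroutine is suspended, and the future it is waiting on (`wait_for=...`). The helper names describe_pending_tasks and watchdog are hypothetical and not part of aiokafka.

```python
import asyncio
from typing import List


def describe_pending_tasks() -> List[str]:
    """Return repr() of every unfinished task on the running loop except the caller.

    asyncio.all_tasks() only returns tasks that are not done yet; each repr
    shows the task name, the line its coroutine is suspended at and, when
    applicable, the future it is blocked on (wait_for=...).
    """
    current = asyncio.current_task()
    return [repr(task) for task in asyncio.all_tasks() if task is not current]


async def watchdog(interval: float = 30.0) -> None:
    """Hypothetical helper: periodically print what every other task is blocked on."""
    while True:
        await asyncio.sleep(interval)
        for line in describe_pending_tasks():
            print(line)
```

Started with something like `loop.create_task(watchdog())` before the stop loop, it would keep printing the reprs of whichever tasks are still pending, including the one stuck in stop().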
Expected behaviour
GroupCoordinator._heartbeat_routine() should not prevent AIOKafkaConsumer.stop() from completing.
Environment:
- aiokafka version (`python -c "import aiokafka; print(aiokafka.__version__)"`): 0.7.1
- kafka-python version (`python -c "import kafka; print(kafka.__version__)"`): 2.0.2
- Kafka Broker version (`kafka-topics.sh --version`): 2.6.0
- Other information (Confluent Cloud version, etc.): Python 3.8.10
Reproducible example
I think it may happen when stop() is called after a rebalance, when that consumer might no longer be part of the group, but I'm not sure. I'm trying to write a script that recreates it reliably.
This code reproduces the issue. I've been able to trigger it on Python 3.8.10 and 3.9.6, but it does not occur on Python 3.8.5. This was running in an Ubuntu 20.04 Docker container.
```python
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import json
import random

EVENTS_PER_SECOND = 25


async def producer_task():
    producer = AIOKafkaProducer(
        loop=asyncio.get_running_loop(),
        bootstrap_servers="<bootstrap-servers>",
    )
    data = json.dumps(
        {
            "foobar": "test",
            "barfoo": "testing",
            "raboof": "1388383131",
            "fraboo": "paaonsasj",
            "39u13913u31": "931uf0ih91f3",
            "9fh-131f3f13f": {"timestamp": "2021-08-22T22:58:18.654101Z"},
        },
        indent=2,
    ).encode("utf-8")
    await producer.start()
    count = 0
    # The more events produced, the more likely we'll hang
    while True:
        await producer.send_and_wait("<topic>", data)
        await asyncio.sleep(1.0 / EVENTS_PER_SECOND)
        count += 1


def display_task_statuses(ltasks):
    d = {"pending": 0, "stopped": 0}
    for _, task in ltasks:
        if not task.done():
            d["pending"] += 1
        else:
            d["stopped"] += 1
    print("CONSUMER TASKS STATUSES", d)


async def consumer_task(consumer: AIOKafkaConsumer, all_consumers: list):
    count = 0
    while True:
        try:
            msg = await consumer.getone()
            # print(f"{consumer._group_id} Got message #{msg.offset}")
            await consumer.commit()
            display_task_statuses(all_consumers)
            count += 1
        except asyncio.CancelledError:
            raise


async def start_and_stop_consumers():
    consumers = []
    loop = asyncio.get_running_loop()
    loop.create_task(producer_task())
    # Allow some events to be produced
    await asyncio.sleep(5.0)
    # The more consumers, the more likely consumer.stop() will hang
    for i in range(200):
        consumer = AIOKafkaConsumer(
            loop=loop,
            bootstrap_servers="<bootstrap-servers>",
            group_id=f"our-consumer-group-{i}",
            enable_auto_commit=False,
            max_poll_interval_ms=2590000,
        )
        consumer.subscribe(["<topic>"])
        await consumer.start()
        await consumer.seek_to_end()
        consumers.append(
            (
                consumer,
                loop.create_task(
                    consumer_task(consumer, consumers), name=f"our-consumer-{i}-task"
                ),
            )
        )
        print(f"Started consumer {i}")
    for consumer, task in consumers:
        # We expect stop() to hang for a random consumer after a few minutes
        # and to block indefinitely on a random task.
        # Watch the pending vs. stopped consumer task counts being printed: `stopped` will stop increasing.
        await consumer.stop()
        task.cancel()
        print(f"Stopped consumer {consumer._group_id}")
    # If everything works as in CPython 3.8.5, the last printed
    # line should read "Stopped consumer our-consumer-group-199"


if __name__ == "__main__":
    asyncio.run(start_and_stop_consumers())
```
> I've been able to make it happen on Python 3.8.10 and 3.9.6 but it does not occur for Python 3.8.5.
This behaviour suggests that it could be related to the bug in asyncio.wait_for introduced in Python 3.8.6.
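The kind of race being referred to can be illustrated without Kafka. The sketch below is a hypothetical illustration, not aiokafka's code: it cancels a task that is suspended inside asyncio.wait_for() in the same event-loop iteration in which the awaited future completes. Depending on the interpreter's wait_for() implementation, the cancelled task either raises CancelledError or returns the inner result, silently dropping the cancellation, so the script only reports which outcome the running interpreter produces. If a background routine such as a heartbeat loop lost a cancellation this way, it would keep running, and a stop() that cancels and then awaits that routine could wait forever, which would match the symptom in this report.

```python
import asyncio
import sys


async def cancel_while_inner_finishes() -> str:
    """Cancel a task blocked in wait_for() just as the awaited future completes."""
    loop = asyncio.get_running_loop()
    inner = loop.create_future()
    # `outer` stands in for a background routine suspended inside wait_for().
    outer = loop.create_task(asyncio.wait_for(inner, timeout=10))
    await asyncio.sleep(0)  # let `outer` start and block inside wait_for()

    # Race: complete the inner future *and* cancel the outer task before the
    # event loop has had a chance to process either event.
    inner.set_result("inner result")
    outer.cancel()

    try:
        value = await outer
    except asyncio.CancelledError:
        return "cancelled"  # the cancellation was delivered
    return f"cancellation swallowed, wait_for() returned {value!r}"


if __name__ == "__main__":
    print(sys.version.split()[0], "->", asyncio.run(cancel_while_inner_finishes()))
```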
Wow. That is consistent with the behaviour I've been seeing around other wait_for calls I tried to make. Besides downgrading Python, is there any other patch you'd recommend in the meantime?
It's possible to monkey-patch wait_for with the implementation from before this bug was introduced, but that version had a different bug, so you would just be choosing which of the two bites you less. The problem is that with the current architecture of asyncio there is no apparent way to fix this without breaking something else, and there isn't even enough interest from the community to start a discussion about what we can do. By leaving a comment there, you might raise the chance of it being put on the agenda. Until then, I'd suggest not relying on tasks in Python being cancellable at all. Doesn't stop in a reasonable time? Just kill it.
Thanks. The biggest blocker is actually the AIOKafkaConsumer.stop() call hanging and leaving unreleased resources behind. I've mentioned that here: https://github.com/python/cpython/pull/26097#issuecomment-903792466. Thank you for the suggestion about monkey-patching!