
`assert assignment is not None and assignment.active` raising incorrectly

Open cdgriffith opened this issue 4 years ago • 6 comments

Describe the bug

I have a program that runs two listener consumers: one is told when to start a command, and the other is alerted when a command has finished.

The start-command consumer launches a new consumer per topic as needed. However, this seems to raise assertion errors.

If I remove the assertion line, the program runs as expected. Am I missing a core concept of how this is supposed to work, or is this a bug?

Thanks for any help or pointers!

Error Logs
```
INFO:     Started server process [11818]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:3333 (Press CTRL+C to quit)
2021-11-17 15:15:21.502 | DEBUG    | __main__:launch_watcher:83 - Received launch topic: ConsumerRecord(topic='my_topic_prefix.launch', partition=0, offset=2, timestamp=1637183719944, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())
2021-11-17 15:15:22.090 | DEBUG    | __main__:consume_message:132 - Reading test message at offset 0 : {TopicPartition(topic='my_topic_prefix.test.command', partition=0): [ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=0, timestamp=1637183621283, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=()), ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=1, timestamp=1637183642792, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=()), ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=2, timestamp=1637183719944, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())]}
2021-11-17 15:15:22.091 | INFO     | __main__:consume_message:136 - {'topic': 'test'}
Unexpected error in coordinator routine
Traceback (most recent call last):
File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 555, in _coordination_routine
    await self.__coordination_routine()
File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 608, in __coordination_routine
    assert assignment is not None and assignment.active
AssertionError
2021-11-17 15:16:04.554 | DEBUG    | __main__:launch_watcher:83 - Received launch topic: ConsumerRecord(topic='my_topic_prefix.launch', partition=0, offset=3, timestamp=1637183763013, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())
Task exception was never retrieved
future: <Task finished name='Task-212' coro=<consume_message() done, defined at test_fast.py:124> exception=KafkaError('Unexpected error during coordination AssertionError()')>
Traceback (most recent call last):
File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 555, in _coordination_routine
    await self.__coordination_routine()
File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 608, in __coordination_routine
    assert assignment is not None and assignment.active 
AssertionError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "test_fast.py", line 131, in consume_message
    msg = await consumer.getmany(timeout_ms=1000, max_records=5)
File "venv/lib/python3.10/site-packages/aiokafka/consumer/consumer.py", line 1181, in getmany
    self._coordinator.check_errors()
File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 300, in check_errors
    self._coordination_task.result()
File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 564, in _coordination_routine
    raise kafka_exc
kafka.errors.KafkaError: KafkaError: Unexpected error during coordination AssertionError()
```

Expected behavior

Removing the assert makes the code run as expected:

Expected Logs
```

2021-11-17 15:46:05.227 | DEBUG    | __main__:launch_watcher:83 - Received launch topic: ConsumerRecord(topic='my_topic_prefix.launch', partition=0, offset=0, timestamp=1637185563622, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())
2021-11-17 15:46:05.748 | DEBUG    | __main__:consume_message:137 - Reading test message at offset 0 : {TopicPartition(topic='my_topic_prefix.test.command', partition=0): [ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=0, timestamp=1637185563622, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())]}
2021-11-17 15:46:05.748 | INFO     | __main__:consume_message:139 - High priority issue incoming
2021-11-17 15:46:05.749 | INFO     | __main__:consume_message:142 - {'topic': 'test'}
2021-11-17 15:46:10.145 | DEBUG    | __main__:finish_watcher:96 - Received completed topic: ConsumerRecord(topic='my_topic_prefix.finish', partition=0, offset=0, timestamp=1637185568621, timestamp_type=0, key=None, value=b'{"topic": "test", "offset": 0}', checksum=None, serialized_key_size=-1, serialized_value_size=30, headers=())
2021-11-17 15:46:11.177 | DEBUG    | __main__:consume_message:137 - Reading test message at offset 1 : {}
2021-11-17 15:46:11.177 | INFO     | __main__:consume_message:144 - No message, running background command instead
2021-11-17 15:46:20.154 | DEBUG    | __main__:finish_watcher:96 - Received completed topic: ConsumerRecord(topic='my_topic_prefix.finish', partition=0, offset=1, timestamp=1637185578633, timestamp_type=0, key=None, value=b'{"topic": "test", "offset": 0}', checksum=None, serialized_key_size=-1, serialized_value_size=30, headers=())
2021-11-17 15:46:20.208 | DEBUG    | __main__:consume_message:137 - Reading test message at offset 1 : {TopicPartition(topic='my_topic_prefix.command', partition=0): [ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=1, timestamp=1637185573620, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())]}
2021-11-17 15:46:20.209 | INFO     | __main__:consume_message:139 - High priority issue incoming
2021-11-17 15:46:20.209 | INFO     | __main__:consume_message:142 - {'topic': 'test'}
```

Environment (please complete the following information):

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): 0.7.2
  • kafka-python version (python -c "import kafka; print(kafka.__version__)"): 2.0.2
  • Kafka Broker version (kafka-topics.sh --version): 2.8.0 (Commit:ebb1d6e21cc92130)

Reproducible example

Service
```python

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import asyncio
import json
import os

import motor.motor_asyncio
import aiokafka
from fastapi import FastAPI
from loguru import logger
import uvicorn
from dotenv import load_dotenv
from starlette.middleware.cors import CORSMiddleware
from starlette_context import context, plugins
from starlette_context.middleware import ContextMiddleware
from starlette.middleware import Middleware

load_dotenv()

topic_prefix = os.getenv('TOPIC_PREFIX', 'my_topic_prefix')
bootstrap_servers = os.getenv('KAFKA_BOOTSTRAP_SERVERS', "my_bootstrap_servers")
kafka_client_id = os.getenv("KAFKA_CLIENT_ID", "my_client_id")
mongo_uri = os.getenv("MONGO_URI", "my_mongo_uri")


middleware = [
    Middleware(
        ContextMiddleware,
        plugins=(
            plugins.RequestIdPlugin(),
            plugins.CorrelationIdPlugin()
        )
    )
]

app = FastAPI(debug=True, middleware=middleware)
app.add_middleware(CORSMiddleware, allow_origins=["*"])


@app.on_event("startup")
async def startup_event():
    context.consumers = {}
    context.shutdown_triggered = False

    context.launch_consumer = aiokafka.AIOKafkaConsumer(
        f"{topic_prefix}.launch",
        group_id="launch",
        client_id=f"{kafka_client_id}.launch",
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=True,
        auto_offset_reset="earliest",
    )
    await context.launch_consumer.start()

    context.finish_consumer = aiokafka.AIOKafkaConsumer(
        f"{topic_prefix}.finish",
        group_id="finish",
        client_id=f"{kafka_client_id}.finish",
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=True,
        auto_offset_reset="earliest",
    )
    await context.finish_consumer.start()

    asyncio.ensure_future(launch_watcher())
    asyncio.ensure_future(finish_watcher())

    context.mongo_client = motor.motor_asyncio.AsyncIOMotorClient(mongo_uri)
    context.mongo_db = context.mongo_client["mydb"]


@app.on_event("shutdown")
async def shutdown_event():
    context.shutdown_triggered = True
    for consumer in context.consumers.values():
        await consumer.stop()


async def launch_watcher():
    try:
        while not context.shutdown_triggered:
            async for command in context.launch_consumer:
                logger.debug(f"Received launch topic: {command}")
                parsed_topic_name = json.loads(command.value.decode("utf-8"))["topic"]
                asyncio.ensure_future(consume_message(parsed_topic_name))
    finally:
        await context.launch_consumer.stop()


async def finish_watcher():
    # While these do the same thing in this example
    # the context is they consume commands from different services when starting / ending a command
    try:
        while not context.shutdown_triggered:
            async for message in context.finish_consumer:
                logger.debug(f"Received completed topic: {message}")
                parsed_message = json.loads(message.value.decode("utf-8"))
                # Do something to stop / cancel the running command then get next one available
                await command_done(parsed_message["topic"], parsed_message["offset"])
                asyncio.ensure_future(consume_message(parsed_message["topic"]))
    finally:
        await context.finish_consumer.stop()


async def ensure_consumer_started(topic_name: str):
    if topic_name in context.consumers:
        return context.consumers[topic_name]

    consumer = aiokafka.AIOKafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        client_id=kafka_client_id,
        group_id=f"my_program_{topic_name}",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        metadata_max_age_ms=30000,
    )
    await consumer.start()
    tp = aiokafka.TopicPartition(f"{topic_prefix}.{topic_name}.command", 0)
    consumer.assign([tp])
    context.consumers[topic_name] = consumer
    return consumer


async def consume_message(topic_name: str):
    consumer = await ensure_consumer_started(topic_name)

    # Grab offset from database
    offset_data = await context.mongo_db.offsets.find_one({"topic_name": topic_name})
    offset = offset_data["offset"] if offset_data else 0

    # Get next message without blocking
    tp = aiokafka.TopicPartition(f"{topic_prefix}.{topic_name}.command", 0)
    consumer.assign([tp])
    consumer.seek(partition=tp, offset=offset)
    msg = await consumer.getmany(timeout_ms=1000, max_records=5)

    logger.debug(f"Reading {topic_name} message at offset {offset} : {msg}")
    if msg:
        logger.info("High priority issue incoming")
        raw_message = next(iter(msg.values()))[0]
        data = json.loads(raw_message.value.decode("utf-8"))
        logger.info(data)
    else:
        logger.info("No message, running background command instead")
        # Would run alt command instead with await


async def command_done(topic_name, offset):
    offset_data = await context.mongo_db.offsets.find_one({"topic_name": topic_name})
    new_offset = {"topic_name": topic_name, "offset": offset + 1}
    if offset_data:
        await context.mongo_db.offsets.replace_one({'_id': offset_data["_id"]}, new_offset)
    else:
        await context.mongo_db.offsets.insert_one(new_offset)


if __name__ == '__main__':
    uvicorn.run(app, port=3333, debug=True)
```
Test producer
```python

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import asyncio
import json
import os

import aiokafka
from dotenv import load_dotenv


load_dotenv()

topic_prefix = os.getenv('TOPIC_PREFIX', 'my_topic_prefix')
bootstrap_servers = os.getenv('KAFKA_BOOTSTRAP_SERVERS', "my_bootstrap_servers")
kafka_client_id = os.getenv("KAFKA_CLIENT_ID", "my_client_id")


async def main():
    producer = aiokafka.AIOKafkaProducer(
        bootstrap_servers=bootstrap_servers,
        client_id=kafka_client_id,
    )
    await producer.start()

    await producer.send(f'{topic_prefix}.test.command', json.dumps({"topic": "test"}).encode("utf-8"))
    await producer.send(f'{topic_prefix}.launch', json.dumps({"topic": "test"}).encode("utf-8"))

    await asyncio.sleep(5)

    await producer.send(f'{topic_prefix}.finish', json.dumps({"topic": "test", "offset": 0}).encode("utf-8"))

    await asyncio.sleep(5)

    await producer.send(f'{topic_prefix}.test.command', json.dumps({"topic": "test"}).encode("utf-8"))

    await asyncio.sleep(5)

    await producer.send(f'{topic_prefix}.finish', json.dumps({"topic": "test", "offset": 0}).encode("utf-8"))

    await producer.stop()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

```

cdgriffith avatar Nov 17 '21 22:11 cdgriffith

I was able to get past this for now by not setting the group_id for the consumer created in ensure_consumer_started.
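
A minimal sketch of that workaround, assuming the per-topic consumer only ever needs manual partition assignment and that offsets are stored outside Kafka (as in the reproducer above). The helper names `manual_consumer_kwargs` and `start_manual_consumer` are hypothetical and not part of aiokafka; only `AIOKafkaConsumer`, `TopicPartition`, `start()` and `assign()` come from the library.

```python
def manual_consumer_kwargs(bootstrap_servers: str, client_id: str) -> dict:
    """Keyword arguments for a consumer that does not join a consumer group."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "client_id": client_id,
        "group_id": None,  # the workaround: leave group_id unset
        "enable_auto_commit": False,  # offsets are tracked outside Kafka
        "auto_offset_reset": "earliest",
    }


async def start_manual_consumer(topic: str, bootstrap_servers: str, client_id: str):
    # Imported lazily so the helper above is usable without aiokafka installed.
    import aiokafka

    consumer = aiokafka.AIOKafkaConsumer(
        **manual_consumer_kwargs(bootstrap_servers, client_id)
    )
    await consumer.start()
    # Assign the single partition explicitly (partition 0, as in the reproducer).
    consumer.assign([aiokafka.TopicPartition(topic, 0)])
    return consumer
```

Without a `group_id` the consumer cannot commit offsets to Kafka, so this only fits setups that, like the reproducer, keep offsets in their own store.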

cdgriffith avatar Nov 18 '21 14:11 cdgriffith

Thanks for reporting this in such detail. Looking through the code, I can confirm that it's a bug: there is a race condition with manual assignment that triggers the `assignment.active` assert. The assert is meant to guarantee that the code following it sees a consistent assignment, but assignment changes seem to be checked only in the auto-assigned case. I will try to put together a fix.
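
The kind of race described here can be illustrated with a toy model that uses only `asyncio`. This is not aiokafka's code: `ToyAssignment`, `ToySubscription`, `toy_coordination_step` and `demo_race` are hypothetical names, and the sketch assumes that re-assigning marks the previous assignment object inactive while a background routine still holds a reference to it.

```python
import asyncio


class ToyAssignment:
    def __init__(self, partitions):
        self.partitions = partitions
        self.active = True


class ToySubscription:
    def __init__(self):
        self.assignment = None

    def assign(self, partitions):
        # Assumption: a new manual assignment invalidates the previous one.
        if self.assignment is not None:
            self.assignment.active = False
        self.assignment = ToyAssignment(partitions)


async def toy_coordination_step(subscription):
    assignment = subscription.assignment  # snapshot taken before awaiting
    await asyncio.sleep(0)  # stands in for any await, e.g. a network round trip
    # Same shape as the failing assert, checked against the stale snapshot.
    assert assignment is not None and assignment.active
    return assignment


async def demo_race():
    sub = ToySubscription()
    sub.assign(["topic-0"])
    task = asyncio.ensure_future(toy_coordination_step(sub))
    await asyncio.sleep(0)  # let the step take its snapshot and suspend
    sub.assign(["topic-0"])  # re-assign while the step is suspended
    try:
        await task
    except AssertionError:
        return "assertion tripped"
    return "ok"


if __name__ == "__main__":
    print(asyncio.run(demo_race()))
```

In this model the assert fails even though the partitions never change, which matches the reproducer, where `consume_message` calls `consumer.assign([tp])` again on every invocation.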

tvoinarovskyi avatar Jan 02 '22 12:01 tvoinarovskyi

@tvoinarovskyi, is there any progress on fixing this issue?

vejmoj1 avatar Jun 22 '23 13:06 vejmoj1

@tvoinarovskyi I'm facing the same issue with a single consumer as well. What could be causing it?

AnimeshRy avatar Mar 13 '24 11:03 AnimeshRy

I ran into this assertion error today too. In my case it was caused by a call to consumer.subscribe() after the consumer had been started with await consumer.start(). Just leaving this here for anyone running into this issue; hope it helps someone.

marcelblijleven avatar Mar 21 '24 16:03 marcelblijleven