aiokafka
`assert assignment is not None and assignment.active` raising incorrectly
Describe the bug
I have a program that runs two listener consumers: one is told when to start a command, and the other is alerted when a command has finished.
The start-command consumer launches a new consumer for each new topic as needed. However, this seems to raise assertion errors.
If I remove the assertion line, the program runs as expected. Am I missing a core concept of how this is supposed to work, or is this an error?
Thanks for any help or pointers!
Error Logs
```
INFO: Started server process [11818]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:3333 (Press CTRL+C to quit)
2021-11-17 15:15:21.502 | DEBUG | __main__:launch_watcher:83 - Received launch topic: ConsumerRecord(topic='my_topic_prefix.launch', partition=0, offset=2, timestamp=1637183719944, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())
2021-11-17 15:15:22.090 | DEBUG | __main__:consume_message:132 - Reading test message at offset 0 : {TopicPartition(topic='my_topic_prefix.test.command', partition=0): [ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=0, timestamp=1637183621283, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=()), ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=1, timestamp=1637183642792, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=()), ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=2, timestamp=1637183719944, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())]}
2021-11-17 15:15:22.091 | INFO | __main__:consume_message:136 - {'topic': 'test'}
Unexpected error in coordinator routine
Traceback (most recent call last):
File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 555, in _coordination_routine
await self.__coordination_routine()
File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 608, in __coordination_routine
assert assignment is not None and assignment.active
AssertionError
2021-11-17 15:16:04.554 | DEBUG | __main__:launch_watcher:83 - Received launch topic: ConsumerRecord(topic='my_topic_prefix.launch', partition=0, offset=3, timestamp=1637183763013, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())
Task exception was never retrieved
future: <Task finished name='Task-212' coro=<consume_message() done, defined at test_fast.py:124> exception=KafkaError('Unexpected error during coordination AssertionError()')>
Traceback (most recent call last):
File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 555, in _coordination_routine
await self.__coordination_routine()
File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 608, in __coordination_routine
assert assignment is not None and assignment.active
AssertionError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "test_fast.py", line 131, in consume_message
msg = await consumer.getmany(timeout_ms=1000, max_records=5)
File "venv/lib/python3.10/site-packages/aiokafka/consumer/consumer.py", line 1181, in getmany
self._coordinator.check_errors()
File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 300, in check_errors
self._coordination_task.result()
File "venv/lib/python3.10/site-packages/aiokafka/consumer/group_coordinator.py", line 564, in _coordination_routine
raise kafka_exc
kafka.errors.KafkaError: KafkaError: Unexpected error during coordination AssertionError()
```
Expected behavior
Removing the assert makes the code run as expected:
Expected Logs
```
2021-11-17 15:46:05.227 | DEBUG | __main__:launch_watcher:83 - Received launch topic: ConsumerRecord(topic='my_topic_prefix.launch', partition=0, offset=0, timestamp=1637185563622, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())
2021-11-17 15:46:05.748 | DEBUG | __main__:consume_message:137 - Reading test message at offset 0 : {TopicPartition(topic='my_topic_prefix.test.command', partition=0): [ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=0, timestamp=1637185563622, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())]}
2021-11-17 15:46:05.748 | INFO | __main__:consume_message:139 - High priority issue incoming
2021-11-17 15:46:05.749 | INFO | __main__:consume_message:142 - {'topic': 'test'}
2021-11-17 15:46:10.145 | DEBUG | __main__:finish_watcher:96 - Received completed topic: ConsumerRecord(topic='my_topic_prefix.finish', partition=0, offset=0, timestamp=1637185568621, timestamp_type=0, key=None, value=b'{"topic": "test", "offset": 0}', checksum=None, serialized_key_size=-1, serialized_value_size=30, headers=())
2021-11-17 15:46:11.177 | DEBUG | __main__:consume_message:137 - Reading test message at offset 1 : {}
2021-11-17 15:46:11.177 | INFO | __main__:consume_message:144 - No message, running background command instead
2021-11-17 15:46:20.154 | DEBUG | __main__:finish_watcher:96 - Received completed topic: ConsumerRecord(topic='my_topic_prefix.finish', partition=0, offset=1, timestamp=1637185578633, timestamp_type=0, key=None, value=b'{"topic": "test", "offset": 0}', checksum=None, serialized_key_size=-1, serialized_value_size=30, headers=())
2021-11-17 15:46:20.208 | DEBUG | __main__:consume_message:137 - Reading test message at offset 1 : {TopicPartition(topic='my_topic_prefix.command', partition=0): [ConsumerRecord(topic='my_topic_prefix.test.command', partition=0, offset=1, timestamp=1637185573620, timestamp_type=0, key=None, value=b'{"topic": "test"}', checksum=None, serialized_key_size=-1, serialized_value_size=17, headers=())]}
2021-11-17 15:46:20.209 | INFO | __main__:consume_message:139 - High priority issue incoming
2021-11-17 15:46:20.209 | INFO | __main__:consume_message:142 - {'topic': 'test'}
```
Environment (please complete the following information):
- aiokafka version (`python -c "import aiokafka; print(aiokafka.__version__)"`): 0.7.2
- kafka-python version (`python -c "import kafka; print(kafka.__version__)"`): 2.0.2
- Kafka Broker version (`kafka-topics.sh --version`): 2.8.0 (Commit:ebb1d6e21cc92130)
Reproducible example
Service
```python
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import asyncio
import json
import os

import motor.motor_asyncio
import aiokafka
from fastapi import FastAPI
from loguru import logger
import uvicorn
from dotenv import load_dotenv
from starlette.middleware.cors import CORSMiddleware
from starlette_context import context, plugins
from starlette_context.middleware import ContextMiddleware
from starlette.middleware import Middleware

load_dotenv()

topic_prefix = os.getenv('TOPIC_PREFIX', 'my_topic_prefix')
bootstrap_servers = os.getenv('KAFKA_BOOTSTRAP_SERVERS', "my_bootstrap_servers")
kafka_client_id = os.getenv("KAFKA_CLIENT_ID", "my_client_id")
mongo_uri = os.getenv("MONGO_URI", "my_mongo_uri")

middleware = [
    Middleware(
        ContextMiddleware,
        plugins=(
            plugins.RequestIdPlugin(),
            plugins.CorrelationIdPlugin()
        )
    )
]

app = FastAPI(debug=True, middleware=middleware)
app.add_middleware(CORSMiddleware, allow_origins=["*"])


@app.on_event("startup")
async def startup_event():
    context.consumers = {}
    context.shutdown_triggered = False
    context.launch_consumer = aiokafka.AIOKafkaConsumer(
        f"{topic_prefix}.launch",
        group_id="launch",
        client_id=f"{kafka_client_id}.launch",
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=True,
        auto_offset_reset="earliest",
    )
    await context.launch_consumer.start()
    context.finish_consumer = aiokafka.AIOKafkaConsumer(
        f"{topic_prefix}.finish",
        group_id="finish",
        client_id=f"{kafka_client_id}.finish",
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=True,
        auto_offset_reset="earliest",
    )
    await context.finish_consumer.start()
    asyncio.ensure_future(launch_watcher())
    asyncio.ensure_future(finish_watcher())
    context.mongo_client = motor.motor_asyncio.AsyncIOMotorClient(mongo_uri)
    context.mongo_db = context.mongo_client["mydb"]


@app.on_event("shutdown")
async def shutdown_event():
    context.shutdown_triggered = True
    for consumer in context.consumers.values():
        await consumer.stop()


async def launch_watcher():
    try:
        while not context.shutdown_triggered:
            async for command in context.launch_consumer:
                logger.debug(f"Received launch topic: {command}")
                parsed_topic_name = json.loads(command.value.decode("utf-8"))["topic"]
                asyncio.ensure_future(consume_message(parsed_topic_name))
    finally:
        await context.launch_consumer.stop()


async def finish_watcher():
    # While these do the same thing in this example
    # the context is they consume commands from different services when starting / ending a command
    try:
        while not context.shutdown_triggered:
            async for message in context.finish_consumer:
                logger.debug(f"Received completed topic: {message}")
                parsed_message = json.loads(message.value.decode("utf-8"))
                # Do something to stop / cancel the running command then get next one available
                await command_done(parsed_message["topic"], parsed_message["offset"])
                asyncio.ensure_future(consume_message(parsed_message["topic"]))
    finally:
        await context.finish_consumer.stop()


async def ensure_consumer_started(topic_name: str):
    if topic_name in context.consumers:
        return context.consumers[topic_name]
    consumer = aiokafka.AIOKafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        client_id=kafka_client_id,
        group_id=f"my_program_{topic_name}",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        metadata_max_age_ms=30000,
    )
    await consumer.start()
    tp = aiokafka.TopicPartition(f"{topic_prefix}.{topic_name}.command", 0)
    consumer.assign([tp])
    context.consumers[topic_name] = consumer
    return consumer


async def consume_message(topic_name: str):
    consumer = await ensure_consumer_started(topic_name)
    # Grab offset from database
    offset_data = await context.mongo_db.offsets.find_one({"topic_name": topic_name})
    offset = offset_data["offset"] if offset_data else 0
    # Get next message without blocking
    tp = aiokafka.TopicPartition(f"{topic_prefix}.{topic_name}.command", 0)
    consumer.assign([tp])
    consumer.seek(partition=tp, offset=offset)
    msg = await consumer.getmany(timeout_ms=1000, max_records=5)
    logger.debug(f"Reading {topic_name} message at offset {offset} : {msg}")
    if msg:
        logger.info("High priority issue incoming")
        raw_message = next(iter(msg.values()))[0]
        data = json.loads(raw_message.value.decode("utf-8"))
        logger.info(data)
    else:
        logger.info("No message, running background command instead")
        # Would run alt command instead with await


async def command_done(topic_name, offset):
    offset_data = await context.mongo_db.offsets.find_one({"topic_name": topic_name})
    new_offset = {"topic_name": topic_name, "offset": offset + 1}
    if offset_data:
        await context.mongo_db.offsets.replace_one({'_id': offset_data["_id"]}, new_offset)
    else:
        await context.mongo_db.offsets.insert_one(new_offset)


if __name__ == '__main__':
    uvicorn.run(app, port=3333, debug=True)
```
Test producer
```python
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import asyncio
import json
import os

import aiokafka
from dotenv import load_dotenv

load_dotenv()

topic_prefix = os.getenv('TOPIC_PREFIX', 'my_topic_prefix')
bootstrap_servers = os.getenv('KAFKA_BOOTSTRAP_SERVERS', "my_bootstrap_servers")
kafka_client_id = os.getenv("KAFKA_CLIENT_ID", "my_client_id")


async def main():
    producer = aiokafka.AIOKafkaProducer(
        bootstrap_servers=bootstrap_servers,
        client_id=kafka_client_id,
    )
    await producer.start()
    await producer.send(f'{topic_prefix}.test.command', json.dumps({"topic": "test"}).encode("utf-8"))
    await producer.send(f'{topic_prefix}.launch', json.dumps({"topic": "test"}).encode("utf-8"))
    await asyncio.sleep(5)
    await producer.send(f'{topic_prefix}.finish', json.dumps({"topic": "test", "offset": 0}).encode("utf-8"))
    await asyncio.sleep(5)
    await producer.send(f'{topic_prefix}.test.command', json.dumps({"topic": "test"}).encode("utf-8"))
    await asyncio.sleep(5)
    await producer.send(f'{topic_prefix}.finish', json.dumps({"topic": "test", "offset": 0}).encode("utf-8"))
    await producer.stop()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
```
I was able to get past this for now by not setting the group_id for the consumer created in ensure_consumer_started.
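For anyone who wants to try the same workaround, here is a rough sketch of what it might look like. The helper names `manual_consumer_kwargs` and `start_manual_consumer` are made up for illustration; the settings are the ones from `ensure_consumer_started` above with `group_id` left out. This assumes offsets are tracked outside Kafka (as with the Mongo collection in the example), since a consumer without a group cannot commit offsets to the broker.

```python
from typing import Any, Dict


def manual_consumer_kwargs(bootstrap_servers: str, client_id: str) -> Dict[str, Any]:
    """Settings for a consumer that only uses manual partition assignment.

    group_id is deliberately omitted, so the consumer never joins a
    consumer group and no group coordination takes place.
    """
    return {
        "bootstrap_servers": bootstrap_servers,
        "client_id": client_id,
        "auto_offset_reset": "earliest",
        "enable_auto_commit": False,
        "metadata_max_age_ms": 30000,
    }


async def start_manual_consumer(topic: str, bootstrap_servers: str, client_id: str):
    # Hypothetical helper: imported lazily so the kwargs helper above can be
    # inspected without aiokafka installed or a broker running.
    import aiokafka

    consumer = aiokafka.AIOKafkaConsumer(
        **manual_consumer_kwargs(bootstrap_servers, client_id)
    )
    await consumer.start()
    # Manual assignment of partition 0, as in the reproducible example.
    consumer.assign([aiokafka.TopicPartition(topic, 0)])
    return consumer
```

Whether this is acceptable depends on whether anything else relies on the group, for example committed offsets or lag monitoring keyed on the group name.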
Thanks for reporting this in such detail. Looking through the code, I can confirm that it is a bug: there is a race condition with manual assignment that triggers the `assignment.active` assert. The assert is meant to guarantee that the code following it is consistent, but the routine seems to check for assignment changes only in the auto-assigned case. I will try to put together a fix.
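One way to picture this kind of race is the toy model below. It is not aiokafka's code: `Assignment`, `State`, and `background_routine` are hypothetical stand-ins, and the assumption that re-assigning deactivates the previous assignment object is only made for illustration. It shows how a background task that takes a reference to an assignment, then yields, can find that assignment inactive once the caller re-assigns in the meantime.

```python
import asyncio


class Assignment:
    """Toy stand-in for a partition assignment with an 'active' flag."""

    def __init__(self, partitions):
        self.partitions = partitions
        self.active = True


class State:
    """Holds the current assignment; replacing it deactivates the old one."""

    def __init__(self):
        self.assignment = None

    def assign(self, partitions):
        if self.assignment is not None:
            self.assignment.active = False
        self.assignment = Assignment(partitions)


async def background_routine(state):
    assignment = state.assignment  # reference taken before yielding
    await asyncio.sleep(0)         # other coroutines may run here
    # The condition a later assert would check on the stale reference.
    return assignment is not None and assignment.active


async def demo():
    state = State()
    state.assign(["tp-0"])
    task = asyncio.ensure_future(background_routine(state))
    await asyncio.sleep(0)         # let the routine take its reference
    state.assign(["tp-0"])         # same partition, new assignment object
    return await task


if __name__ == "__main__":
    print(asyncio.run(demo()))  # False: the stale reference is inactive
```

In the reproducible example, `consume_message` calls `consumer.assign([tp])` on every invocation, which is the kind of repeated manual assignment this sketch imitates.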
@tvoinarovskyi, is there any progress on fixing this issue?
@tvoinarovskyi I am facing the same issue with a single consumer as well. What could be the cause?
I ran into this assertion error today too. In my case it was caused by a call to consumer.subscribe() after the consumer had been started with await consumer.start(). Just thought I'd leave this here for anyone running into this issue; I hope it helps someone.