Bug: nested FastAPI routers duplicate subscribers' middlewares
Describe the bug According to your documentation https://faststream.airt.ai/latest/getting-started/integrations/fastapi/#routers-nesting, we wanted to create fastapi-like routers structure to separate subscribers for different topics:
├── kafka
├── __init__.py
├── topic1
│ ├── __init__.py
│ └── subscribers.py
├── topic2
│ ├── __init__.py
│ └── publishers.py
├── topic3
│ ├── __init__.py
│ └── subscribers.py
└── core_router.py
But we faced with problem, that nested routers duplicates consumer instances, which cause multiple reading and processing one message from topic:
test-faststream-py3.11➜ test-faststream uvicorn main:app
INFO: Started server process [53140]
INFO: Waiting for application startup.
2024-08-30 11:48:03,740 INFO - test | - `Hello` waiting for messages
2024-08-30 11:48:03,749 INFO - test | - `Hello` waiting for messages
2024-08-30 11:48:03,749 INFO - test | - `Hello` waiting for messages
and cause aiokafka error on shutdown:
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7b93128ff590>
How to reproduce Include source code:
from fastapi import FastAPI
from faststream.kafka.fastapi import KafkaRouter, Logger
from pydantic import BaseModel
kafka_router = KafkaRouter("localhost:9092")
class Incoming(BaseModel):
m: dict
inner_kafka = KafkaRouter()
@inner_kafka.subscriber("test")
@inner_kafka.publisher("response")
async def hello(m: Incoming, logger: Logger):
logger.info(m)
return {"response": "Hello, Kafka!"}
app = FastAPI()
kafka_router.include_router(inner_kafka)
app.include_router(kafka_router)
And/Or steps to reproduce the behavior:
- Start application
- Create message for topic "test"
- See logs
Expected behavior
test-faststream-py3.11➜ test-faststream uvicorn main:app
INFO: Started server process [56711]
INFO: Waiting for application startup.
2024-08-30 12:07:48,342 INFO - test | - `Hello` waiting for messages
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
2024-08-30 12:07:55,098 INFO - test | 0-17250088 - Received
2024-08-30 12:07:55,101 INFO - test | 0-17250088 - m={'abc': 25}
2024-08-30 12:07:55,101 INFO - test | 0-17250088 - Processed
Observed behavior
test-faststream-py3.11➜ test-faststream uvicorn main:app
INFO: Started server process [56100]
INFO: Waiting for application startup.
2024-08-30 12:06:14,196 INFO - test | - `Hello` waiting for messages
2024-08-30 12:06:14,204 INFO - test | - `Hello` waiting for messages
2024-08-30 12:06:14,204 INFO - test | - `Hello` waiting for messages
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x778c46fbf650>
2024-08-30 12:06:16,150 INFO - test | 0-17250087 - Received
2024-08-30 12:06:16,150 INFO - test | 0-17250087 - Received
2024-08-30 12:06:16,150 INFO - test | 0-17250087 - Received
2024-08-30 12:06:16,150 INFO - test | 0-17250087 - Received
2024-08-30 12:06:16,153 INFO - test | 0-17250087 - Received
2024-08-30 12:06:16,153 INFO - test | 0-17250087 - Received
2024-08-30 12:06:16,153 INFO - test | 0-17250087 - Received
2024-08-30 12:06:16,153 INFO - test | 0-17250087 - Received
2024-08-30 12:06:16,154 INFO - test | 0-17250087 - m={'abc': 25}
2024-08-30 12:06:16,154 INFO - test | 0-17250087 - m={'abc': 25}
2024-08-30 12:06:16,155 INFO - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO - test | 0-17250087 - m={'abc': 25}
2024-08-30 12:06:16,155 INFO - test | 0-17250087 - m={'abc': 25}
2024-08-30 12:06:16,155 INFO - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO - test | 0-17250087 - Processed
Environment Running FastStream 0.5.19 with CPython 3.12.5 on Linux
[tool.poetry.dependencies]
python = "^3.11"
faststream = {extras = ["kafka"], version = "^0.5.19"}
fastapi = "^0.112.2"
uvicorn = "^0.30.6"
Hi! Thank you for the report The problem is related to latest FastAPI 0.112.2 release I fixes it already in latest commit, please wait for 0.5.20 FastStream release and check it again (I plan to release it this evening)
The problem was fixed in 0.5.20 (now nested routers doesn't run inner broker), but the problem with Middlewares duplication still exists.
I schedule solution on 0.6.0
Since 0.5.21 release, please use regular BrokerRouter as nested for FastAPI integration one
The problem was fixed in 0.5.20 (now nested routers doesn't run inner broker), but the problem with Middlewares duplication still exists.
I schedule solution on 0.6.0
Since 0.5.21 release, please use regular BrokerRouter as nested for FastAPI integration one
Could you please provide a more detailed explanation of the solution? How can BrokerRouter be used instead of NatsRouter? (fastapi)
How can BrokerRouter be used instead of NatsRouter?
https://faststream.airt.ai/latest/getting-started/integrations/fastapi/#multiple-routers
Faced the same problem with regular RabbitRouter
@geth-network can you show an MRE? We have a test for it, so I think, that error in other place
@geth-network sorry, but I can't reproducer the problem
from faststream import BaseMiddleware, FastStream
from faststream.rabbit import RabbitBroker, RabbitRouter
class Mid(BaseMiddleware):
async def on_consume(self, msg):
print("called")
return await super().on_consume(msg)
nested_router = RabbitRouter(middlewares=[Mid])
@nested_router.subscriber("test1")
async def handler(): ...
router = RabbitRouter(middlewares=[Mid])
router.include_router(nested_router)
broker = RabbitBroker(middlewares=[Mid])
broker.include_router(router)
app = FastStream(broker)
@app.after_startup
async def _():
await broker.publish(0, "test1")
In the code above, the middleware was called three times, which is correct.
The middleware stack looks like this: (<faststream.broker.middlewares.logging.CriticalLogMiddleware object at 0x103739400>, <class 'serve.Mid'>, <class 'serve.Mid'>, <class 'serve.Mid'>)
@Lancetnik
Hi, sorry for the late reply. Most likely, the issue I faced earlier is caused by the fact that, for some reason, I initialized the broker multiple times during testing session, which eventually led to a RecursionError. I solved the problem by moving the broker initialization to the package scope - where the Rabbit setup for the tests takes place.
I assume such usage is not intended, and in that case, I would suggest either raising an error or at least log a warning to make it easier to understand what caused the unexpected behavior.
Here is the code that reproduces the issue:
import pytest
import pytest_asyncio
from faststream import BaseMiddleware
from faststream.rabbit import RabbitBroker, RabbitRouter, TestRabbitBroker
class Mid(BaseMiddleware):
async def on_consume(self, msg):
print(f"called: {msg.body}")
return await super().on_consume(msg)
router = RabbitRouter()
@router.subscriber("test1")
async def handler(): ...
def init_broker(url):
broker = RabbitBroker(url=url, middlewares=[Mid])
broker.include_router(router)
return broker
@pytest_asyncio.fixture
async def broker():
# some stuff with monkeypatching
#
different_url_for_test = "amqp://localhost:5672/"
broker = init_broker(different_url_for_test)
async with TestRabbitBroker(broker, with_real=True) as br:
yield br
@pytest.mark.asyncio
@pytest.mark.parametrize("i", range(10))
async def test_something(i, broker):
await broker.publish(i, "test1") # you'll have "called: b'X'" X+1 times