faststream icon indicating copy to clipboard operation
faststream copied to clipboard

Bug: nested FastAPI routers duplicate subscribers' middlewares

Open ALittleMoron opened this issue 1 year ago • 4 comments

Describe the bug According to your documentation https://faststream.airt.ai/latest/getting-started/integrations/fastapi/#routers-nesting, we wanted to create fastapi-like routers structure to separate subscribers for different topics:

├── kafka
├── __init__.py
├── topic1
│  ├── __init__.py
│  └── subscribers.py
├── topic2
│  ├── __init__.py
│  └── publishers.py
├── topic3
│  ├── __init__.py
│  └── subscribers.py
└── core_router.py

But we faced with problem, that nested routers duplicates consumer instances, which cause multiple reading and processing one message from topic:

test-faststream-py3.11➜  test-faststream uvicorn main:app
INFO:     Started server process [53140]
INFO:     Waiting for application startup.
2024-08-30 11:48:03,740 INFO     - test |            - `Hello` waiting for messages
2024-08-30 11:48:03,749 INFO     - test |            - `Hello` waiting for messages
2024-08-30 11:48:03,749 INFO     - test |            - `Hello` waiting for messages

and cause aiokafka error on shutdown:

Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7b93128ff590>

How to reproduce Include source code:

from fastapi import FastAPI
from faststream.kafka.fastapi import KafkaRouter, Logger
from pydantic import BaseModel

kafka_router = KafkaRouter("localhost:9092")


class Incoming(BaseModel):
    m: dict


inner_kafka = KafkaRouter()


@inner_kafka.subscriber("test")
@inner_kafka.publisher("response")
async def hello(m: Incoming, logger: Logger):
    logger.info(m)
    return {"response": "Hello, Kafka!"}


app = FastAPI()
kafka_router.include_router(inner_kafka)
app.include_router(kafka_router)

And/Or steps to reproduce the behavior:

  1. Start application
  2. Create message for topic "test"
  3. See logs

Expected behavior

test-faststream-py3.11➜  test-faststream uvicorn main:app
INFO:     Started server process [56711]
INFO:     Waiting for application startup.
2024-08-30 12:07:48,342 INFO     - test |            - `Hello` waiting for messages
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
2024-08-30 12:07:55,098 INFO     - test | 0-17250088 - Received
2024-08-30 12:07:55,101 INFO     - test | 0-17250088 - m={'abc': 25}
2024-08-30 12:07:55,101 INFO     - test | 0-17250088 - Processed

Observed behavior

test-faststream-py3.11➜  test-faststream uvicorn main:app
INFO:     Started server process [56100]
INFO:     Waiting for application startup.
2024-08-30 12:06:14,196 INFO     - test |            - `Hello` waiting for messages
2024-08-30 12:06:14,204 INFO     - test |            - `Hello` waiting for messages
2024-08-30 12:06:14,204 INFO     - test |            - `Hello` waiting for messages
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x778c46fbf650>
2024-08-30 12:06:16,150 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,150 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,150 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,150 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,153 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,153 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,153 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,153 INFO     - test | 0-17250087 - Received
2024-08-30 12:06:16,154 INFO     - test | 0-17250087 - m={'abc': 25}
2024-08-30 12:06:16,154 INFO     - test | 0-17250087 - m={'abc': 25}
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - m={'abc': 25}
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - m={'abc': 25}
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed
2024-08-30 12:06:16,155 INFO     - test | 0-17250087 - Processed

Environment Running FastStream 0.5.19 with CPython 3.12.5 on Linux

[tool.poetry.dependencies]
python = "^3.11"
faststream = {extras = ["kafka"], version = "^0.5.19"}
fastapi = "^0.112.2"
uvicorn = "^0.30.6"

ALittleMoron avatar Aug 30 '24 09:08 ALittleMoron

Hi! Thank you for the report The problem is related to latest FastAPI 0.112.2 release I fixes it already in latest commit, please wait for 0.5.20 FastStream release and check it again (I plan to release it this evening)

Lancetnik avatar Aug 30 '24 09:08 Lancetnik

The problem was fixed in 0.5.20 (now nested routers doesn't run inner broker), but the problem with Middlewares duplication still exists.

I schedule solution on 0.6.0

Since 0.5.21 release, please use regular BrokerRouter as nested for FastAPI integration one

Lancetnik avatar Aug 30 '24 17:08 Lancetnik

The problem was fixed in 0.5.20 (now nested routers doesn't run inner broker), but the problem with Middlewares duplication still exists.

I schedule solution on 0.6.0

Since 0.5.21 release, please use regular BrokerRouter as nested for FastAPI integration one

Could you please provide a more detailed explanation of the solution? How can BrokerRouter be used instead of NatsRouter? (fastapi)

and-sm avatar Dec 16 '24 16:12 and-sm

How can BrokerRouter be used instead of NatsRouter?

https://faststream.airt.ai/latest/getting-started/integrations/fastapi/#multiple-routers

Lancetnik avatar Dec 16 '24 18:12 Lancetnik

Faced the same problem with regular RabbitRouter

geth-network avatar Jun 11 '25 20:06 geth-network

@geth-network can you show an MRE? We have a test for it, so I think, that error in other place

Lancetnik avatar Jun 12 '25 20:06 Lancetnik

@geth-network sorry, but I can't reproducer the problem

from faststream import BaseMiddleware, FastStream
from faststream.rabbit import RabbitBroker, RabbitRouter

class Mid(BaseMiddleware):
    async def on_consume(self, msg):
        print("called")
        return await super().on_consume(msg)

nested_router = RabbitRouter(middlewares=[Mid])

@nested_router.subscriber("test1")
async def handler(): ...

router = RabbitRouter(middlewares=[Mid])
router.include_router(nested_router)

broker = RabbitBroker(middlewares=[Mid])
broker.include_router(router)

app = FastStream(broker)

@app.after_startup
async def _():
    await broker.publish(0, "test1")

In the code above, the middleware was called three times, which is correct. The middleware stack looks like this: (<faststream.broker.middlewares.logging.CriticalLogMiddleware object at 0x103739400>, <class 'serve.Mid'>, <class 'serve.Mid'>, <class 'serve.Mid'>)

Lancetnik avatar Jun 15 '25 14:06 Lancetnik

@Lancetnik Hi, sorry for the late reply. Most likely, the issue I faced earlier is caused by the fact that, for some reason, I initialized the broker multiple times during testing session, which eventually led to a RecursionError. I solved the problem by moving the broker initialization to the package scope - where the Rabbit setup for the tests takes place.

I assume such usage is not intended, and in that case, I would suggest either raising an error or at least log a warning to make it easier to understand what caused the unexpected behavior.

Here is the code that reproduces the issue:

import pytest
import pytest_asyncio

from faststream import BaseMiddleware
from faststream.rabbit import RabbitBroker, RabbitRouter, TestRabbitBroker


class Mid(BaseMiddleware):
    async def on_consume(self, msg):
        print(f"called: {msg.body}")
        return await super().on_consume(msg)

router = RabbitRouter()

@router.subscriber("test1")
async def handler(): ...


def init_broker(url):
    broker = RabbitBroker(url=url, middlewares=[Mid])
    broker.include_router(router)
    return broker


@pytest_asyncio.fixture
async def broker():
    # some stuff with monkeypatching
    # 
    different_url_for_test = "amqp://localhost:5672/"
    broker = init_broker(different_url_for_test)
    async with TestRabbitBroker(broker, with_real=True) as br:
        yield br


@pytest.mark.asyncio
@pytest.mark.parametrize("i", range(10))
async def test_something(i, broker):
    await broker.publish(i, "test1")  # you'll have "called: b'X'" X+1 times


geth-network avatar Jun 18 '25 20:06 geth-network