Bug: Multiple (nested) StreamRouters are not initialized

Open andrewduckett opened this issue 1 year ago • 6 comments

Describe the bug

Hey @Lancetnik, I'm having trouble using multiple nested StreamRouters in FastAPI. I tried following the example here with no luck: https://github.com/airtai/faststream/discussions/677#discussioncomment-7072337

The application starts just fine, but when accessing the broker to publish a message I get an error saying that I need to connect first.

In the discussion it sounds like a single broker connection will be shared between all of the stream routers, which is what I would like to happen. I assume this has something to do with the lifecycle of each StreamRouter.

And/Or steps to reproduce the behavior:

  1. Create multiple StreamRouters (RabbitRouter in my case)
  2. Include those in either a StreamRouter or APIRouter as a core_router
  3. Include the core_router in FastAPI
  4. In one of the stream router methods, use router.broker.publish(...)

Expected behavior

I would like to create multiple RabbitRouters, nested in an APIRouter and included in a FastAPI application, that share a connection.

Observed behavior

AssertionError: Please, `connect()` the broker first.

Environment

Running FastStream 0.5.25 with CPython 3.12.1 on Darwin

FastAPI version 0.115.0

andrewduckett avatar Oct 10 '24 02:10 andrewduckett

Did you check our documentation? https://faststream.airt.ai/latest/getting-started/integrations/fastapi/#multiple-routers

It's been a long time since that discussion. FastAPI and FastStream have had some breaking changes since then.

I now recommend nesting our regular FastStream routers inside the FastAPI-compatible one; it is the most predictable behavior.

Lancetnik avatar Oct 10 '24 08:10 Lancetnik

Yes, I have tried that doc as well. Am I supposed to provide connection details to all RabbitRouters, or just the "core" one? Do they still share a connection?

andrewduckett avatar Oct 10 '24 18:10 andrewduckett

If you are using our regular Router (not the FastAPI-compatible one), the core router copies all publishers and subscribers to itself, so the nested routers don't require a real connection.

Lancetnik avatar Oct 11 '24 05:10 Lancetnik

Anyway, can you show me a reproducible example I can copy-paste?

Lancetnik avatar Oct 11 '24 05:10 Lancetnik

This is an example of what is not working. It seems as though the nested routers (a_router and b_router) do not go through lifespan correctly (or at all), and as a result .publish fails with AssertionError: Please, `connect()` the broker first.

# consolidated ...
from fastapi import FastAPI, APIRouter
from faststream.rabbit.fastapi import RabbitRouter

# ./a/router.py

a_router = RabbitRouter()

@a_router.post("/foo")
async def foo():
    await a_router.broker.publish("foo")


# ./b/router.py

b_router = RabbitRouter()

# ./api.py

api_router = APIRouter()
api_router.include_router(a_router)
api_router.include_router(b_router)

# ./main.py

def create_app() -> FastAPI:
    app = FastAPI()
    app.include_router(api_router)
    return app

app = create_app()

Note, if I include a_router and b_router directly in FastAPI the broker connects just fine.

# ./main.py

def create_app() -> FastAPI:
    app = FastAPI()
    app.include_router(a_router)
    app.include_router(b_router)
    return app

app = create_app()

It's not what I expected, but it's not a deal breaker if that's how it works.

andrewduckett avatar Oct 11 '24 15:10 andrewduckett

@andrewduckett please, use faststream.rabbit.RabbitRouter instead of faststream.rabbit.fastapi.RabbitRouter for the nested ones

This is the way recommended in the documentation. But I'll try to investigate your problem and fix it as well.

Lancetnik avatar Oct 11 '24 19:10 Lancetnik

@andrewduckett your example

# consolidated ...
from fastapi import FastAPI, APIRouter
from faststream.rabbit.fastapi import RabbitRouter

# ./a/router.py

a_router = RabbitRouter()

@a_router.post("/foo")
async def foo():
    await a_router.broker.publish("foo")


# ./b/router.py

b_router = RabbitRouter()

# ./api.py

api_router = APIRouter()
api_router.include_router(a_router)
api_router.include_router(b_router)

This shouldn't work. Such routers were designed to have a single broker and connection inside the main router. Also, you shouldn't access the broker via router.broker in such cases; please deliver the publisher to the endpoint via DI. Your example is just incorrect API usage.

I think we can close this issue, since the current API doesn't have this problem. Anyway, thank you for the report, and feel free to reopen the issue if you have something to discuss.

Lancetnik avatar Oct 29 '24 18:10 Lancetnik