Context managers in `Depends` are broken after 0.106

Open Kludex opened this issue 1 year ago • 31 comments

Discussed in https://github.com/tiangolo/fastapi/discussions/11107

Originally posted by FeeeeK February 7, 2024

First Check

  • [X] I added a very descriptive title here.
  • [X] I used the GitHub search to find a similar question and didn't find it.
  • [X] I searched the FastAPI documentation, with the integrated search.
  • [X] I already searched in Google "How to X in FastAPI" and didn't find any information.
  • [X] I already read and followed all the tutorial in the docs and didn't find an answer.
  • [X] I already checked if it is not related to FastAPI but to Pydantic.
  • [X] I already checked if it is not related to FastAPI but to Swagger UI.
  • [X] I already checked if it is not related to FastAPI but to ReDoc.

Commit to Help

  • [X] I commit to help with one of those options 👆

Example Code

from fastapi import Depends, FastAPI, Request

app = FastAPI()


class Session:
    def __init__(self):
        print("creating session")

    async def __aenter__(self):
        print("opening session")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("closing session")

    async def commit(self):
        print("committing session")

    async def rollback(self):
        print("rolling back session")


@app.middleware("http")
async def commit_session(request: Request, call_next):
    # minimalistic middleware for example, my code uses ASGI middleware
    response = await call_next(request)
    db_session = request.scope.get("db_session")
    if not db_session:
        return response

    # 2xx and 3xx count as success: only 200..399 floor-divide by 200 to 1
    if response.status_code // 200 != 1:
        await db_session.rollback()
    else:
        await db_session.commit()

    return response


async def get_db_session(request: Request):
    async with Session() as session:
        request.scope["db_session"] = session
        yield session


@app.get("/")
async def root(session: Session = Depends(get_db_session)):
    return {"message": "Hello World"}


# Pre 0.106 behaviour:

# creating session
# opening session
# committing session
# closing session

# Post 0.106 behaviour:
# creating session
# opening session
# closing session
# committing session

# The session is not committed, because it's closed before the middleware is called.

Description

Before 0.106, the code after yield in a Depends dependency ran after middlewares. That made it possible for a middleware to access resources created for a route (e.g. sessions) and act on them depending on the response (which cannot be done with Depends alone). Since 0.106 the behavior has changed and this is no longer possible. The documentation only talks about background tasks and says nothing about middlewares. Was this behavior change intentional?
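
The ordering change described above can be reproduced without FastAPI. What follows is a toy model, not FastAPI's implementation: `handle_request` and its `teardown_before_response` flag are hypothetical names. The flag only switches between closing the dependency's exit stack after the middleware has seen the response (the order reported for versions before 0.106) and closing it as soon as the route function returns (the order reported for 0.106 and later).

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


@asynccontextmanager
async def session_dependency(log):
    # Stands in for a dependency with yield that wraps a session.
    log.append("opening session")
    try:
        yield "session"
    finally:
        log.append("closing session")


async def handle_request(teardown_before_response: bool) -> list:
    """Toy request cycle: dependency -> route -> middleware -> teardown."""
    log = []
    stack = AsyncExitStack()
    await stack.enter_async_context(session_dependency(log))
    # The route function would run here and return its response.
    if teardown_before_response:
        await stack.aclose()  # order reported for 0.106 and later
    # Middleware code placed after `await call_next(request)` runs here.
    log.append("committing session")
    if not teardown_before_response:
        await stack.aclose()  # order reported before 0.106
    return log


old_order = asyncio.run(handle_request(teardown_before_response=False))
new_order = asyncio.run(handle_request(teardown_before_response=True))
print(old_order)
print(new_order)
```

In the second ordering the middleware commits a session that has already been closed, which matches the output listed in the example code.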

Operating System

Windows

Operating System Details

No response

FastAPI Version

0.109.2

Pydantic Version

2.4.2

Python Version

3.11.1

Additional Context

No response

Kludex avatar Feb 14 '24 09:02 Kludex

I commented in the Discussion that StreamingResponses no longer work with Depends resources. Since this is a pretty large change in behavior and wasn't called out in the release notes, I wonder whether it is intended. Should this be a separate issue?

falkben avatar Feb 14 '24 15:02 falkben

> I commented in the Discussion that StreamingResponses no longer work with Depends resources. Since this is a pretty large change in behavior and wasn't called out in the release notes, I wonder whether it is intended. Should this be a separate issue?

No. The only reason I created this issue is because of your comment. 🙏

Kludex avatar Feb 14 '24 15:02 Kludex

Does this comment by @tiangolo answer this issue: https://github.com/tiangolo/fastapi/discussions/11177#discussioncomment-8559300 ?

Kludex avatar Feb 22 '24 19:02 Kludex

I might be misinterpreting, but I don't think so? It seems the change was intended to disable using resources in background tasks. But was it also intended to disable using resources in path operations that return StreamingResponses?

falkben avatar Feb 22 '24 19:02 falkben

It seems this is what breaks our streaming endpoints, the lack of which kills throughput making everything slow.

mortalisk avatar Feb 29 '24 08:02 mortalisk

A workaround for now is to use our connection pool directly in each endpoint doing streaming. Annoying, but works. Doing something like this for a psycopg pool:

@router.get("/stuff", response_model=list[Stuff])
async def get_all_stuff(pool: PoolDep):
    connection = await pool.getconn()

    async def commit_and_put_in_pool():
        await connection.commit()
        await pool.putconn(connection)

    return StreamingResponse(
        stream_stuff(connection),
        media_type="application/json",
        background=commit_and_put_in_pool,
    )

This seems to work for us. But is it correct? Will the background task always run after the streaming is done? Or can we end up in a situation where it is run in the middle of streaming?

mortalisk avatar Mar 04 '24 08:03 mortalisk

Our streaming endpoints are broken probably due to this. So it seems that there is no good workaround as long as a db session is managed with Depends?

brian-goo avatar Mar 14 '24 02:03 brian-goo

> Our streaming endpoints are broken probably due to this. So it seems that there is no good workaround as long as a db session is managed with Depends?

The release notes suggest the following:

> If you used to rely on this behavior, now you should create the resources for background tasks inside the background task itself, and use internally only data that doesn't depend on the resources of dependencies with yield.
>
> For example, instead of using the same database session, you would create a new database session inside of the background task, and you would obtain the objects from the database using this new session. And then instead of passing the object from the database as a parameter to the background task function, you would pass the ID of that object and then obtain the object again inside the background task function.

i.e. just don't use Depends for obtaining your db session for background tasks any more but obtain it within the background task

scriptator avatar Mar 21 '24 06:03 scriptator

@scriptator we're not talking about background tasks here.

falkben avatar Mar 21 '24 15:03 falkben

> @scriptator we're not talking about background tasks here.

I was replying to @brian-goo who talked about streaming responses which behave like background tasks in this regard. I had a problem because I used a db session in a streaming response that I obtained with Depends and it was fixed by following the advice for background tasks: initiate your resources within the background task (or generator function in the case of streaming responses).
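
The advice above, opening the resource inside the generator that feeds the streaming response, can be sketched without FastAPI or a real database. `FakeSession`, `stream_rows`, and the `sessions` list are hypothetical stand-ins. In an application the generator would be passed to `StreamingResponse`, and the `async with` would wrap a real session factory.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for a database session."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True

    async def fetch(self, n):
        if self.closed:
            raise RuntimeError("session already closed")
        return f"row {n}\n"


sessions = []  # kept only so the example can inspect the session afterwards


async def stream_rows(count):
    # The session lives exactly as long as the generator is being consumed,
    # independent of when the route function's dependencies are torn down.
    async with FakeSession() as session:
        sessions.append(session)
        for n in range(count):
            yield await session.fetch(n)


async def consume():
    # Plays the role of the server iterating over the response body.
    return [chunk async for chunk in stream_rows(3)]


chunks = asyncio.run(consume())
print(chunks, sessions[0].closed)
```

The trade-off is that the session is no longer provided through Depends, so anything the route needs from it before streaming starts has to come from a separate session.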

But you are right if you meant that this does not contribute to the original problem from the issue.

scriptator avatar Mar 21 '24 16:03 scriptator

I don't think streaming responses should be considered similar to background tasks. They are part of the request/response cycle. The fact that, like background tasks, they can no longer use resources injected into the path operation is likely a bug.

falkben avatar Mar 21 '24 18:03 falkben

Any progress on this? Our workaround is not working as expected. It seems the background task is not guaranteed to run, so our connection pool fails whenever there is any error, essentially taking down the entire program until we restart it.

This basically renders streaming useless for us.

Is there something else we can do to make it work?
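
One possible explanation for the failure described above, offered as an assumption rather than a confirmed diagnosis: if a response runs its background task only after the body has been sent successfully, an error during streaming skips the task and the connection is never returned to the pool. The toy model below uses no FastAPI or Starlette code; `FakePool`, `send_response`, and the two generators are hypothetical. It contrasts the background-task approach with releasing the connection in a `finally` block inside the generator.

```python
import asyncio


class FakePool:
    """Hypothetical pool that only counts checked-out connections."""

    def __init__(self):
        self.in_use = 0

    def getconn(self):
        self.in_use += 1
        return object()

    def putconn(self, conn):
        self.in_use -= 1


async def send_response(body, background=None):
    """Toy response: stream the body, then run the background task."""
    async for _chunk in body:
        pass
    if background is not None:
        await background()  # never reached if streaming raised


async def failing_stream(conn):
    yield b"first chunk"
    raise RuntimeError("error while streaming")


async def guarded_stream(pool, conn):
    try:
        yield b"first chunk"
        raise RuntimeError("error while streaming")
    finally:
        pool.putconn(conn)  # runs whether streaming succeeds or fails


async def run(use_finally):
    pool = FakePool()
    conn = pool.getconn()

    async def release():
        pool.putconn(conn)

    try:
        if use_finally:
            await send_response(guarded_stream(pool, conn))
        else:
            await send_response(failing_stream(conn), background=release)
    except RuntimeError:
        pass
    return pool.in_use  # connections still checked out


leaked_with_background = asyncio.run(run(use_finally=False))
leaked_with_finally = asyncio.run(run(use_finally=True))
print(leaked_with_background, leaked_with_finally)
```

This sketch only models an error raised while producing chunks. A client disconnect is a separate case that it does not cover.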

mortalisk avatar Apr 11 '24 14:04 mortalisk

I am facing the same problem. Does anybody know any workaround for this problem? @tiangolo #11444

hasansustcse13 avatar Apr 16 '24 08:04 hasansustcse13

For now, just staying on version <0.106.

falkben avatar Apr 16 '24 13:04 falkben

Does <0.106 work on python 3.12? Because 0.109 says "Add support for Python 3.12."

mortalisk avatar Apr 16 '24 13:04 mortalisk

+1 for this issue. It breaks request context management for stream proxies.

i.e., I have a Depends parameter for connecting to an external streaming API. I start a stream from the external API in the route. I need the initial response from the external API to contribute to the returned StreamingResponse. Then I need that same connection to stream from my streaming generator.

The workaround in my case is to use a global client for the external streaming API, so that cleanup is not called when the route returns the StreamingResponse, and to manage the context with try/finally blocks in the route and the streaming generator. I then open more context managers in the streaming generator for the other Depends resources that die when the route returns.

btemplep avatar Apr 21 '24 23:04 btemplep

@btemplep could you provide a code example for your workaround?

libe avatar May 06 '24 12:05 libe

We also encountered issues with streaming response after upgrading FastAPI to > 0.106.

We implemented a Git service over HTTP. While the streaming response is generating content for the client, it needs access to the database session and other services provided by Depends. The rationale for the change is understandable, but a streaming response is very different from a background task: while we are streaming the response, the request is still alive. The streamed content can still change dynamically based on the underlying resources or connections to the database or other services, so we still need access to the dependencies while streaming. If we had all the response content ready when the route function returns, we wouldn't need a streaming response at all.

Is it possible to delay the destruction of dependencies until after the HTTP session has actually closed (no more content will be sent to the client)? Or to make it a special case when the returned response is a streaming response?

fangpenlin avatar May 12 '24 00:05 fangpenlin

@libe Here is an example of what I expected to work

from typing import Any, AsyncGenerator, Annotated

import aioboto3
from fastapi import Depends, FastAPI
from fastapi.responses import StreamingResponse

from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

engine = create_async_engine("sqlite+aiosqlite:///test.sqlite")
sessionmaker: async_sessionmaker[AsyncSession] = async_sessionmaker(engine, expire_on_commit=False)


class Base(AsyncAttrs, DeclarativeBase):
    pass


class SomeTable(Base):
    __tablename__ = "some_table"

    some_id: Mapped[int] = mapped_column(primary_key=True, nullable=False)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with sessionmaker() as sess:
        yield sess


async def get_boto_client():
    async with aioboto3.Session().client("bedrock-runtime") as br_client:
        yield br_client


app = FastAPI()


async def _stream_resp(db_sess: AsyncSession, stream_resp: dict):
    async for chunk in stream_resp['body']:
        # also do something with the db_session
        yield chunk


@app.get("/route_here")
async def route_here(
    db_sess: Annotated[AsyncSession, Depends(get_db)],
    br_client: Annotated[Any, Depends(get_boto_client)]
):
    stream_resp = await br_client.invoke_model_with_response_stream(args="here")
    return StreamingResponse(
        _stream_resp(
            db_sess=db_sess,
            stream_resp=stream_resp
        )
    )

Here is a simplified version of what my fix is.

from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Annotated

import aioboto3
from fastapi import Depends, FastAPI
from fastapi.responses import StreamingResponse

from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

engine = create_async_engine("sqlite+aiosqlite:///test.sqlite")
sessionmaker: async_sessionmaker[AsyncSession] = async_sessionmaker(engine, expire_on_commit=False)
br_client = None


class Base(AsyncAttrs, DeclarativeBase):
    pass


class SomeTable(Base):
    __tablename__ = "some_table"

    some_id: Mapped[int] = mapped_column(primary_key=True, nullable=False)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with sessionmaker() as sess:
        yield sess


@asynccontextmanager
async def lifespan(app: FastAPI):
    global br_client
    async with aioboto3.Session().client("bedrock-runtime") as br:
        br_client = br
        yield


app = FastAPI(
    lifespan=lifespan
)


async def _stream_resp(stream_resp: dict):
    # In the streaming generator we need to start our own context manager for DB
    async with sessionmaker() as db_sess:
        # Because the client is global the stream on the backend is preserved. 
        async for chunk in stream_resp['body']:
            # also do something with the db_session
            yield chunk


@app.get("/route_here")
async def route_here(
    db_sess: Annotated[AsyncSession, Depends(get_db)]
):
    # br_client is now global, and shared
    stream_resp = await br_client.invoke_model_with_response_stream(args="here")
    # do DB stuff here, we can still use the Depends
    return StreamingResponse(
        # _stream_resp opens its own session, so only the stream is passed
        _stream_resp(stream_resp=stream_resp)
    )

btemplep avatar May 12 '24 19:05 btemplep

I agree, this is definitely unexpected behavior. I understand the motivation for not wanting to hold resources like db sessions longer than needed, but holding them does make sense in the context of a streaming response.

If we're yielding from a generator and want to commit once the generator is fully consumed (as part of the generator, calling session.commit() once iteration has stopped and the response is closed), then it makes sense to keep the session resource available!

Workarounds might include passing the db session to a background task, which defeats part of what this change was trying to achieve and isn't great design. I'm not sure there are any others. Maybe managing the session manually inside the generator, but then we wouldn't be using FastAPI dependencies where we should be.

RamiAwar avatar Jun 15 '24 16:06 RamiAwar

@tiangolo hello, do you have any ideas?

buptzyf avatar Jul 02 '24 07:07 buptzyf

I encountered an issue with SQLAlchemy sessions not closing properly when using StreamingResponse in FastAPI. The problem was that the session was not being closed, which led to warnings about unclosed connections.

I found that explicitly closing the session by using await session.close() in the generator function resolved the issue. Here’s how I implemented it:

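async def get_user(db):
    # Async generator: StreamingResponse needs an iterable body, not a coroutine.
    try:
        user = await db.get(User, 1)  # look up by primary key
        yield str(user)  # what gets streamed is an assumption for illustration
    finally:
        await db.close()  # close explicitly once streaming is done


@app.get("/user-info")
async def user(db: AsyncSession = Depends(get_db)):
    return StreamingResponse(get_user(db))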

I hope this helps others encountering a similar issue!

Best regards,

tosmart01 avatar Aug 05 '24 06:08 tosmart01

I think Depends should accept a flag where you can specify when the dependency should be closed, before or after the response.

Closing before is useful if you want to raise HTTPExceptions from the cleanup step of your dependencies. Not something I've ever done, but maybe if I was just getting started with FastAPI and learning about that feature, I would find it useful.

Closing after is useful with streaming responses, if you need the dependencies in background tasks, or when you want to perform slowish operations in the cleanup so that the response can be sent faster.

truh avatar Aug 06 '24 11:08 truh

> I think Depends should accept a flag where you can specify when the dependency should be closed, before or after the response.
>
> Closing before is useful if you want to raise HTTPExceptions from the cleanup step of your dependencies. Not something I've ever done, but maybe if I was just getting started with FastAPI and learning about that feature, I would find it useful.
>
> Closing after is useful with streaming responses, if you need the dependencies in background tasks, or when you want to perform slowish operations in the cleanup so that the response can be sent faster.

and since this was a breaking change, I'd make it "after" by default for backwards compatibility.

I'm personally still locked to 0.105 and missing security updates because of this.

RamiAwar avatar Aug 07 '24 02:08 RamiAwar

What about this?

return StreamingCSVResponse(csv_stream, headers=headers, background=BackgroundTask(session.close))

It worked for me, but I'm not sure it's the correct way to go.

sherpya avatar Aug 08 '24 18:08 sherpya