python-dependency-injector icon indicating copy to clipboard operation
python-dependency-injector copied to clipboard

Single Database Session Per FastAPI Request Lifecycle

Open Trinkes opened this issue 2 years ago • 47 comments

Hello, I would like to implement a mechanism that ensures only one database session is created and tied to the FastAPI request lifecycle. The goal is to have a single shared database session across all resources/classes within a request, allowing easy rollback of operations in case of any request-related issues.

Here's the current code example:

import os

from dependency_injector import containers, providers
from dependency_injector.wiring import Provide, inject
from fastapi import FastAPI, Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
import uvicorn

engine = create_engine("sqlite://")

Base = declarative_base()
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)


class ApplicationContainer(containers.DeclarativeContainer):
    wiring_config = containers.WiringConfiguration(modules=[__name__])
    database: SessionLocal = providers.Factory(SessionLocal)


app = FastAPI()


@app.get("/")
@inject
async def root(
    session_1=Depends(Provide[ApplicationContainer.database]),
    session_2=Depends(Provide[ApplicationContainer.database]),
):
    return {"session_1_id": id(session_1), "session_2_id": id(session_2)}


container = ApplicationContainer()

if __name__ == "__main__":
    uvicorn.run(
        os.path.basename(__file__).replace(".py", "") + ":app",
        host="127.0.0.1",
        port=5000,
        log_level="info",
        reload=True,
    )

Currently, when calling the root endpoint, two separate database sessions are created, resulting in different session IDs:

{
  "session_1_id": 4347665504,
  "session_2_id": 4347668912
}

However, the desired behavior is to have both arguments (arg and arg2) hold references to the same database session for each request. Therefore, if we call the request again, the ID would change, indicating that a new session was created:

{
  "session_1_id": 4347665504,
  "session_2_id": 4347665504
}

The ultimate objective is to achieve a single database session per request, which would simplify the rollback process for any issues that might arise during the request.

Thank you for your attention to this matter, and I look forward to your guidance and suggestions.

Trinkes avatar Jul 27 '23 16:07 Trinkes

I am struggling with this exact same thing - because multiple sessions are created, you can end up with deadlocks. Ideally there would be a Request scope in dependency injector for this type of thing.

dandiep avatar Aug 08 '23 02:08 dandiep

https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#sqlalchemy.ext.asyncio.async_scoped_session

jess-hwang avatar Sep 03 '23 11:09 jess-hwang

@Trinkes did you find a solution? I have the exact same problem.

theobouwman avatar Nov 08 '23 18:11 theobouwman

@theobouwman not yet. I didn't find the time to investigate the @jess-hwang suggestion.

Trinkes avatar Nov 08 '23 18:11 Trinkes

@Trinkes (https://github.com/ets-labs/python-dependency-injector/issues/760) fixes the one session per request

theobouwman avatar Nov 08 '23 18:11 theobouwman

I made some local testing with locust, and once I start having many requests executing with multi-threads (I'm using sync endpoints), the sessions don't work as expected

@theobouwman It seems it doesn't work as expected when there is more than 1 request being processed.

Trinkes avatar Nov 08 '23 18:11 Trinkes

@Trinkes you are right.

theobouwman avatar Nov 08 '23 19:11 theobouwman

@jess-hwang do you know the solution?

theobouwman avatar Nov 08 '23 19:11 theobouwman

Use async_sessionmaker instead of sessionmaker. Fastapi creates a new async task per request.

async_session_factory = async_sessionmaker(
    async_engine,
    expire_on_commit=False,
)
async_scoped_session_factory = async_scoped_session(
    async_session_factory,
    scopefunc=asyncio.current_task,
)

Using scoped_session, you can bind the session to the task. If you call the session factory within the same task(same reques), the same session will be returned.

jess-hwang avatar Nov 09 '23 15:11 jess-hwang

@jess-hwang I have implemented the code you gave me:

class Database:

    def __init__(self, db_url: str) -> None:
        self._engine = create_async_engine(
            db_url,
            echo=get_config().QUERY_ECHO,
            echo_pool=get_config().ECHO_POOL,
            json_serializer=_custom_json_serializer,
            pool_pre_ping=True,
            pool_size=get_config().DB_POOL_SIZE,
        )
        async_session_factory = sessionmaker(
            bind=self._engine, 
            autocommit=False,
            autoflush=False,
            expire_on_commit=False,
            class_= AsyncSession
        )
        self._async_scoped_session_factory = async_scoped_session(
            async_session_factory,
            scopefunc=asyncio.current_task,
        )

    def create_database(self) -> None:
        Base.metadata.create_all(self._engine)

    @contextmanager
    def session(self) -> Callable[..., AbstractContextManager[AsyncSession]]:
        session: AsyncSession = self._async_scoped_session_factory()
        try:
            yield session
        except Exception as e:
            # logger.exception("Session rollback because of exception")
            session.rollback()
            raise e
        finally:
            session.close()

But in my repository still a session is create for each query:

class BaseRepository(Generic[T]):
    _model: T # TODO: find out if this is best solution

    def __init__(self, session_factory: Callable[..., AbstractContextManager[AsyncSession]]) -> None:
        self.session_factory = session_factory

    async def get_by_id(self, id: str) -> T:
        with self.session_factory() as session:
            r = await session.execute(select(self._model).filter(self._model.id == id))
            return r.scalar_one_or_none()

This is how I create the dependency injector:

class Container(containers.DeclarativeContainer):

    wiring_config = containers.WiringConfiguration(packages=[
        "api.routes",
        "tasks.routes",
        "common.observability"
    ])

    config = providers.Configuration()
 db = providers.Singleton(Database, db_url=get_config().DB_URL())

    event_repository = providers.Factory(
        EventRepository, session_factory=db.provided.session)

So I dont understand what I am doing wrong? Should the with self.session_factory() as session: not reuse the already created session?

theobouwman avatar Nov 10 '23 12:11 theobouwman

@theobouwman I think you should use async_session instead of session.

@asynccontextmanager
async def async_session(self) -> Callable[..., AbstractContextManager[AsyncSession]]:
    session = self.async_session_factory()
    try:
        yield session
    except Exception as e:
        await session.rollback()
        raise
    finally:
        await session.close()

jess-hwang avatar Nov 10 '23 12:11 jess-hwang

@jess-hwang it is still creating 2 sessions when i implement your code and when I call

 async def get_by_id(self, id: str) -> T:
        async with self.session_factory() as session:
            r = await session.execute(select(self._model).filter(self._model.id == id))
            return r.scalar_one_or_none()

theobouwman avatar Nov 10 '23 12:11 theobouwman

@jess-hwang the session only gets created once the get_by_id function is called in the BaseRepository. How could I create a session which is reused throughout the request and loaded with the dependency injector?

theobouwman avatar Nov 10 '23 13:11 theobouwman

And this is how I inject the services:


@router.get('/event/{event_id}')
@inject
async def testt(event_id: str, event_service: EventService = Depends(Provide[Container.event_service])):
    event1 = await event_service.get_event(event_id)
    event2 = await event_service.get_event(event_id)
    return BaseResponse[List[GetEventResponse]](data=[event1, event2])

theobouwman avatar Nov 10 '23 13:11 theobouwman

same problem here, any solution please?

ebrahimradi avatar Dec 27 '23 08:12 ebrahimradi

Here is some pseudocode on how to make this work...

class MyContainer(containers.DeclarativeContainer):
  db_session_provider = providers.Factory(async_session_factory)
  db_session = providers.ContextLocalSingleton(provides=AsyncSession)
class DbSessionMiddleware(BaseHTTPMiddleware):
    def __init__(
        self,
        app: ASGIApp,
        db_session_provider: Provider[MyContainer.db_session_provider],
        dispatch: DispatchFunction | None = None,
    ) -> None:
        super().__init__(app, dispatch)
        self._db_session_provider = db_session_provider

    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
        self._db_session_provider()
        return await call_next(request)
fastapi_app.add_middleware(DbSessionMiddleware)

The trick here is that you need to initialize the database session on the FastAPI request context -- Which is what we accomplish by initializing the context local singleton from the middleware.

philipbjorge avatar Jan 03 '24 00:01 philipbjorge

@philipbjorge do you have a fully working example of this?

theobouwman avatar Feb 09 '24 15:02 theobouwman

Same problem. I did not understand how to implement the solution.

Shouldn't this be a feature of python-dependency-injector? This is called scoped lifecycle of container instances. This library should provide something like providers.Scoped(...) and build an extension on top for fastapi providers.FastApiRequestScoped(...).

JobaDiniz avatar Jul 22 '24 14:07 JobaDiniz