python-dependency-injector
Single Database Session Per FastAPI Request Lifecycle
Hello, I would like to implement a mechanism that ensures only one database session is created and tied to the FastAPI request lifecycle. The goal is to have a single shared database session across all resources/classes within a request, allowing easy rollback of operations in case of any request-related issues.
Here's the current code example:
```python
import os

from dependency_injector import containers, providers
from dependency_injector.wiring import Provide, inject
from fastapi import FastAPI, Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
import uvicorn

engine = create_engine("sqlite://")
Base = declarative_base()
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)


class ApplicationContainer(containers.DeclarativeContainer):
    wiring_config = containers.WiringConfiguration(modules=[__name__])

    database = providers.Factory(SessionLocal)


app = FastAPI()


@app.get("/")
@inject
async def root(
    session_1=Depends(Provide[ApplicationContainer.database]),
    session_2=Depends(Provide[ApplicationContainer.database]),
):
    return {"session_1_id": id(session_1), "session_2_id": id(session_2)}


container = ApplicationContainer()

if __name__ == "__main__":
    uvicorn.run(
        os.path.basename(__file__).replace(".py", "") + ":app",
        host="127.0.0.1",
        port=5000,
        log_level="info",
        reload=True,
    )
```
Currently, when calling the root endpoint, two separate database sessions are created, resulting in different session IDs:
```json
{
    "session_1_id": 4347665504,
    "session_2_id": 4347668912
}
```
However, the desired behavior is for both arguments (`session_1` and `session_2`) to hold references to the same database session within a single request. Calling the endpoint again would yield a different (but again matching) ID, indicating that a new session was created for the new request:
```json
{
    "session_1_id": 4347665504,
    "session_2_id": 4347665504
}
```
The ultimate objective is to achieve a single database session per request, which would simplify the rollback process for any issues that might arise during the request.
Thank you for your attention to this matter, and I look forward to your guidance and suggestions.
I am struggling with this exact same thing: because multiple sessions are created, you can end up with deadlocks. Ideally, dependency-injector would offer a Request scope for this type of thing.
https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#sqlalchemy.ext.asyncio.async_scoped_session
@Trinkes did you find a solution? I have the exact same problem.
@theobouwman not yet. I didn't find the time to investigate the @jess-hwang suggestion.
@Trinkes https://github.com/ets-labs/python-dependency-injector/issues/760 fixes the one-session-per-request problem.
I did some local testing with Locust, and once many requests execute across multiple threads (I'm using sync endpoints), the sessions don't behave as expected.
@theobouwman It seems it doesn't work as expected when there is more than 1 request being processed.
@Trinkes you are right.
@jess-hwang do you know the solution?
Use async_sessionmaker instead of sessionmaker. FastAPI creates a new async task per request.
```python
import asyncio

from sqlalchemy.ext.asyncio import async_scoped_session, async_sessionmaker

async_session_factory = async_sessionmaker(
    async_engine,
    expire_on_commit=False,
)
async_scoped_session_factory = async_scoped_session(
    async_session_factory,
    scopefunc=asyncio.current_task,
)
```
Using async_scoped_session, you bind the session to the current task. If you call the session factory again within the same task (i.e. the same request), the same session is returned.
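To illustrate the mechanism without pulling in SQLAlchemy, here is a stdlib-only sketch of what task scoping does: cache one object per asyncio task, so repeated lookups inside the same task (the same request) return the same instance. `FakeSession` and `TaskScopedFactory` are illustrative names, not SQLAlchemy API:

```python
import asyncio


class FakeSession:
    """Stand-in for a real SQLAlchemy AsyncSession."""


class TaskScopedFactory:
    def __init__(self, factory):
        self._factory = factory
        # Maps task -> instance; real code would also clear entries
        # when a task finishes (async_scoped_session.remove()).
        self._registry = {}

    def __call__(self):
        task = asyncio.current_task()
        if task not in self._registry:
            self._registry[task] = self._factory()
        return self._registry[task]


async def handle_request(scoped):
    # Two lookups within the same task return the same object.
    s1, s2 = scoped(), scoped()
    return s1 is s2, s1


async def main():
    scoped = TaskScopedFactory(FakeSession)
    # Each request runs in its own task, like FastAPI requests do.
    (same_a, sess_a), (same_b, sess_b) = await asyncio.gather(
        handle_request(scoped), handle_request(scoped)
    )
    return same_a and same_b, sess_a is not sess_b


shared_within_task, distinct_across_tasks = asyncio.run(main())
print(shared_within_task, distinct_across_tasks)  # True True
```

The registry key is the current task, which is exactly the `scopefunc=asyncio.current_task` idea above: one session per request-task, fresh sessions across tasks.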
@jess-hwang I have implemented the code you gave me:
```python
class Database:
    def __init__(self, db_url: str) -> None:
        self._engine = create_async_engine(
            db_url,
            echo=get_config().QUERY_ECHO,
            echo_pool=get_config().ECHO_POOL,
            json_serializer=_custom_json_serializer,
            pool_pre_ping=True,
            pool_size=get_config().DB_POOL_SIZE,
        )
        async_session_factory = sessionmaker(
            bind=self._engine,
            autocommit=False,
            autoflush=False,
            expire_on_commit=False,
            class_=AsyncSession,
        )
        self._async_scoped_session_factory = async_scoped_session(
            async_session_factory,
            scopefunc=asyncio.current_task,
        )

    def create_database(self) -> None:
        Base.metadata.create_all(self._engine)

    @contextmanager
    def session(self) -> Callable[..., AbstractContextManager[AsyncSession]]:
        session: AsyncSession = self._async_scoped_session_factory()
        try:
            yield session
        except Exception as e:
            # logger.exception("Session rollback because of exception")
            session.rollback()
            raise e
        finally:
            session.close()
```
But in my repository a session is still created for each query:
```python
class BaseRepository(Generic[T]):
    _model: T  # TODO: find out if this is the best solution

    def __init__(self, session_factory: Callable[..., AbstractContextManager[AsyncSession]]) -> None:
        self.session_factory = session_factory

    async def get_by_id(self, id: str) -> T:
        with self.session_factory() as session:
            r = await session.execute(select(self._model).filter(self._model.id == id))
            return r.scalar_one_or_none()
```
This is how I set up the dependency-injector container:
```python
class Container(containers.DeclarativeContainer):
    wiring_config = containers.WiringConfiguration(packages=[
        "api.routes",
        "tasks.routes",
        "common.observability",
    ])
    config = providers.Configuration()
    db = providers.Singleton(Database, db_url=get_config().DB_URL())
    event_repository = providers.Factory(
        EventRepository, session_factory=db.provided.session)
```
So I don't understand what I am doing wrong. Shouldn't `with self.session_factory() as session:` reuse the already created session?
@theobouwman I think you should use async_session instead of session.
```python
@asynccontextmanager
async def async_session(self) -> Callable[..., AbstractContextManager[AsyncSession]]:
    session = self.async_session_factory()
    try:
        yield session
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()
```
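For reference, the rollback/close contract of such an async context manager can be demonstrated with only the stdlib; `FakeAsyncSession` below is an illustrative stand-in for `AsyncSession`, not real SQLAlchemy API:

```python
import asyncio
from contextlib import asynccontextmanager


class FakeAsyncSession:
    """Records whether rollback/close were awaited."""

    def __init__(self):
        self.rolled_back = False
        self.closed = False

    async def rollback(self):
        self.rolled_back = True

    async def close(self):
        self.closed = True


@asynccontextmanager
async def async_session(factory):
    session = factory()
    try:
        yield session
    except Exception:
        await session.rollback()  # undo the request's work on any error
        raise
    finally:
        await session.close()     # always release the connection


async def main():
    ok = FakeAsyncSession()
    async with async_session(lambda: ok):
        pass  # happy path: no rollback, but still closed

    failed = FakeAsyncSession()
    try:
        async with async_session(lambda: failed):
            raise RuntimeError("boom")
    except RuntimeError:
        pass  # error path: rolled back, then closed
    return ok, failed


ok_session, failed_session = asyncio.run(main())
print(ok_session.closed, ok_session.rolled_back)          # True False
print(failed_session.closed, failed_session.rolled_back)  # True True
```

The key point versus the earlier sync version is that `rollback()` and `close()` are coroutines on an async session and must be awaited.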
@jess-hwang it is still creating 2 sessions when I implement your code and call:
```python
async def get_by_id(self, id: str) -> T:
    async with self.session_factory() as session:
        r = await session.execute(select(self._model).filter(self._model.id == id))
        return r.scalar_one_or_none()
```
@jess-hwang the session only gets created when `get_by_id` is called on the BaseRepository. How could I create a session that is reused throughout the request and loaded via the dependency injector?
And this is how I inject the services:
```python
@router.get('/event/{event_id}')
@inject
async def testt(event_id: str, event_service: EventService = Depends(Provide[Container.event_service])):
    event1 = await event_service.get_event(event_id)
    event2 = await event_service.get_event(event_id)
    return BaseResponse[List[GetEventResponse]](data=[event1, event2])
```
same problem here, any solution please?
Here is some pseudocode on how to make this work...
```python
class MyContainer(containers.DeclarativeContainer):
    db_session_provider = providers.Factory(async_session_factory)
    db_session = providers.ContextLocalSingleton(provides=AsyncSession)


class DbSessionMiddleware(BaseHTTPMiddleware):
    def __init__(
        self,
        app: ASGIApp,
        db_session_provider: Provider[MyContainer.db_session_provider],
        dispatch: DispatchFunction | None = None,
    ) -> None:
        super().__init__(app, dispatch)
        self._db_session_provider = db_session_provider

    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
        # Initialize the context-local session at the start of the request.
        self._db_session_provider()
        return await call_next(request)


fastapi_app.add_middleware(DbSessionMiddleware)
```
The trick here is that you need to initialize the database session in the FastAPI request context, which is what we accomplish by initializing the context-local singleton from the middleware.
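A stdlib-only sketch of why this works: a context-local singleton backed by `contextvars` is created at most once per context, and each request effectively runs in its own context copy, so every lookup inside one request shares the instance while separate requests get fresh ones. All names here are illustrative, not dependency-injector API:

```python
import contextvars

# Holds the per-context instance; the default None means "not created yet".
_instance_var = contextvars.ContextVar("db_session", default=None)


class ContextLocalSingletonSketch:
    """Create the wrapped object at most once per contextvars context."""

    def __init__(self, factory):
        self._factory = factory

    def __call__(self):
        instance = _instance_var.get()
        if instance is None:
            instance = self._factory()
            _instance_var.set(instance)
        return instance


session_provider = ContextLocalSingletonSketch(object)


def handle_request():
    # What the middleware accomplishes: the first call in the request
    # context creates the session; later calls in the same context reuse it.
    first = session_provider()
    second = session_provider()
    assert first is second
    return first


# Each "request" runs in its own copied context, as ASGI tasks do.
session_a = contextvars.copy_context().run(handle_request)
session_b = contextvars.copy_context().run(handle_request)
print(session_a is not session_b)  # True: fresh instance per request
```

Initializing the provider early in the middleware guarantees the session exists before any repository or service looks it up later in the same context.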
@philipbjorge do you have a fully working example of this?
Same problem. I did not understand how to implement the solution.
Shouldn't this be a feature of python-dependency-injector? This is called a scoped lifecycle of container instances. The library should provide something like providers.Scoped(...) and build a FastAPI extension on top of it, e.g. providers.FastApiRequestScoped(...).
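For illustration only, here is a minimal sketch of what such a hypothetical `Scoped` provider could look like: it caches one instance for the lifetime of an explicitly entered scope and drops it when the scope exits. The `Scoped` class and its `scope()` method are invented names; no such provider exists in the library today:

```python
from contextlib import contextmanager


class Scoped:
    """Cache one instance for the lifetime of an explicitly entered scope."""

    _UNSET = object()

    def __init__(self, factory):
        self._factory = factory
        self._instance = self._UNSET

    @contextmanager
    def scope(self):
        # Entered at request start (e.g. from FastAPI middleware)...
        try:
            yield self
        finally:
            # ...and the cached instance is dropped at request end.
            self._instance = self._UNSET

    def __call__(self):
        if self._instance is self._UNSET:
            self._instance = self._factory()
        return self._instance


db_session = Scoped(object)

with db_session.scope():
    a, b = db_session(), db_session()   # same instance inside one scope
with db_session.scope():
    c = db_session()                    # a new instance in a new scope

print(a is b, a is not c)  # True True
```

A real implementation would have to keep the cache context-local (e.g. via `contextvars`, as ContextLocalSingleton does) rather than on the provider instance, or concurrent requests would trample each other's sessions.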