starlette
RuntimeError: got Future <Future pending> attached to a different loop when using a custom loop in sync fixtures after upgrading from 0.14.2 to 0.15.0
Checklist
- [X] The bug is reproducible against the latest release and/or master.
- [X] There are no similar issues or pull requests to fix it yet.
Describe the bug
Upgrading to starlette>=0.15.0 breaks our current testing strategy. The setup mocks a nats subscription by actually using the nats server.
The code works with starlette 0.14.2. Upgrading to 0.15.0 gives RuntimeError: got Future <Future pending> attached to a different loop. Upgrading to starlette 0.16.0 gives timeout errors instead. I would love to keep the tests sync.
To reproduce
requirements.txt
starlette
requests
pytest
asyncio-nats-client
code
from starlette.routing import Route
from starlette.testclient import TestClient
import pytest
from starlette.applications import Starlette
from starlette.responses import PlainTextResponse
import asyncio
from nats.aio.client import Client as NATS

"""
Test work with starlette 0.14.2
Error with starlette 0.15.0: RunTimeError: got Future <Future pending> attached to a different loop
Error with starlette 0.16.0: Nats timeout
The test is that the client code makes a nats request to a mocked nats service over nats itself.
Requirement a running nats server `docker run -d -p 4222:4222 nats:latest`
"""

HOST_NATS = "localhost:4222"

# =======================================================================
# CODE
# =======================================================================
def create_app():
    async def index(request):
        r = await request.app.state.nc.request("subject1", timeout=1, payload=b"PING")
        return PlainTextResponse(content=r.data.decode())

    async def setup() -> None:
        await app.state.nc.connect(HOST_NATS)
        print("Connected to nats")

    app = Starlette(debug=True, routes=[Route('/', index)], on_startup=[setup])
    app.state.nc: NATS = NATS()
    return app

app = create_app()

# =======================================================================
# MOCKS & TESTS
# =======================================================================
class NatsServiceMock:
    def __init__(self) -> None:
        self.nc: NATS = NATS()

    async def lifespan(self) -> None:
        await self.nc.connect(HOST_NATS)
        await self.nc.subscribe("subject1", cb=self.handle)

    async def handle(self, msg):
        await self.nc.publish(msg.reply, b"PONG")

    def __enter__(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.lifespan())
        return self

    def __exit__(self, *args) -> None:
        pass

@pytest.fixture(scope="session")
def nats_service() -> None:
    with NatsServiceMock() as nc:
        yield nc

@pytest.fixture(scope="session")
def test_app(nats_service) -> None:
    with TestClient(create_app()) as client:
        yield client

# =======================================================================
# TESTS
# =======================================================================
def test_index_should_give_a_succesful_response(test_app):
    r = test_app.get("/")
    assert r.status_code == 200
    assert r.text == "PONG"
Run:
pytest <file>
Expected behavior
The test to work.
Actual behavior
Test does not work.
Debugging material
output running with starlette 0.15.0:
try:
# wait until the future completes or the timeout
try:
> await waiter
E RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /home/sevaho/.local/share/virtualenvs/testje-KTUsWEz0/lib/python3.8/site-packages/anyio/from_thread.py:177> cb=[TaskGroup._spawn.<locals>.task_done() at /home/sevaho/.local/share/virtualenvs/testje-KTUsWEz0/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:622]> got Future <Future pending> attached to a different loop
/usr/lib/python3.8/asyncio/tasks.py:481: RuntimeError
output when running with starlette 0.16.0:
# Wait for the response or give up on timeout.
try:
msg = await asyncio.wait_for(future, timeout, loop=self._loop)
return msg
except asyncio.TimeoutError:
del self._resp_map[token.decode()]
future.cancel()
> raise ErrTimeout
E nats.aio.errors.ErrTimeout: nats: Timeout
/home/sevaho/.local/share/virtualenvs/testje-KTUsWEz0/lib/python3.8/site-packages/nats/aio/client.py:945: ErrTimeout
Environment
- OS: Linux
- Python version: 3.8
- Starlette version: 0.14.2 / 0.15.0 / 0.16.0
Additional context
I get the same issue with FastAPI (based on starlette) and the mongo motor client. Downgrading Starlette to 0.14.2 fixed the problem for me as well.
I've got similar issue in tests with FastAPI (0.68.2 --> 0.69.0 which means Starlette upgrading from 0.14.2 to 0.15.0) and databases library (based on asyncpg):
...
[stacktrace here]
...
asyncpg/protocol/protocol.pyx:323: in query
???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> ???
E asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
asyncpg/protocol/protocol.pyx:707: InterfaceError
With FastAPI 0.68.2 (Starlette 0.14.2) works like a charm.
I think the problem is that TestClient switched to using AnyIO and in that switch now unconditionally creates its own event loop in TestClient.__enter__ via anyio.start_blocking_portal which means you can't run asyncio.get_event_loop() until you create the test client.
In my case it poses a serious problem because within the definition of my app, I instantiate AsyncIOMotorClient and some other things, one of which seems to attach to the current event loop.
Shouldn't this be moved to the anyio repo?
@sevaho Can you test if this version works without the error?
@aminalaee No, I don't think it's related to anyio at all; I believe the problem is related to a change in behavior caused by Starlette's switch to anyio.
Basically, I could be wrong but my initial understanding is:
- Starlette used to do loop = asyncio.get_event_loop() in TestClient, which would use a default event loop if one didn't exist or use an existing one if it did
- When Starlette changed to AnyIO, TestClient now uses anyio.start_blocking_portal to get the event loop, which means it will always create a new event loop
This means that if you create any objects that bind to the default event loop before calling TestClient.__enter__, you will probably get strange asyncio errors.
Does this sound like an accurate description of the situation? Asking because this is how I understand it, but I'm not 100% sure I understand anyio.start_blocking_portal because from the docs it doesn't explicitly indicate that it creates a new event loop.
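The failure mode described above can be reproduced with the standard library alone. The following is only a sketch for illustration, not Starlette's or AnyIO's code: it creates a pending future on one loop and awaits it from a coroutine running on a second loop. The names loop_a, loop_b and await_on_other_loop are made up for the example.

```python
import asyncio


def await_on_other_loop() -> str:
    """Await a future bound to loop_a from a task running on loop_b."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    try:
        # The future is bound to loop_a at creation time and stays pending.
        fut = loop_a.create_future()

        async def waiter():
            await fut

        try:
            # The task wrapping waiter() runs on loop_b, so awaiting a
            # pending future that belongs to loop_a is rejected by asyncio.
            loop_b.run_until_complete(waiter())
        except RuntimeError as exc:
            return str(exc)
        return "no error"
    finally:
        loop_a.close()
        loop_b.close()


message = await_on_other_loop()
print(message)
```

Objects created at import time or in an earlier fixture play the role of fut here, and the loop started by the test client plays the role of loop_b.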
@MatthewScholefield another error:
@pytest.fixture(scope="session")
def test_app() -> TestClient:
> with TestClient(create_app()) as client:
test.py:78:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../.cache/pypoetry/virtualenvs/centurion-X1KYOsH6-py3.9/lib/python3.9/site-packages/starlette/testclient.py:509: in __enter__
self.portal = portal = stack.enter_context(
/usr/lib/python3.9/contextlib.py:448: in enter_context
result = _cm_type.__enter__(cm)
/usr/lib/python3.9/contextlib.py:119: in __enter__
return next(self.gen)
../../.cache/pypoetry/virtualenvs/centurion-X1KYOsH6-py3.9/lib/python3.9/site-packages/anyio/from_thread.py:416: in start_blocking_portal
run_future.result()
/usr/lib/python3.9/concurrent/futures/_base.py:438: in result
return self.__get_result()
/usr/lib/python3.9/concurrent/futures/_base.py:390: in __get_result
raise self._exception
/usr/lib/python3.9/concurrent/futures/thread.py:52: in run
result = self.fn(*self.args, **self.kwargs)
../../.cache/pypoetry/virtualenvs/centurion-X1KYOsH6-py3.9/lib/python3.9/site-packages/anyio/_core/_eventloop.py:56: in run
return asynclib.run(func, *args, **backend_options) # type: ignore
../../.cache/pypoetry/virtualenvs/centurion-X1KYOsH6-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:233: in run
return native_run(wrapper(), debug=debug)
../../.cache/pypoetry/virtualenvs/centurion-X1KYOsH6-py3.9/lib/python3.9/site-packages/nest_asyncio.py:30: in run
loop = asyncio.get_event_loop()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <asyncio.unix_events._UnixDefaultEventLoopPolicy object at 0x7f09c55478e0>
def get_event_loop(self):
"""Get the event loop for the current context.
Returns an instance of EventLoop or raises an exception.
"""
if (self._local._loop is None and
not self._local._set_called and
threading.current_thread() is threading.main_thread()):
self.set_event_loop(self.new_event_loop())
if self._local._loop is None:
> raise RuntimeError('There is no current event loop in thread %r.'
% threading.current_thread().name)
E RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
/usr/lib/python3.9/asyncio/events.py:642: RuntimeError
======================================================================================================================================================================================================= short test summary info =======================================================================================================================================================================================================
ERROR test.py::test_index_should_give_a_succesful_response - RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
Tested with starlette 0.15.0 and 0.16.0 and 0.17.1.
Also faced with this problem. It seems to me that the problem is that the reference to the database is in a separate event loop, which closes before the test receives data.
Described part of my project:
python = "^3.9"
fastapi = "^0.70.1"
uvicorn = "^0.16.0"
environs = "^9.3.5"
odmantic = "^0.3.5"
pytest = "^6.2.5"
httpx = "^0.21.1"
requests = "^2.26.0"
mongomock = "^3.23.0"
main.py
from fastapi import FastAPI
from app.api.api_v1.api import router as api_router
app = FastAPI()
app.include_router(api_router)
apy/api.py
from fastapi import APIRouter
from app.api.api_v1.endpoints.employee import router as user_router
router = APIRouter()
router.include_router(user_router)
api/api_v1/endpoints/employee.py
from fastapi import APIRouter
from app.crud.employee import EmployeeCRUD
router = APIRouter()
@router.get("/employee/count/", response_model=int, tags=["Data"])
async def count_employees():
    return await EmployeeCRUD.count_employees()
crud/employee.py
from dataclasses import dataclass
from fastapi import APIRouter
from app.database.database import engine
from app.model.employee import Employee
router = APIRouter()
@dataclass
class EmployeeCRUD:
    model = Employee

    @classmethod
    async def count_employees(cls):
        return await engine.count(cls.model)
database/database.py
from motor.motor_asyncio import AsyncIOMotorClient
from odmantic import AIOEngine
from app.core.config import MONGO_DB_URL
client = AsyncIOMotorClient(MONGO_DB_URL)
engine = AIOEngine(motor_client=client)
test
from fastapi.testclient import TestClient
from app.main import app
def test_ping():
    client = TestClient(app)
    response = client.get('/api/employee/count/')
    assert response.status_code == 200
================================================= test session starts ==================================================
platform darwin -- Python 3.10.1, pytest-6.2.5, py-1.11.0, pluggy-1.0.0
rootdir: /Users/aleksan/Code/employees-appย โ ะบะพะฟะธั
plugins: anyio-3.4.0
collected 1 item
tests/test_api.py F [100%]
======================================================= FAILURES =======================================================
______________________________________________________ test_count ______________________________________________________
def test_count():
client = TestClient(app)
> response = client.get('/api/employee/count/')
tests/test_api.py:7:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/requests/sessions.py:555: in get
return self.request('GET', url, **kwargs)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/testclient.py:468: in request
return super().request(
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/requests/sessions.py:542: in request
resp = self.send(prep, **send_kwargs)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/requests/sessions.py:655: in send
r = adapter.send(request, **kwargs)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/testclient.py:266: in send
raise exc
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/testclient.py:263: in send
portal.call(self.app, scope, receive, send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/anyio/from_thread.py:240: in call
return cast(T_Retval, self.start_task_soon(func, *args).result())
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py:445: in result
return self.__get_result()
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py:390: in __get_result
raise self._exception
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/anyio/from_thread.py:187: in _call_func
retval = await retval
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/fastapi/applications.py:208: in __call__
await super().__call__(scope, receive, send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/applications.py:112: in __call__
await self.middleware_stack(scope, receive, send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/middleware/errors.py:181: in __call__
raise exc
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/middleware/errors.py:159: in __call__
await self.app(scope, receive, _send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/middleware/cors.py:84: in __call__
await self.app(scope, receive, send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/exceptions.py:82: in __call__
raise exc
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/exceptions.py:71: in __call__
await self.app(scope, receive, sender)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/routing.py:656: in __call__
await route.handle(scope, receive, send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/routing.py:259: in handle
await self.app(scope, receive, send)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/starlette/routing.py:61: in app
response = await func(request)
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/fastapi/routing.py:226: in app
raw_response = await run_endpoint_function(
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/fastapi/routing.py:159: in run_endpoint_function
return await dependant.call(**values)
app/api/api_v1/endpoints/employee.py:24: in count_employees
return await EmployeeCRUD.count_employees()
app/crud/employee.py:28: in count_employees
count = await engine.count(cls.model)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <odmantic.engine.AIOEngine object at 0x110145180>, model = <class 'app.model.employee.Employee'>, queries = ()
query = QueryExpression()
collection = AsyncIOMotorCollection(Collection(Database(MongoClient(host=['mongo:27017'], document_class=dict, tz_aware=False, connect=False, driver=DriverInfo(name='Motor', version='2.3.1', platform='asyncio')), 'test'), 'employee'))
async def count(
self, model: Type[ModelType], *queries: Union[QueryExpression, Dict, bool]
) -> int:
"""Get the count of documents matching a query
Args:
model: model to perform the operation on
queries: query filters to apply
Returns:
number of document matching the query
<!---
#noqa: DAR401 TypeError
-->
"""
if not lenient_issubclass(model, Model):
raise TypeError("Can only call count with a Model class")
query = AIOEngine._build_query(*queries)
collection = self.database[model.__collection__]
> count = await collection.count_documents(query)
E RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /Users/aleksan/Code/test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/anyio/from_thread.py:187> cb=[TaskGroup._spawn.<locals>.task_done() at /Users/aleksan/Code/test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:629]> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/futures.py:384]> attached to a different loop
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/odmantic/engine.py:417: RuntimeError
=================================================== warnings summary ===================================================
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/marshmallow/__init__.py:14
/Users/aleksan/Code/test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/marshmallow/__init__.py:14: DeprecationWarning: The distutils package is deprecated and slated for removal in Python 3.12. Use setuptools or check PEP 632 for potential alternatives
from distutils.version import LooseVersion
../test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/motor/frameworks/asyncio/__init__.py:42
/Users/aleksan/Code/test-tasks-from-interviews/employees-app/.venv/lib/python3.10/site-packages/motor/frameworks/asyncio/__init__.py:42: DeprecationWarning: There is no current event loop
return asyncio.get_event_loop()
-- Docs: https://docs.pytest.org/en/stable/warnings.html
=============================================== short test summary info ================================================
FAILED tests/test_api.py::test_count - RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_...
============================================ 1 failed, 2 warnings in 0.26s =============================================
@Hazzari have you tried setting the loop on Motor client?
client = AsyncIOMotorClient()
client.get_io_loop = asyncio.get_event_loop
engine = AIOEngine(motor_client=client)
Thank you so much! That solved the problem! Asynchrony is not my strongest suit yet!
I have the same issue when using an async SQLAlchemy connection. I changed the code from
engine = create_async_engine(TEST_DATABASE_URL, echo=True, future=True)
async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_session() -> AsyncSession:
    async with async_session() as session:
        yield session
to
async def get_session() -> AsyncSession:
    engine = create_async_engine(TEST_DATABASE_URL, echo=True, future=True)
    async_session = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
    async with async_session() as session:
        yield session
This makes the error go away, but creating a new engine on every call is obviously not a real solution.
Here's a very minimal reproduction:
import pytest
import asyncio
from starlette.responses import JSONResponse
from starlette.applications import Starlette
from starlette.routing import Route

async def fn(request):
    print('in endpoint', id(asyncio.get_running_loop()))
    return JSONResponse({})

def make_app():
    app = Starlette(routes=[Route("/", endpoint=fn)])
    return app

#######################
from starlette.testclient import TestClient

pytestmark = pytest.mark.anyio

@pytest.fixture
def anyio_backend():
    return 'asyncio'

@pytest.fixture
async def tclient():
    with TestClient(app=make_app()) as c:
        yield c

@pytest.mark.anyio
async def test_bug(tclient):
    print('in test', id(asyncio.get_running_loop()))
    print(tclient.get('/'))
When running this under pytest, you will see two lines printed with event-loop IDs. My expectation is that the IDs would be the same, but they are not.
This makes testing impossible for my use-case -- create a Task in a "request context" (when a request comes in, create_task and kick it to the event loop for scheduling); then asyncio.wait_for it during tests, to assert the result of execution.
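The differing IDs can be illustrated without Starlette. The sketch below is a standard-library analogy, not how TestClient or AnyIO is implemented: it runs one loop in the calling thread and a second loop in a worker thread, then asks a coroutine on each which loop it is running on. The helper name loop_ids is hypothetical.

```python
import asyncio
import threading


async def current_loop_id() -> int:
    return id(asyncio.get_running_loop())


def loop_ids():
    """Return the loop id seen in the calling thread and in a worker thread."""
    main_loop = asyncio.new_event_loop()
    worker_loop = asyncio.new_event_loop()
    # The worker loop runs in its own thread, like a loop owned by a test client.
    thread = threading.Thread(target=worker_loop.run_forever, daemon=True)
    thread.start()
    try:
        main_id = main_loop.run_until_complete(current_loop_id())
        # Submit the same coroutine function to the loop in the other thread.
        future = asyncio.run_coroutine_threadsafe(current_loop_id(), worker_loop)
        worker_id = future.result(timeout=5)
    finally:
        worker_loop.call_soon_threadsafe(worker_loop.stop)
        thread.join(timeout=5)
        worker_loop.close()
        main_loop.close()
    return main_id, worker_id


main_id, worker_id = loop_ids()
print("same loop:", main_id == worker_id)
```

A task created on the worker loop cannot be awaited with asyncio.wait_for from the other loop, which is the situation described in the comment above.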
I'm not using Mongo so for now my only solution was to downgrade and pin fastapi to 0.68.2
How to reproduce with asyncio.Lock and ab
import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

lock = asyncio.Lock()

async def homepage(request):
    async with lock:
        await asyncio.sleep(0.1)
        return JSONResponse({'hello': 'world'})

app = Starlette(debug=True, routes=[
    Route('/', homepage),
])

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
$ ab -n 100 -c 100 "http://0.0.0.0:8000/"
I am also facing similar issue in my test cases
My solution for async SQLAlchemy + FastApi + Pytest:
in 'main.py'
@app.on_event('startup')
async def startup_event():
    await db.init()
    await db.create_all()

@app.on_event('shutdown')
async def shutdown_event():
    await db.close()
in db file
Base = declarative_base()

class Database:
    def __init__(self, url):
        self._session = None
        self._engine = None
        self.url = url

    async def create_all(self):
        async with self._engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    def __getattr__(self, name) -> AsyncSession:
        return getattr(self._session, name)

    async def init(self):
        # closes connections if a session is created,
        # so as not to create repeated connections
        if self._session:
            await self._session.close()
        self._engine = create_async_engine(self.url, future=True)
        self._session = sessionmaker(
            self._engine,
            expire_on_commit=False,
            class_=AsyncSession
        )()

db: AsyncSession | Database = Database(settings.db_url())
and client object in "test.py"
@pytest.fixture(scope="module")
def client() -> TestClient:
    with TestClient(app) as client:
        yield client

def test_get_files_without_params(client: TestClient):
    response = client.get("/api/v1/file")
I am seeing similar issues, where I have my db_session fixture that ensures unittest changes are rolled back after every test, this requires the app to use the same session to be able to see the data loaded into the database.
With the change to anyio, I am yet to find a combination of things that will allow me to have the fixture db_session on the same loop as the TestClient.
The following worked prior to anyio.
server.py
def build_app():
    app = FastAPI()

    @app.on_event("startup")
    async def app_setup():
        app.state.database_pool = await asyncpg.create_pool(
            dsn=config.postgres_dsn,
            min_size=config.postgres_pool_min_size,
            max_size=config.postgres_pool_max_size,
            server_settings={
                "application_name": "tfx_backend_v{}".format(tfx_backend.__version__),
            },
        )

    @app.on_event("shutdown")
    async def app_shutdown():
        # cleanly shutdown the connections in the pool
        await app.state.database_pool.close()

    @app.middleware("http")
    async def middleware_asyncpg(request, call_next):
        # on request, inject a database transaction
        async with request.app.state.database_pool.acquire() as database_connection:
            async with database_connection.transaction():
                request.state.database_connection = database_connection
                return await call_next(request)

    return app
tests/conftest.py
@pytest.fixture(scope="session")
def engine(settings):
    backend = get_backend(settings.postgres_dsn)
    _upgrade(backend)
    yield
    # Rollback all migrations
    _downgrade(backend, count=None)

@pytest.fixture(scope="function", autouse=True)
async def db_session(engine, settings):
    conn = await asyncpg.connect(settings.postgres_dsn)
    tr = conn.transaction()
    await tr.start()
    try:
        yield conn
    except Exception:
        pass
    finally:
        await tr.rollback()
tests/views/conftest.py
import contextlib

class MockPool:
    def __init__(self, db_session):
        self.db_session = db_session

    @contextlib.asynccontextmanager
    async def acquire(self):
        yield self.db_session

    async def close(self):
        pass

# We don't want to build the app for every test, create a session level fixture for the app
@pytest.fixture(scope="session")
def app():
    return server.build_app()

# We do want to patch the db_session for every test, create a function level fixture for the client.
@pytest.fixture
def client(monkeypatch, db_session, app, tmp_path):
    mock_pool = MockPool(db_session)
    monkeypatch.setattr(server.asyncpg, "create_pool", AsyncMock(return_value=mock_pool))
    with TestClient(app=app, raise_server_exceptions=False, base_url="http://localhost") as client:
        yield client
This allowed me to populate the database, then make a request on the TestClient and see the same data.
If there was a way to access the loop that TestClient started up, I could create a new db_session for the client as asyncpg will accept a loop parameter, allowing me to run them on the same loop (hopefully). As long as the TestClient doesn't spawn new loops on requests.
If there was a way to access the loop that TestClient started up,
you can use:
@contextlib.asynccontextmanager
async def some_function(app):
    async with some_resource() as v:
        app.v = v
        yield

with TestClient(app=app) as tc, tc.portal.wrap_async_context_manager(some_function(app)):
    ...
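To make the idea behind that suggestion concrete, here is a rough standard-library analogy of entering an async context manager on a loop that lives in another thread. It is an assumption-laden sketch and not AnyIO's implementation: background_loop, enter_on_loop and resource are hypothetical names, and unlike a real portal it does not forward exceptions from the with body into the async context manager.

```python
import asyncio
import contextlib
import threading


@contextlib.contextmanager
def background_loop():
    """Run an event loop in a worker thread for the duration of the block."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        yield loop
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join(timeout=5)
        loop.close()


@contextlib.contextmanager
def enter_on_loop(loop, async_cm):
    """Enter and exit async_cm on the given loop from synchronous code."""
    enter = asyncio.run_coroutine_threadsafe(async_cm.__aenter__(), loop)
    value = enter.result(timeout=5)
    try:
        yield value
    finally:
        # Simplification: exceptions from the body are not passed to __aexit__.
        leave = asyncio.run_coroutine_threadsafe(
            async_cm.__aexit__(None, None, None), loop
        )
        leave.result(timeout=5)


@contextlib.asynccontextmanager
async def resource():
    # Anything created here is bound to the loop that runs this coroutine.
    yield id(asyncio.get_running_loop())


with background_loop() as loop, enter_on_loop(loop, resource()) as seen_loop_id:
    same_loop = seen_loop_id == id(loop)

print("resource was created on the background loop:", same_loop)
```

The point of the pattern is that the resource is created on the same loop that later serves the requests, so it never has to cross loops.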
I have a feeling that has worked, because I now get the same error in other fixtures that use the db_session, as it's now connected to the TestClient loop.
@contextlib.asynccontextmanager
async def session_factory(app):
    conn = await asyncpg.connect(app.state.config.postgres_dsn)
    tr = conn.transaction()
    await tr.start()
    try:
        app.state.database_pool = MockPool(conn)
        yield
    except Exception:
        pass
    finally:
        await tr.rollback()

# We don't want to build the app for every test, create a session level fixture for the app
@pytest.fixture(scope="session")
def app():
    return server.build_app()

# We do want to patch the db_session for every test, create a function level fixture for the client.
@pytest.fixture
def client(engine, app):
    with TestClient(
        app=app,
        raise_server_exceptions=False,
        base_url="http://localhost",
    ) as client, client.portal.wrap_async_context_manager(session_factory(app)):
        yield client

@pytest.fixture
def db_session(client):
    return client.app.state.database_pool.db_session
The RuntimeError is now raised from the database factories that create users etc., rather than from within the view when calling the TestClient.
It seems like I need to influence the loop that TestClient creates (the way it used to attach to the test loop).
I have faced the same issues in my unit tests. I fixed the issue by using a context manager for TestClient. I've no idea why it works though (especially because it is not used as context manager in the docs of FastAPI).
Doesn't work:
@pytest.fixture(scope="session")
def client() -> TestClient:
    return TestClient(app)
Works:
@pytest.fixture(scope="session")
def client() -> TestClient:
    with TestClient(app) as c:
        yield c
Any time I am doing async stuff outside of my app I just switch to httpx so that I don't have to deal with juggling event loops. I realize it requires rewriting tests, so it may not be a great solution for existing tests that were broken, but it might be a good idea for new tests.
@ivanychev The context-manager version (with TestClient(app) as c) calls the startup event, while the plain return TestClient(app) version doesn't.
Same issue. Had to address by pinning Fastapi to 0.68.2
@ivanychev I continue to see the error even after switching the client fixture to use TestClient as a context manager. Could you please share an example of how you're using that client fixture?
FastAPI needs context manager support for lifespan
For testing FastAPI with SQLAlchemy, one option is overriding the engine to use a NullPool (https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html#using-multiple-asyncio-event-loops).
Thanks for the asyncio.Lock and ab reproduction above! It helped me figure out that the issue is creating asyncio.Lock or similar objects at the module level (so before the event loop is created). I've created a fix for the redis package: https://github.com/redis/redis-py/pull/2471. I think this will have to be solved at the package level, or users should not define the connections from the offending packages at the module level.
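One way to avoid binding such a primitive at import time is to create it lazily, from inside the loop that actually uses it. The sketch below is only an illustration under that assumption and is not taken from the redis-py pull request; LazyLock is a hypothetical helper name.

```python
import asyncio
from typing import Optional


class LazyLock:
    """Hand out an asyncio.Lock created inside the currently running loop."""

    def __init__(self) -> None:
        self._lock: Optional[asyncio.Lock] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def get(self) -> asyncio.Lock:
        loop = asyncio.get_running_loop()
        # Create a fresh lock the first time, or when the running loop changed.
        if self._lock is None or self._loop is not loop:
            self._lock = asyncio.Lock()
            self._loop = loop
        return self._lock


# Safe at module level: no asyncio object is created until get() is called.
lazy_lock = LazyLock()


async def worker() -> None:
    async with lazy_lock.get():
        await asyncio.sleep(0)


async def main() -> bool:
    # Two workers contend for the lock so that it really has to wait.
    await asyncio.gather(worker(), worker())
    return True


# Each asyncio.run() call uses a new event loop, as separate test clients would.
first_run = asyncio.run(main())
second_run = asyncio.run(main())
```

The trade-off is that the lock is no longer shared across loops, which is acceptable when each loop serves its own isolated test or process.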
Still relevant in 2023. If you encounter the same issue, here is a workaround that helped me:
First, set up asgi-lifespan: https://pypi.org/project/asgi-lifespan/
If you have any connection initialization in session-scoped fixtures, make sure you override the event_loop fixture.
@pytest.fixture(scope="session")
def event_loop():
    """Overrides pytest default function scoped event loop"""
    policy = asyncio.get_event_loop_policy()
    loop = policy.new_event_loop()
    yield loop
    loop.close()
Otherwise you will receive a ScopeMismatch error.
Start your test client with LifespanManager:
@pytest.fixture(scope='session')
async def client(app):
    """Async http client for FastAPI application, ASGI init signals handled by
    LifespanManager.
    """
    async with LifespanManager(app, startup_timeout=100, shutdown_timeout=100):
        base_chars = ''.join(random.choices(string.ascii_uppercase, k=4))
        async with AsyncClient(app=app, base_url=f"http://{base_chars}") as ac:
            yield ac
This comment shows the issue: https://github.com/encode/starlette/issues/1315#issuecomment-1027091444
The event loop ID is different on both prints.