pytest-asyncio
pytest-asyncio copied to clipboard
Abort async test if event loop catched unhandled exception
Hi. Sometimes async tests hangs when background tasks raises some exception but it's not propagated to test function, it's not a bug in pytest-asyncio but a problem in application code. Can the following patch be used to abort current test if some exception is raised in background?
import inspect as _inspect
import asyncio as _asyncio
from functools import wraps as _wraps
from pytest_asyncio import plugin as _pytestasyncio_plugin
def _wrap_in_sync(func, _loop):
"""Return a sync wrapper around an async function executing it in the
current event loop."""
@_wraps(func)
def inner(**kwargs):
coro = func(**kwargs)
if coro is not None:
def forward_exception(loop, context):
if _inspect.getcoroutinestate(coro) != _inspect.CORO_CLOSED:
coro.throw(context['exception'] if 'exception' in context else Exception(context['message']))
else:
loop.default_exception_handler(context)
_loop.set_exception_handler(forward_exception)
task = _asyncio.ensure_future(coro, loop=_loop)
try:
_loop.run_until_complete(task)
except BaseException:
# run_until_complete doesn't get the result from exceptions
# that are not subclasses of `Exception`. Consume all
# exceptions to prevent asyncio's warning from logging.
if task.done() and not task.cancelled():
task.exception()
raise
return inner
_pytestasyncio_plugin.wrap_in_sync = _wrap_in_sync