starlette
Avoid unexpected background task cancellation
The starting point for contributions should usually be a discussion. Simple documentation typos may be raised as stand-alone pull requests, but otherwise please ensure you've discussed your proposal prior to issuing a pull request. This will help us direct work appropriately, and ensure that any suggested changes have been okayed by the maintainers.
- [x] Initially raised as discussion #1438
Unfortunately, this is not a solution. 😞
More info on why this is not a solution: https://github.com/encode/starlette/pull/1441
Okay, this can be resolved by adding these lines:

```python
t = anyio.get_current_task()
if t.name == "anyio.from_thread.BlockingPortal._call_func":
    # cancel stuck task due to discarded response
    # see: https://github.com/encode/starlette/issues/1022
    task_group.cancel_scope.cancel()
```
@Kludex Can you review this approach?
> Okay, this can be resolved by adding these lines:
>
> ```python
> t = anyio.get_current_task()
> if t.name == "anyio.from_thread.BlockingPortal._call_func":
>     # cancel stuck task due to discarded response
>     # see: https://github.com/encode/starlette/issues/1022
>     task_group.cancel_scope.cancel()
> ```
This issue is 4 years old. A bit of explanation of what you're doing here would be cool...
After tracing the tasks under `CancelScope`, I finally found that task. It's probably due to anyio's silenced `WouldBlock` exception. I'll provide more details tomorrow.
Conclusion

To sum up: to solve the problem of #1022, the anyio integration in #1157 introduced `task_group.cancel_scope.cancel()` to cancel stuck tasks. However, if there are still other background tasks running, they get cancelled as well. The approach is then simple: cancel only the stuck one, which is `anyio.from_thread.BlockingPortal._call_func`.
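The name-matching idea can be illustrated with a stdlib-only sketch (using `asyncio` task names rather than `anyio.get_current_task()`, purely as an analogy — the task name here is hypothetical):

```python
import asyncio

async def report_name():
    # asyncio analogue of anyio.get_current_task(): inspect the running task,
    # which is how the PR identifies the one stuck producer to cancel
    return asyncio.current_task().get_name()

async def main():
    task = asyncio.create_task(report_name(), name="stuck-producer")
    return await task

name = asyncio.run(main())
print(name)  # → stuck-producer
```

Matching on a task-name string is fragile (as a reviewer notes below), but this is the mechanism the snippet above relies on.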
Postmortem
If we scrutinize the whole stack and trace which task got stuck, the `coro` is pretty suspicious.
```python
async def call_next(request: Request) -> Response:
    app_exc: typing.Optional[Exception] = None
    send_stream, recv_stream = anyio.create_memory_object_stream()

    async def coro() -> None:
        nonlocal app_exc

        async with send_stream:
            try:
                await self.app(scope, request.receive, send_stream.send)
            except Exception as exc:
                app_exc = exc

    task_group.start_soon(coro)
```
Let's dig into `send_stream.send`. We can find that `self.send_nowait` raises `WouldBlock`, which is caught by the `except` clause underneath, and then the task never returns from `await send_event.wait()`.
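As an analogy for this stuck producer, here is a stdlib-only sketch using `asyncio.Queue` (not anyio's actual memory stream): a full queue with no consumer blocks the next `put()`, just as `send_stream.send` blocks when nobody is receiving. The 0.1s timeout exists only so the demo terminates.

```python
import asyncio

async def produce():
    q = asyncio.Queue(maxsize=1)
    await q.put("response.start")  # fills the one-slot buffer
    try:
        # nobody ever calls q.get(), so this second put() would block forever,
        # just like send_stream.send() with no receiver on the other end
        await asyncio.wait_for(q.put("response.body"), timeout=0.1)
        return "sent"
    except asyncio.TimeoutError:
        return "producer stuck: nobody is receiving"

outcome = asyncio.run(produce())
print(outcome)  # → producer stuck: nobody is receiving
```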
```python
# from anyio/streams/memory.py
async def send(self, item: T_Item) -> None:
    await checkpoint()
    try:
        self.send_nowait(item)
    except WouldBlock:
        # Wait until there's someone on the receiving end
        send_event = Event()
        self._state.waiting_senders[send_event] = item
        try:
            await send_event.wait()
        except BaseException:
            self._state.waiting_senders.pop(send_event, None)  # type: ignore[arg-type]
            raise

        if self._state.waiting_senders.pop(send_event, None):  # type: ignore[arg-type]
            raise BrokenResourceError
```
We got stuck with `asyncio`, so let's hit the `asyncio` backend of `anyio`:
```python
class Event(BaseEvent):
    def __new__(cls) -> "Event":
        return object.__new__(cls)

    def __init__(self) -> None:
        self._event = asyncio.Event()

    def set(self) -> DeprecatedAwaitable:
        self._event.set()
        return DeprecatedAwaitable(self.set)

    def is_set(self) -> bool:
        return self._event.is_set()

    async def wait(self) -> None:
        if await self._event.wait():
            await checkpoint()
```
From the documentation of `asyncio.Event.wait`:

> Block until the internal flag is true.
>
> If the internal flag is true on entry, return True immediately. Otherwise, block until another coroutine calls set() to set the flag to true, then return True.
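A stdlib sketch of exactly this hang: an `asyncio.Event` whose `set()` is never called blocks `wait()` indefinitely. The timeout below exists only so the demo terminates; in the middleware case there is no timeout, so the producer hangs forever.

```python
import asyncio

async def wait_for_receiver():
    send_event = asyncio.Event()  # set() is never called: no receiver arrives
    try:
        await asyncio.wait_for(send_event.wait(), timeout=0.1)
        return "woke up"
    except asyncio.TimeoutError:
        return "blocked forever (timed out in this demo)"

outcome = asyncio.run(wait_for_receiver())
print(outcome)
```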
Solution
Let's recap `send_stream.send` above: if no receiver is waiting on the other end, `send_stream.send` blocks. Why is there no receiver? Because `call_next` (inherently `BaseHTTPMiddleware.__call__`) was called earlier and the memory stream was created by `anyio.create_memory_object_stream()`, but the `StreamingResponse` is never consumed!
In order to resolve this problem, you can either consume it manually:
```python
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import PlainTextResponse

class CustomMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        resp = await call_next(request)

        async def _send(m):
            pass  # discard all messages: the body just gets drained

        await resp.stream_response(_send)
        return PlainTextResponse("Custom")
```
Or just ignore the `call_next`:
```python
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import PlainTextResponse

class CustomMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # never invoking call_next means no producer task is ever started
        return PlainTextResponse("Custom")
```
~~Or just do the same as in my PR and cancel the tangled "producer awaiting send but no consumer receiving" task.~~
===UPDATE=== The cancellation still cancels some lingering tasks. I need to ponder this further.
Okay, just to shield the background tasks:
```diff
diff --git a/starlette/background.py b/starlette/background.py
index 4aaf7ae..db9b38a 100644
--- a/starlette/background.py
+++ b/starlette/background.py
@@ -1,6 +1,8 @@
 import sys
 import typing
 
+import anyio
+
 if sys.version_info >= (3, 10):  # pragma: no cover
     from typing import ParamSpec
 else:  # pragma: no cover
@@ -22,10 +24,11 @@ class BackgroundTask:
         self.is_async = is_async_callable(func)
 
     async def __call__(self) -> None:
-        if self.is_async:
-            await self.func(*self.args, **self.kwargs)
-        else:
-            await run_in_threadpool(self.func, *self.args, **self.kwargs)
+        with anyio.CancelScope(shield=True):
+            if self.is_async:
+                await self.func(*self.args, **self.kwargs)
+            else:
+                await run_in_threadpool(self.func, *self.args, **self.kwargs)
```
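The effect of shielding can be sketched with the stdlib's `asyncio.shield` (an analogy, not anyio's `CancelScope`): even when the surrounding handler is cancelled, the shielded background work runs to completion.

```python
import asyncio

results = []

async def background_task():
    await asyncio.sleep(0.05)
    results.append("done")

async def handler():
    # even if handler itself is cancelled, the shielded task keeps running
    await asyncio.shield(background_task())

async def main():
    task = asyncio.create_task(handler())
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.1)  # give the shielded task time to finish

asyncio.run(main())
print(results)  # → ['done']
```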
But then shielding only the background tasks will be enough... like https://github.com/encode/starlette/pull/1654
There's no need to call `cancel` every time. If the task is `starlette.middleware.base.BaseHTTPMiddleware.__call__.<locals>.call_next.<locals>.coro`, it's okay to ignore it.
As the reporter on #1438, I would say two things about this PR:

- I would be quite unhappy with a solution that relied on checking the current task name; having a magic string seems to me very hacky and fragile.
- I think that the location of the cancellation shield is wrong. E.g. if you use the `BackgroundTasks` object to make multiple tasks, when the cancellation is triggered, the `for` loop in `BackgroundTasks.__call__` will be interrupted. I believe that the cancellation shield should go in the `Response` class.
The background tasks are shielded in `Response`, `StreamingResponse` and `FileResponse` now.
Hope to see a more permanent solution to this eventually.
For anyone struggling with this and using asyncio...
```python
import asyncio

from starlette.responses import PlainTextResponse

async def doit():
    await asyncio.sleep(5)

async def main(request):
    await asyncio.shield(doit())
    return PlainTextResponse('Hello')
```
The only downside is that if the user closes the window before the process completes, you will get a `RuntimeError: No response returned.`
If you need this for a sync function, just use `run_in_threadpool`.
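For a stdlib-only analogue of `run_in_threadpool`, `asyncio.to_thread` (Python 3.9+) does essentially the same thing: run a sync callable in a worker thread so the event loop stays free. A minimal sketch:

```python
import asyncio
import time

def blocking_work():
    time.sleep(0.05)  # stands in for blocking I/O or CPU-bound work
    return "done"

async def main():
    # like starlette's run_in_threadpool: the event loop is not blocked
    # while the sync function runs in a worker thread
    return await asyncio.to_thread(blocking_work)

result = asyncio.run(main())
print(result)  # → done
```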
Looks like Trio-esque task groups and exception groups are built into `asyncio` for Python 3.11: https://realpython.com/python311-exception-groups/