🐞 Bug Report — FlowControl.drain() hangs forever after client disconnect, causing leaked tasks and memory growth
Initial Checks
- [x] I confirm this was discussed, and the maintainers suggest I open an issue.
- [x] I'm aware that if I created this issue without a discussion, it may be closed without a response.
Discussion Link
Possible FlowControl.drain() hang after client disconnect on Python 3.13 (memory leak)
Description
When a client disconnects during a streaming HTTP response (e.g. StreamingResponse or SSE), Uvicorn sometimes never releases the coroutine waiting in await self.flow.drain() inside httptools_impl.RequestResponseCycle.send().
As a result, the request task remains alive even though the connection is closed, and leaked tasks and memory accumulate over time.
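For context, the wait in question is Uvicorn's write flow control: send() parks on an asyncio.Event until the transport reports that it is writable again. A simplified paraphrase of uvicorn/protocols/http/flow_control.py (a sketch from reading the source, not the verbatim code):

import asyncio

class FlowControl:
    def __init__(self, transport):
        self._transport = transport
        self._is_writable_event = asyncio.Event()
        self._is_writable_event.set()

    async def drain(self):
        # RequestResponseCycle.send() awaits here. If the client disconnects
        # after pause_writing() and nothing ever sets the event again, this
        # coroutine is stranded forever.
        await self._is_writable_event.wait()

    def pause_writing(self):
        # Called by the transport when its write buffer fills up.
        self._is_writable_event.clear()

    def resume_writing(self):
        # Called by the transport when the buffer drains again.
        self._is_writable_event.set()

My suspicion is that under load pause_writing() fires, the client then disconnects, and the event is not (or not always) set again in connection_lost(), so drain() never returns.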
Example Code
# Imports required by the example below
import asyncio
import gc
import logging
import time
from functools import partial

import anyio
import anyio.abc
from fastapi import APIRouter, Request
from sse_starlette.sse import EventSourceResponse
from starlette.background import BackgroundTask

logger = logging.getLogger(__name__)
event_router = APIRouter()

# =====================================================
# ✅ Innermost business generator: inner_gen_v1
# =====================================================
async def inner_gen_v1(n: int):
"""
    Simulates the lowest-level generator. Supports TaskGroup cancellation and cleanup on exceptions.
"""
logger.info("[INIT] inner_gen created")
try:
for i in range(n):
# await asyncio.sleep(0.2)
yield f"inner-{i}"
            # Simulate a large chunk of data
yield f"inner-{b'a' * 1024 * 1024}"
except GeneratorExit:
logger.info("[CLEANUP] inner_gen released in generator exit")
except (anyio.get_cancelled_exc_class(), asyncio.CancelledError):
        # Not strictly required here, but logging makes cancellations easier to trace
logger.info("[CANCELLED] inner_gen cancelled")
raise
except Exception as e:
logger.error(f"[ERROR] inner_gen error: {e}")
raise
finally:
gc.collect()
logger.info("[CLEANUP] inner_gen released")
# =====================================================
# ✅ Middle-layer generator: middle_gen_v1
# =====================================================
async def middle_gen_v1(n: int):
"""
    Middle layer wrapping inner_gen.
    Ensures inner_gen is closed whether it is cancelled or finishes normally.
"""
logger.info("[INIT] middle_gen created")
agen = inner_gen_v1(n)
try:
async for chunk in agen:
yield f"middle: {chunk}"
            # if 1 == 1:  # simulate ending early
# logger.debug("[INFO] middle_gen break early")
# break
# await asyncio.sleep(20)
except GeneratorExit:
logger.info("[CLEANUP] middle_gen released in generator exit")
except (anyio.get_cancelled_exc_class(), asyncio.CancelledError):
logger.info("[CANCELLED] middle_gen cancelled")
raise
except Exception as e:
logger.error(f"[ERROR] middle_gen error: {e}")
raise
finally:
await agen.aclose()
logger.info("[CLEANUP] middle_gen released")
#
# for _ in range(100):
# logger.debug("[INFO] middle_gen break early")
# await asyncio.sleep(3)
# # time.sleep(30)
# =====================================================
# Background producer coroutine (Producer)
# =====================================================
async def producer(send_stream: anyio.abc.ObjectSendStream, n: int, cancel_event: anyio.Event):
"""
    Pulls data from the business generator and puts it into the in-memory queue.
    Stops when cancel_event is set.
"""
async with send_stream:
logger.info("[START] producer started")
start_time = time.perf_counter()
agen = middle_gen_v1(n)
try:
async for data in agen:
                # Stop if the client has disconnected or cancel_event has been set
if cancel_event.is_set():
logger.info("[INFO] cancel_event triggered, stopping producer")
break
try:
await send_stream.send({"event": "data", "data": data})
                except (anyio.BrokenResourceError, anyio.ClosedResourceError):
                    # send() raises BrokenResourceError (not EndOfStream) once the
                    # receive end is closed, and ClosedResourceError if our own
                    # side was closed (e.g. by _cleaner below)
logger.info("[INFO] Receiver closed — stop producing.")
break
# logger.info("[INFO] producer start sleep 20s")
# await asyncio.sleep(20)
# logger.info("[INFO] producer end sleep 20s")
else:
await send_stream.send({"event": "end", "data": "done"})
except (asyncio.CancelledError, anyio.get_cancelled_exc_class()):
logger.info("[CANCELLED] Producer cancelled (TaskGroup or client)")
raise
except (anyio.BrokenResourceError, anyio.EndOfStream):
# Client disconnected - perform cleanup
logger.info("[CLEANUP] Client disconnected & send stream closed - perform cleanup")
raise
except Exception as e:
logger.exception("[ERROR] Producer failed")
try:
await send_stream.send({"event": "error", "data": str(e)})
            except (anyio.BrokenResourceError, anyio.ClosedResourceError):  # stream already closed
pass
finally:
await agen.aclose()
logger.info(f"[CLEANUP] producer finished cost {time.perf_counter() - start_time:.2f} seconds")
# =====================================================
# SSE event consumer generator (Consumer)
# =====================================================
async def event_stream(
request: Request, receive_stream: anyio.abc.ObjectReceiveStream, cancel_event: anyio.Event
):
"""
    Reads data from the in-memory queue and yields it as SSE events.
    When the client disconnects, sets cancel_event and ends the stream.
"""
logger.debug("[START] event stream start")
start_time = time.perf_counter()
try:
async with receive_stream:
async for msg in receive_stream:
                # Check whether the client has disconnected
if await request.is_disconnected():
logger.info("[INFO] Client disconnected — cancel signal sent")
cancel_event.set()
break
yield msg
# logger.info("[INFO] consumer start sleep 20s")
# await asyncio.sleep(20)
# logger.info("[INFO] consumer end sleep 20s")
except (asyncio.CancelledError, anyio.get_cancelled_exc_class()):
# Client disconnected - perform cleanup
logger.info("[CLEANUP] Client disconnected - perform cleanup")
raise
except (anyio.BrokenResourceError, anyio.EndOfStream):
# Client disconnected - perform cleanup
logger.info("[CLEANUP] Client disconnected & receive stream closed - perform cleanup")
raise
finally:
cancel_event.set()
logger.info(f"[CLEANUP] event_stream released cost {time.perf_counter() - start_time:.2f} seconds")
# =====================================================
# FastAPI Endpoint
# =====================================================
@event_router.get("/a/sse")
async def sse_endpoint(request: Request):
"""
    Full "safe" version:
    - create an in-memory queue
    - start a background task that produces data
    - stream the output in the foreground
    - clean up resources automatically
"""
cancel_event = anyio.Event()
send_stream, receive_stream = anyio.create_memory_object_stream()
async def _cleaner():
logger.info("[CLEANUP] SSE fully closing")
if not cancel_event.is_set():
cancel_event.set()
# await cancel_event.wait()
# await asyncio.sleep(3)
logger.info("[CLEANUP] SSE fully notification start")
await cancel_event.wait()
logger.info("[CLEANUP] SSE fully notification end")
await send_stream.aclose()
await receive_stream.aclose()
logger.info("[CLEANUP] SSE fully closed")
async def _client_close_handler(message):
logger.info(f"[CLEANUP] client closing with {message}")
cancel_event.set()
logger.info(f"[CLEANUP] client closed with {message}")
return EventSourceResponse(
event_stream(request, receive_stream, cancel_event),
data_sender_callable=partial(producer, send_stream, 10, cancel_event),
ping=5,
send_timeout=3,
background=BackgroundTask(
_cleaner,
        ),  # manually attach a background cleanup task
client_close_handler_callable=_client_close_handler,
)
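For comparison, the same code path should be reachable with a plain StreamingResponse and no SSE machinery at all. A hypothetical minimal endpoint (untested against the exact disconnect timing above, route name is mine):

from starlette.responses import StreamingResponse

@event_router.get("/a/plain")
async def plain_stream():
    async def gen():
        # Emit 1 MiB chunks fast enough to fill the transport buffer, so that
        # uvicorn calls pause_writing() and the ASGI send blocks in flow.drain().
        for _ in range(10_000):
            yield b"a" * 1024 * 1024
    return StreamingResponse(gen(), media_type="text/plain")

Killing the client mid-download should leave the request task parked in drain() whenever the bug triggers.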
# =====================================================
# Test case (client-side stress script)
# =====================================================
import asyncio
from concurrent.futures import ThreadPoolExecutor
import anyio
import asyncer
import httpx
from httpx_sse import aconnect_sse
API_URL = "http://localhost:8000/event/a/sse"
API_KEY = "sk-your-key"
async def stream_agent_response():
headers = {
"Authorization": f"Bearer {API_KEY}",
"Accept": "text/event-stream",
"Content-Type": "application/json",
}
payload = {
"input": "请帮我总结以下内容",
"options": {"stream": True},
}
async with httpx.AsyncClient() as client:
async with aconnect_sse(
client,
method="GET",
url=API_URL,
headers=headers,
json=payload,
timeout=10,
) as event_source:
async for sse in event_source.aiter_sse():
                # each sse object has event, data, id, etc. fields
if sse.event == "message":
print("[DATA]", sse.event)
elif sse.event == "error":
print("[ERROR]", sse.data)
elif sse.event == "done":
print("[DONE]")
break
else:
print(f"[{sse.event}]")
async def main(times=50, workers=30):
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = []
for i in range(times):
futures.append(
executor.submit(
asyncer.runnify(stream_agent_response),
)
)
print("Submitted all tasks")
        # Wait for every task to finish and collect the results
for future in futures:
            result = future.result()  # blocks until this task completes
print(f"Task completed with result: {result}")
print("All tasks completed.")
if __name__ == "__main__":
asyncio.run(main(times=100, workers=100))
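To make leaked tasks observable, a debug endpoint like the one below can be added to the app (hypothetical helper, the route name is mine); polling it during and after the stress run shows whether request tasks outlive their connections:

import asyncio

@event_router.get("/debug/tasks")
async def debug_tasks():
    # repr(task) includes the file and line the coroutine is suspended at,
    # so tasks stuck in FlowControl.drain() mention flow_control.py.
    tasks = asyncio.all_tasks()
    stuck = [t for t in tasks if "flow_control" in repr(t)]
    return {"total_tasks": len(tasks), "stuck_in_drain": len(stuck)}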
Python, Uvicorn & OS Version
python 3.13
uvicorn 0.38
uvloop 0.21.0
starlette 0.48.0
sse_starlette 3.0.3
fastapi 0.121.0