
🐞 Bug Report — FlowControl.drain() hangs forever after client disconnect, causing leaked tasks and memory growth

Open · mascotli opened this issue 2 months ago · 0 comments

Initial Checks

  • [x] I confirm this was discussed, and the maintainers suggest I open an issue.
  • [x] I'm aware that if I created this issue without a discussion, it may be closed without a response.

Discussion Link

Possible FlowControl.drain() hang after client disconnect on Python 3.13 (memory leak)

Description

When a client disconnects during a streaming HTTP response (e.g. StreamingResponse or SSE), Uvicorn sometimes never releases the coroutine waiting in await self.flow.drain() inside httptools_impl.RequestResponseCycle.send().

As a result, the request task remains alive even though the connection is closed, so the number of pending asyncio tasks and resident memory grow steadily over time.
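
For context, the sketch below paraphrases Uvicorn's flow-control logic (condensed from uvicorn/protocols/http/flow_control.py; illustrative, not the exact source). drain() blocks on an asyncio.Event that pause_writing() clears when the socket send buffer fills and resume_writing() sets again once it drains. My working hypothesis is that when the client disconnects while writing is paused, nothing sets the event again, so the coroutine parked in drain() is never released:

import asyncio

class FlowControl:
    # Simplified sketch of uvicorn's FlowControl, for illustration only.
    def __init__(self, transport):
        self._transport = transport
        self.write_paused = False
        self._is_writable_event = asyncio.Event()
        self._is_writable_event.set()

    async def drain(self):
        # The request task parks here while the transport buffer is full.
        await self._is_writable_event.wait()

    def pause_writing(self):
        # Called by the event loop when the socket send buffer fills up.
        self.write_paused = True
        self._is_writable_event.clear()

    def resume_writing(self):
        # Called when the buffer drains again. If the peer disconnects
        # instead, this may never run, and drain() above waits forever.
        self.write_paused = False
        self._is_writable_event.set()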

Example Code

# =====================================================
# Imports and setup assumed by this example
# (inferred from the snippet; the logger/router setup is illustrative)
# =====================================================
import asyncio
import gc
import logging
import time
from functools import partial

import anyio
import anyio.abc
from fastapi import APIRouter, Request
from sse_starlette.sse import EventSourceResponse
from starlette.background import BackgroundTask

logger = logging.getLogger(__name__)
event_router = APIRouter()


# =====================================================
# ✅ Innermost business generator: inner_gen_v1
# =====================================================
async def inner_gen_v1(n: int):
    """
    模拟底层生成器。支持 TaskGroup 取消和异常清理。
    """
    logger.info("[INIT] inner_gen created")
    try:
        for i in range(n):
            # await asyncio.sleep(0.2)
            yield f"inner-{i}"
            # Simulate a large chunk (~1 MiB: the f-string embeds the repr of a 1 MiB bytes object)
            yield f"inner-{b'a' * 1024 * 1024}"
    except GeneratorExit:
        logger.info("[CLEANUP] inner_gen released in generator exit")
    except (anyio.get_cancelled_exc_class(), asyncio.CancelledError):
        # Not strictly required, but logging here makes cancellations easier to trace
        logger.info("[CANCELLED] inner_gen cancelled")
        raise
    except Exception as e:
        logger.error(f"[ERROR] inner_gen error: {e}")
        raise
    finally:
        gc.collect()
        logger.info("[CLEANUP] inner_gen released")


# =====================================================
# ✅ Middle-layer generator: middle_gen_v1
# =====================================================
async def middle_gen_v1(n: int):
    """
    Wraps inner_gen.
    Ensures inner_gen is closed whether it is cancelled or finishes normally.
    """
    logger.info("[INIT] middle_gen created")
    agen = inner_gen_v1(n)
    try:
        async for chunk in agen:
            yield f"middle: {chunk}"
            # if 1 == 1:  # simulate early termination
            #     logger.debug("[INFO] middle_gen break early")
            #     break
            # await asyncio.sleep(20)
    except GeneratorExit:
        logger.info("[CLEANUP] middle_gen released in generator exit")
    except (anyio.get_cancelled_exc_class(), asyncio.CancelledError):
        logger.info("[CANCELLED] middle_gen cancelled")
        raise
    except Exception as e:
        logger.error(f"[ERROR] middle_gen error: {e}")
        raise
    finally:
        await agen.aclose()
        logger.info("[CLEANUP] middle_gen released")
    # Variant used while testing: keep running after the loop
    # for _ in range(100):
    #     logger.debug("[INFO] middle_gen keep-alive tick")
    #     await asyncio.sleep(3)
    #     # time.sleep(30)



# =====================================================
# Background producer coroutine (Producer)
# =====================================================
async def producer(send_stream: anyio.abc.ObjectSendStream, n: int, cancel_event: anyio.Event):
    """
    Pulls data from the business generator and puts it into the in-memory queue.
    Stops in response to cancel_event.
    """
    async with send_stream:
        logger.info("[START] producer started")
        start_time = time.perf_counter()
        agen = middle_gen_v1(n)
        try:
            async for data in agen:
                # Stop if the client has disconnected or cancel_event was set
                if cancel_event.is_set():
                    logger.info("[INFO] cancel_event triggered, stopping producer")
                    break

                try:
                    await send_stream.send({"event": "data", "data": data})
                except anyio.BrokenResourceError:
                    # send() raises BrokenResourceError (not EndOfStream) once the receiver closes
                    logger.info("[INFO] Receiver closed — stop producing.")
                    break

                # logger.info("[INFO] producer start sleep 20s")
                # await asyncio.sleep(20)
                # logger.info("[INFO] producer end sleep 20s")
            else:
                await send_stream.send({"event": "end", "data": "done"})
        except (asyncio.CancelledError, anyio.get_cancelled_exc_class()):
            logger.info("[CANCELLED] Producer cancelled (TaskGroup or client)")
            raise
        except (anyio.BrokenResourceError, anyio.ClosedResourceError):
            # Client disconnected or the stream was closed locally - perform cleanup
            logger.info("[CLEANUP] Client disconnected & send stream closed - perform cleanup")
            raise
        except Exception as e:
            logger.exception("[ERROR] Producer failed")
            try:
                await send_stream.send({"event": "error", "data": str(e)})
            except (anyio.BrokenResourceError, anyio.ClosedResourceError):
                pass
        finally:
            await agen.aclose()
            logger.info(f"[CLEANUP] producer finished cost {time.perf_counter() - start_time:.2f} seconds")


# =====================================================
# SSE event consumer generator (Consumer)
# =====================================================
async def event_stream(
    request: Request, receive_stream: anyio.abc.ObjectReceiveStream, cancel_event: anyio.Event
):
    """
    Reads data from the in-memory queue and yields it as SSE events.
    When the client disconnects, sets cancel_event and ends the stream.
    """
    logger.debug("[START] event stream start")
    start_time = time.perf_counter()
    try:
        async with receive_stream:
            async for msg in receive_stream:
                # Check whether the client has disconnected
                if await request.is_disconnected():
                    logger.info("[INFO] Client disconnected — cancel signal sent")
                    cancel_event.set()
                    break
                yield msg
                # logger.info("[INFO] consumer start sleep 20s")
                # await asyncio.sleep(20)
                # logger.info("[INFO] consumer end sleep 20s")
    except (asyncio.CancelledError, anyio.get_cancelled_exc_class()):
        # Client disconnected - perform cleanup
        logger.info("[CLEANUP] Client disconnected - perform cleanup")
        raise
    except (anyio.BrokenResourceError, anyio.ClosedResourceError):
        # Client disconnected or the stream was closed locally - perform cleanup
        logger.info("[CLEANUP] Client disconnected & receive stream closed - perform cleanup")
        raise
    finally:
        cancel_event.set()
        logger.info(f"[CLEANUP] event_stream released cost {time.perf_counter() - start_time:.2f} seconds")


# =====================================================
# FastAPI Endpoint
# =====================================================
@event_router.get("/a/sse")
async def sse_endpoint(request: Request):
    """
    Full "safe" version:
    - create an in-memory queue
    - start a background task that produces data
    - stream the output in the foreground
    - clean up resources automatically
    """
    cancel_event = anyio.Event()
    send_stream, receive_stream = anyio.create_memory_object_stream()

    async def _cleaner():
        logger.info("[CLEANUP] SSE fully closing")
        if not cancel_event.is_set():
            cancel_event.set()
        # await cancel_event.wait()
        # await asyncio.sleep(3)
        logger.info("[CLEANUP] SSE fully notification start")
        await cancel_event.wait()
        logger.info("[CLEANUP] SSE fully notification end")
        await send_stream.aclose()
        await receive_stream.aclose()
        logger.info("[CLEANUP] SSE fully closed")

    async def _client_close_handler(message):
        logger.info(f"[CLEANUP] client closing with {message}")
        cancel_event.set()
        logger.info(f"[CLEANUP] client closed with {message}")

    return EventSourceResponse(
        event_stream(request, receive_stream, cancel_event),
        data_sender_callable=partial(producer, send_stream, 10, cancel_event),
        ping=5,
        send_timeout=3,
        background=BackgroundTask(
            _cleaner,
        ),  # manually attached background cleanup task
        client_close_handler_callable=_client_close_handler,
    )
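

Note that the example above goes through sse_starlette; since the wait happens below the ASGI application, the hang should also be reproducible with a bare StreamingResponse. A minimal sketch of such an endpoint follows (the /stream path, chunk size, and iteration count are arbitrary choices of mine): emitting large chunks back-to-back fills the socket send buffer so the cycle enters drain(), and killing the client mid-stream should then leave the request task pending.

# =====================================================
# Minimal repro sketch without sse_starlette (illustrative, untested)
# =====================================================
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/stream")
async def stream():
    async def body():
        # Emit many large chunks without sleeping so the transport
        # buffer fills up and send() ends up awaiting flow.drain().
        for _ in range(10_000):
            yield b"a" * 1024 * 1024

    return StreamingResponse(body(), media_type="text/plain")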







# =====================================================
# Test case: many concurrent SSE clients
# =====================================================
from concurrent.futures import ThreadPoolExecutor

import asyncer
import httpx
from httpx_sse import aconnect_sse

API_URL = "http://localhost:8000/event/a/sse"
API_KEY = "sk-your-key"


async def stream_agent_response():
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Accept": "text/event-stream",
        "Content-Type": "application/json",
    }

    payload = {
        "input": "请帮我总结以下内容",
        "options": {"stream": True},
    }

    async with httpx.AsyncClient() as client:
        async with aconnect_sse(
            client,
            method="GET",
            url=API_URL,
            headers=headers,
            json=payload,
            timeout=10,
        ) as event_source:
            async for sse in event_source.aiter_sse():
                # Each sse object carries event, data, id, etc.
                if sse.event == "message":
                    print("[DATA]", sse.event)
                elif sse.event == "error":
                    print("[ERROR]", sse.data)
                elif sse.event == "done":
                    print("[DONE]")
                    break
                else:
                    print(f"[{sse.event}]")


def main(times=50, workers=30):
    # Plain (non-async) function: it only does blocking thread-pool work,
    # so it must not run on an event loop
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for i in range(times):
            futures.append(
                executor.submit(
                    asyncer.runnify(stream_agent_response),
                )
            )

        print("Submitted all tasks")

        # Wait for all tasks to finish and collect their results
        for future in futures:
            result = future.result()  # blocks until the task completes
            print(f"Task completed with result: {result}")

    print("All tasks completed.")


if __name__ == "__main__":
    main(times=100, workers=100)
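
To observe the leak, I count pending asyncio tasks on the server while the test client above connects and disconnects repeatedly. A periodic logger like the sketch below (the function name and interval are my own; any equivalent works) shows the task count climbing without ever returning to the baseline:

# =====================================================
# Server-side task counter (diagnostic sketch)
# =====================================================
import asyncio
import logging

logger = logging.getLogger(__name__)


async def log_task_count(interval: float = 5.0):
    # Run alongside the app (e.g. started from a lifespan/startup hook);
    # logs how many tasks are still alive on the event loop.
    while True:
        tasks = [t for t in asyncio.all_tasks() if not t.done()]
        logger.info("[DIAG] pending tasks: %d", len(tasks))
        await asyncio.sleep(interval)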

Python, Uvicorn & OS Version

Python 3.13
uvicorn 0.38
uvloop 0.21.0
starlette 0.48.0
sse_starlette 3.0.3
fastapi 0.121.0

mascotli · Nov 09 '25