uvloop
uvloop copied to clipboard
no default __reduce__ due to non-trivial __cinit__
- uvloop version: 0.17.0
- Python version: 3.11
- Platform: Linux - python:3.11-slim-buster docker image on ubuntu VM
-
Can you reproduce the bug with
PYTHONASYNCIODEBUGin env?: Yes - Does uvloop behave differently from vanilla asyncio? How?: Unknown. Uvicorn installs uvloop by default, so we would need to switch the web server.
Summary
- We are running fastapi web app with uvicorn web server and uvloop as the event loop. For CPU heavy handlers with asynchronous dependencies (in this case, we have ETL handlers that depends on AsyncOpenSearch client), we submit asynchronous tasks to ProcessPoolExecutor to distribute the load over multiple CPU cores. The approach worked fine for classic non async handlers. For async ones, the runtime eventually starts raising exceptions on subsequent asynchronous handlers submitted. Upon runtime start, everyting work fine for several hours. After some time (or perhaps some trigger I failed to find), the handlers stop working completely, always raising the same exception. Restarting the server helps for several hours before issues start to reappear.
How to reproduce
- Unknown, probably happens every time the server gets idle for unknown period of time. Switching multiprocessing context between fork / spawn has not changed the outcome.
Example Traceback
2023-05-18T10:50:36 INFO: 172.18.0.1:33876 - "POST /upload_data/ HTTP/1.1" 500
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 276, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/sentry_sdk/integrations/starlette.py", line 335, in _sentry_patched_asgi_app
return await middleware(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sentry_sdk/integrations/asgi.py", line 139, in _run_asgi3
return await self._run_app(scope, lambda: self.app(scope, receive, send))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sentry_sdk/integrations/asgi.py", line 188, in _run_app
raise exc from None
File "/usr/local/lib/python3.11/site-packages/sentry_sdk/integrations/asgi.py", line 183, in _run_app
return await callback()
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 84, in __call__
await self.app(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
await route.handle(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 66, in app
response = await func(request)
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 237, in app
raw_response = await run_endpoint_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 163, in run_endpoint_function
return await dependant.call(**values)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app_code/services/mobile_service/api/loading_endpoints.py", line 94, in upload_diary_endpoint
logging.info(result.result())
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 456, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/multiprocessing/queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "stringsource", line 2, in uvloop.loop.Loop.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
Code snippet
ProcessPoolExecutor wrapper
def run_and_block_coroutine(func: Callable, *args, **kwargs):
return asyncio.run(func, *args, **kwargs)
class ProcessPoolOrchestrator:
_worker_pool: ProcessPoolExecutor
def __init__(self, worker_count: int = multiprocessing.cpu_count() * 2):
self._worker_pool = ProcessPoolExecutor(
max_workers=worker_count,
mp_context=multiprocessing.get_context("spawn"),
)
def submit_task_async(self, task: Coroutine, *args, **kwargs) -> Future:
new_args = (task, *args)
return self._worker_pool.submit(run_and_block_coroutine, *new_args, **kwargs)
Example fastapi endpoint
app = FastAPI()
worker_orchestrator = ProcessPoolOrchestrator()
@app.post("/upload_data/")
async def upload_data_endpoint(
input_data: int,
async_client: AsyncOpenSearch = Depends(AsyncOpenSearch),
use_case: AsyncLoaderUseCase = Depends(AsyncLoaderUseCase),
):
result = worker_orchestrator.submit_task_async(
use_case.execute,
async_client,
input_data,
)
logging.info(result.result()) # calling result.result() raises the internal error that was raised in the process pool
return "Successfully uploaded."
Example use case / handler
class AsyncLoaderUseCase:
async def execute(
self,
async_client: AsyncOpenSearch
input_data: int
):
domain_data = input_data**5 # just to simulate some cpu heavy call
return await async_client.index(json.dumps(domain_data))
Any idea what I am dealing with here would be appreciated. 🙏