no default __reduce__ due to non-trivial __cinit__

Open Jan-Jasek opened this issue 2 years ago • 0 comments

  • uvloop version: 0.17.0
  • Python version: 3.11
  • Platform: Linux - python:3.11-slim-buster docker image on ubuntu VM
  • Can you reproduce the bug with PYTHONASYNCIODEBUG in env?: Yes
  • Does uvloop behave differently from vanilla asyncio? How?: Unknown. Uvicorn installs uvloop by default, so we would need to switch the web server.

Summary

  • We run a FastAPI web app on the uvicorn web server with uvloop as the event loop. For CPU-heavy handlers with asynchronous dependencies (in this case, ETL handlers that depend on the AsyncOpenSearch client), we submit asynchronous tasks to a ProcessPoolExecutor to distribute the load over multiple CPU cores. This approach worked fine for classic non-async handlers. For async ones, the runtime eventually starts raising exceptions for every asynchronous handler submitted afterwards. After startup, everything works fine for several hours. After some time (or perhaps after some trigger I failed to find), the handlers stop working completely and always raise the same exception. Restarting the server helps for several hours before the issue reappears.

How to reproduce

  • Unknown. It probably happens whenever the server has been idle for some unknown period of time. Switching the multiprocessing context between fork and spawn did not change the outcome.

Example Traceback

2023-05-18T10:50:36 INFO:    172.18.0.1:33876 - "POST /upload_data/ HTTP/1.1" 500
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 276, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/sentry_sdk/integrations/starlette.py", line 335, in _sentry_patched_asgi_app
    return await middleware(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sentry_sdk/integrations/asgi.py", line 139, in _run_asgi3
    return await self._run_app(scope, lambda: self.app(scope, receive, send))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sentry_sdk/integrations/asgi.py", line 188, in _run_app
    raise exc from None
  File "/usr/local/lib/python3.11/site-packages/sentry_sdk/integrations/asgi.py", line 183, in _run_app
    return await callback()
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 66, in app
    response = await func(request)
               ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 237, in app
    raw_response = await run_endpoint_function(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 163, in run_endpoint_function
    return await dependant.call(**values)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app_code/services/mobile_service/api/loading_endpoints.py", line 94, in upload_diary_endpoint
    logging.info(result.result())
                 ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 456, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "stringsource", line 2, in uvloop.loop.Loop.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
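
The last frames of the traceback show the failure happening while multiprocessing pickles the submitted work item for the worker queue, which suggests that one of the objects passed to submit() holds a reference to the uvloop Loop. A minimal sketch for finding which argument cannot be pickled follows. The names find_unpicklable and HoldsLock are hypothetical, and the threading.Lock only stands in for the loop, since both are unpicklable.

```python
import pickle
import threading


def find_unpicklable(*args, **kwargs):
    """Return (label, error message) pairs for arguments that fail to pickle."""
    failures = []
    labelled = [(f"args[{i}]", a) for i, a in enumerate(args)]
    labelled += [(f"kwargs[{k!r}]", v) for k, v in kwargs.items()]
    for label, value in labelled:
        try:
            pickle.dumps(value)
        except Exception as exc:  # pickle can raise TypeError, PicklingError, ...
            failures.append((label, f"{type(exc).__name__}: {exc}"))
    return failures


class HoldsLock:
    """Hypothetical stand-in for a client that keeps a reference to the loop."""

    def __init__(self):
        self._lock = threading.Lock()  # unpicklable, used here in place of the Loop


# Only the first positional argument should be reported.
report = find_unpicklable(HoldsLock(), 42, name="ok")
for label, message in report:
    print(label, "->", message)
```

Calling the helper with the same arguments that go into submit_task_async, just before submitting, would show whether the use case, the client, or something else drags the loop into the pickle.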

Code snippet

ProcessPoolExecutor wrapper

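import asyncio
import multiprocessing
from collections.abc import Callable, Coroutine
from concurrent.futures import Future, ProcessPoolExecutor


def run_and_block_coroutine(func: Callable, *args, **kwargs):
    # asyncio.run() takes a single coroutine object, so call func first.
    return asyncio.run(func(*args, **kwargs))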

class ProcessPoolOrchestrator:
    _worker_pool: ProcessPoolExecutor

    def __init__(self, worker_count: int = multiprocessing.cpu_count() * 2):
        self._worker_pool = ProcessPoolExecutor(
            max_workers=worker_count,
            mp_context=multiprocessing.get_context("spawn"),
        )

    def submit_task_async(self, task: Coroutine, *args, **kwargs) -> Future:
        new_args = (task, *args)
        return self._worker_pool.submit(run_and_block_coroutine, *new_args, **kwargs)

Example fastapi endpoint

app = FastAPI()
worker_orchestrator = ProcessPoolOrchestrator()

@app.post("/upload_data/")
async def upload_data_endpoint(
    input_data: int,
    async_client: AsyncOpenSearch = Depends(AsyncOpenSearch),
    use_case: AsyncLoaderUseCase = Depends(AsyncLoaderUseCase),
):
    result = worker_orchestrator.submit_task_async(
        use_case.execute,
        async_client,
        input_data,
    )
    logging.info(result.result()) # calling result.result() raises the internal error that was raised in the process pool
    return "Successfully uploaded."
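
Separately from the pickling error, result.result() on a concurrent.futures.Future blocks the calling thread, which inside an async endpoint is the event loop thread. A minimal sketch of awaiting the future instead, using asyncio.wrap_future, is shown here. The names cpu_bound, run_without_blocking and demo are hypothetical, and a ThreadPoolExecutor is used only to keep the example self-contained; the same call works with any concurrent.futures executor.

```python
import asyncio
from concurrent.futures import Executor, ThreadPoolExecutor


def cpu_bound(n: int) -> int:
    """Placeholder for the CPU-heavy work done in the pool."""
    return n**5


async def run_without_blocking(pool: Executor, n: int) -> int:
    # submit() returns a concurrent.futures.Future; wrap_future turns it into
    # an awaitable so the event loop keeps serving other requests meanwhile.
    future = pool.submit(cpu_bound, n)
    return await asyncio.wrap_future(future)


def demo() -> int:
    with ThreadPoolExecutor(max_workers=1) as pool:
        return asyncio.run(run_without_blocking(pool, 2))
```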

Example use case / handler

class AsyncLoaderUseCase:
    async def execute(
        self,
        async_client: AsyncOpenSearch,
        input_data: int,
    ):
        domain_data = input_data**5  # just to simulate some CPU-heavy call
        return await async_client.index(json.dumps(domain_data))
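
One possible workaround, assuming the loop reference reaches the pickler through the client or the bound use-case method (the report does not confirm this), is to send only plain data across the process boundary and build the client inside the worker. The sketch below uses a hypothetical FakeAsyncClient in place of AsyncOpenSearch, and run_use_case is an illustrative name, not part of the original code.

```python
import asyncio
import json


class FakeAsyncClient:
    """Hypothetical stand-in for AsyncOpenSearch; it only echoes the document."""

    async def index(self, body: str) -> dict:
        await asyncio.sleep(0)  # pretend to do network I/O
        return {"result": "created", "body": body}


async def _execute(input_data: int) -> dict:
    # The client is created inside the worker, on the worker's own event loop,
    # so no object bound to the parent's loop is ever pickled.
    client = FakeAsyncClient()
    domain_data = input_data**5  # simulated CPU-heavy step
    return await client.index(json.dumps(domain_data))


def run_use_case(input_data: int) -> dict:
    """Module-level entry point; only the int argument crosses processes."""
    return asyncio.run(_execute(input_data))


# In the real app this would be pool.submit(run_use_case, input_data);
# it is called directly here to keep the sketch self-contained.
result = run_use_case(2)
```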

Any idea what I am dealing with here would be appreciated. 🙏

Jan-Jasek, May 19 '23 20:05