Issue in sync.py's SyncToAsync class: new ThreadPoolExecutor instances with daemon threads are created for requests.
Hi,
I'm attempting to run an async view using the ASGI protocol with Daphne as the server. However, I've noticed that new ThreadPoolExecutor instances are created for some requests, and their daemon threads are still running in the background after benchmarking. My understanding is that, since everything runs on a single event loop, a single ThreadPoolExecutor should be reused. Could someone clarify this for me?
Middleware:
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
And my view is:
import threading
import asyncio
import aiohttp
import logging
import psutil
from rest_framework.response import Response
from adrf.decorators import api_view
import json

logger = logging.getLogger(__name__)


@api_view(['GET'])
async def index(request):
    res = await make_api_request("http://{{host}}/v1/completions")
    return Response(res, status=200)


async def make_api_request(url, method="POST", headers=None, params=None, json_data=None, timeout=None):
    try:
        json_data = {
            'prompt': 'Hi, How are you?',
            'max_new_tokens': 700, 'temperature': 0, 'top_p': 1, 'max_tokens': 700,
            'model': 'meta-llama/Llama-2-7b-chat-hf'}
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, headers=headers, params=params, json=json_data,
                                       timeout=timeout, ssl=False) as response:
                content = await response.read()
                if 'json' in response.headers.get('Content-Type', ''):
                    content = json.loads(content)
                return content
    except asyncio.TimeoutError:
        raise TimeoutError("Request timed out. The server did not respond within the specified timeout period.")
    except aiohttp.ClientError as e:
        raise ConnectionError(f"Request error: {str(e)}")
    except Exception as e:
        raise Exception(f"Exception error: {str(e)}")
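For completeness, this is roughly how the view is wired up in urls.py (a sketch only: the route "test" is inferred from the ab command further down, and the app/module layout is an assumption):

from django.urls import path

from . import views  # assumes views.py sits in the same app package

urlpatterns = [
    path("test", views.index),
]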
This is the code I'm referring to in sync.py:
async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
    __traceback_hide__ = True  # noqa: F841
    loop = asyncio.get_running_loop()
    # Work out what thread to run the code in
    if self._thread_sensitive:
        if hasattr(AsyncToSync.executors, "current"):
            # If we have a parent sync thread above somewhere, use that
            executor = AsyncToSync.executors.current
        elif self.thread_sensitive_context.get(None):
            # If we have a way of retrieving the current context, attempt
            # to use a per-context thread pool executor
            thread_sensitive_context = self.thread_sensitive_context.get()
            if thread_sensitive_context in self.context_to_thread_executor:
                # Re-use thread executor in current context
                executor = self.context_to_thread_executor[thread_sensitive_context]
            else:
                # Create new thread executor in current context
                executor = ThreadPoolExecutor(max_workers=1)
                # print("================== created new thread ================")
                self.context_to_thread_executor[thread_sensitive_context] = executor
        elif loop in AsyncToSync.loop_thread_executors:
            # Re-use thread executor for running loop
            executor = AsyncToSync.loop_thread_executors[loop]
        elif self.deadlock_context.get(False):
            raise RuntimeError(
                "Single thread executor already being used, would deadlock"
            )
        else:
            # Otherwise, we run it in a fixed single thread
            executor = self.single_thread_executor
            self.deadlock_context.set(True)
    else:
        # Use the passed in executor, or the loop's default if it is None
        executor = self._executor
    context = contextvars.copy_context()
    child = functools.partial(self.func, *args, **kwargs)
    func = context.run
    try:
        # Run the code in the right thread
        ret: _R = await loop.run_in_executor(
            executor,
            functools.partial(
                self.thread_handler,
                loop,
                self.get_current_task(),
                sys.exc_info(),
                func,
                child,
            ),
        )
    finally:
        _restore_context(context)
        self.deadlock_context.set(False)
    return ret
Actually, it's normal for multiple executors to be created. The specific behaviour should be:
- If the call is marked as thread-sensitive, there's a single shared global executor that is used
- If the call is not marked as that, then the default behaviour of loop.run_in_executor is used, which I believe is to spawn a new one each time.
- You can optionally override this by passing your own executor= argument into SyncToAsync, but this is rarely done (see the sketch below)
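For the non-thread-sensitive case, a shared pool can be supplied explicitly. A minimal sketch of that executor= override, assuming asgiref is installed (blocking_work and the worker count are illustrative, not taken from this thread):

from concurrent.futures import ThreadPoolExecutor
import time

from asgiref.sync import sync_to_async

# One shared pool reused for every wrapped call, instead of whatever
# loop.run_in_executor would otherwise pick.
shared_executor = ThreadPoolExecutor(max_workers=4)

def blocking_work():
    # Stand-in for any blocking call (e.g. a synchronous HTTP request).
    time.sleep(0.1)
    return "done"

# executor= is only accepted when thread_sensitive=False; combining it with
# thread_sensitive=True raises a TypeError in asgiref.
run_blocking = sync_to_async(blocking_work, thread_sensitive=False, executor=shared_executor)

Calling await run_blocking() from a coroutine then runs blocking_work on the shared pool rather than on a per-call executor.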
@andrewgodwin Thanks for your comment. I'm using Django as the framework, and each call is explicitly marked as thread_sensitive=True by the default ASGI handler class. For instance:
await sync_to_async(signals.request_started.send, thread_sensitive=True)(
    sender=self.__class__, scope=scope
)
Given this, a single shared global executor should ideally be used. However, despite this setup, the code seems to enter the else branch, where a new thread executor is created in the current context instead of the existing one being reused:
if thread_sensitive_context in self.context_to_thread_executor:
    # Re-use thread executor in current context
    executor = self.context_to_thread_executor[thread_sensitive_context]
else:
    # Create new thread executor in current context
    executor = ThreadPoolExecutor(max_workers=1)
    self.context_to_thread_executor[thread_sensitive_context] = executor
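That branch creates one single-worker executor per ThreadSensitiveContext. A minimal sketch of the per-context behaviour, assuming asgiref >= 3.3 (handle_one_request is illustrative only, not Django's actual handler code):

import asyncio
import threading

from asgiref.sync import ThreadSensitiveContext, sync_to_async

def report_thread():
    return threading.current_thread().name

async def handle_one_request():
    # Each ThreadSensitiveContext gets its own entry in
    # context_to_thread_executor, so a fresh context means a fresh
    # ThreadPoolExecutor(max_workers=1).
    async with ThreadSensitiveContext():
        return await sync_to_async(report_thread, thread_sensitive=True)()

async def main():
    names = await asyncio.gather(*(handle_one_request() for _ in range(3)))
    print(names)  # typically three different thread names, one per context

asyncio.run(main())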
Since all sorts of things that you can do in your app can affect that (different middleware, server configuration, loading in something that uses gevent, etc.), we can only really debug it if you can produce a reliable way to reproduce it in a tiny sample app, unfortunately.
I understand. I'm actually working with a simple app containing just a view. Here are the steps I've followed:
- Created a virtual environment and installed necessary packages (django, djangorestframework, adrf, aiohttp, daphne, asyncio).
- Added daphne and adrf to INSTALLED_APPS in settings.py.
- Created an async view in views.py and added a corresponding path in urls.py.
- Ran the server with python manage.py runserver (it logs Starting ASGI/Daphne version 4.1.0 development server at http://127.0.0.1:8000/).
- Utilized a benchmarking tool (in this case, ab) to test the API endpoint: ab -c 10 -n 600 -s 800007 -T application/json "http://127.0.0.1:8000/test".
- Monitored the total number of threads created during benchmarking using top -o threads -pid <pid> (see the monitoring sketch below).
I'm working on a Mac M1 Air.
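The thread growth can also be watched programmatically with psutil (already imported in the view above). A minimal monitoring sketch, run as a separate script with the Daphne/runserver PID as its only argument (the script layout and polling interval are my own choices):

import sys
import time

import psutil

def watch_threads(pid, interval=1.0):
    # Poll the server process and print its current thread count, mirroring
    # what top -o threads shows during the ab run.
    proc = psutil.Process(pid)
    while True:
        print(f"threads={proc.num_threads()}")
        time.sleep(interval)

if __name__ == "__main__":
    watch_threads(int(sys.argv[1]))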