`asyncio.to_thread` could be a lot more efficient
Feature or enhancement
Proposal:
It seems like there are quite a few layers of overhead involved in asyncio.to_thread: extracting a concurrent.futures.Future via submission to the ThreadPoolExecutor, creating a corresponding asyncio.Future, and then synchronizing the two via callbacks, including translating exceptions from concurrent-speak to asyncio-speak, etc. Also involved in this process is a lot of (unnecessary-in-this-case) locking within the concurrent.futures.Future.
Why not simply have the ThreadPoolExecutor update the asyncio.Future directly? That would eliminate nearly all these layers of pointless overhead, skipping the concurrent.futures.Future altogether.
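For reference, the current implementation is essentially a thin wrapper over loop.run_in_executor, which chains the concurrent.futures.Future returned by Executor.submit to a new asyncio.Future. A close paraphrase of CPython's Lib/asyncio/threads.py:

```python
# Paraphrased from CPython's Lib/asyncio/threads.py: every call funnels
# through run_in_executor, which wraps the concurrent.futures.Future from
# ThreadPoolExecutor.submit in an asyncio.Future via futures.wrap_future.
import contextvars
import functools
from asyncio import events

async def to_thread(func, /, *args, **kwargs):
    loop = events.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)
```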
Here's a small proof of concept (including a couple of hacks that could be smoothed over) to demonstrate how easily this could be achieved (it even allows you to rip out the functools.partial as a bonus, since ThreadPoolExecutor allows kwargs submissions):
```python
from asyncio import get_running_loop, AbstractEventLoop, Future
from asyncio.futures import _set_result_unless_cancelled
from concurrent.futures import thread
from typing import Callable

async def to_thread_new_and_improved[**P, T](fn: Callable[P, T], /, *args: P.args, **kwargs: P.kwargs) -> T:
    loop = get_running_loop()
    executor = loop._default_executor
    if not executor:
        # one-time hack to get the loop's default executor
        loop.run_in_executor(None, int)
        executor = loop._default_executor
    # Now copy-and-paste (mostly) from ThreadPoolExecutor.submit
    # (including the same shutdown checks and exceptions):
    with executor._shutdown_lock, thread._global_shutdown_lock:
        if executor._broken:
            raise thread.BrokenThreadPool(executor._broken)
        if executor._shutdown or thread._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        # The key change:
        f = loop.create_future()  # instead of f = concurrent.futures.Future()
        executor._work_queue.put(thread._WorkItem(FakeFuture(loop, f), fn, args, kwargs))
        executor._adjust_thread_count()
    del fn, args, kwargs, loop, executor  # don't know if all this is necessary, but it never hurts
    return await f

# where "FakeFuture" is defined as follows
# (simply delegating to an asyncio.Future while "pretending" to be a concurrent.futures.Future):
class FakeFuture[T]:
    def __init__(self, loop: AbstractEventLoop, fut: Future[T]):
        self._loop = loop
        self._fut = fut

    def set_running_or_notify_cancel(self):
        return not (self._loop.is_closed() or self._fut.cancelled())

    def set_result(self, result: T):
        self._callback(_set_result_unless_cancelled, self._fut, result)

    def set_exception(self, exception: BaseException):
        self._callback(_set_exception_unless_cancelled, self._fut, exception)

    def cancel(self):
        self._callback(self._fut.cancel)

    def _callback(self, f, *args):
        try:
            self._loop.call_soon_threadsafe(f, *args)
        except RuntimeError:
            pass  # the event loop is already closed; nothing to deliver to
```
Note: I did not handle contextvars here, but that would be a straightforward tweak; also, _set_exception_unless_cancelled doesn't exist but you can infer how that is written from _set_result_unless_cancelled.
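For completeness, a minimal sketch of that missing helper, mirroring the existing `_set_result_unless_cancelled` in `asyncio/futures.py` (hypothetical; it does not exist in the standard library today):

```python
# Hypothetical helper, mirroring asyncio.futures._set_result_unless_cancelled;
# it does not currently exist in the standard library.
def _set_exception_unless_cancelled(fut, exception):
    """Set the exception on *fut* only if it was not cancelled."""
    if fut.cancelled():
        return
    fut.set_exception(exception)
```

The contextvars tweak would similarly amount to submitting `contextvars.copy_context().run` as the callable, with `fn` as its first argument.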
Has this already been discussed elsewhere?
This is a minor feature, which does not need previous discussion elsewhere
asyncio.to_thread is used to run blocking or CPU-bound tasks that take a long time to complete, in order to avoid blocking the event loop. I doubt that this overhead is noticeable against the actual work done by the thread.
Do you have performance numbers for this?
Performance improvements are welcome! If you (or someone else reading this) are interested in getting this into CPython 3.15, you should create a PR implementing your proposed approach that passes the existing test cases, and create benchmarks showing the performance improvement.
(Edit: I didn't see Kumar's comment before writing this.)
Regarding benchmarking: so far, I've only been able to figure out how to benchmark one half of this equation, namely the submission to the ThreadPoolExecutor and obtaining the initial asyncio.Future. But there is also the other half: concurrent.futures.thread._WorkItem signals the concurrent.futures.Future, which in turn signals the asyncio.Future with translated results (or, in the "new and improved" case, the _WorkItem signals the asyncio.Future directly).
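For reference, that second half is driven by the worker thread in `_WorkItem.run`, roughly as follows (paraphrased from CPython's `concurrent/futures/thread.py`); this is also exactly why `FakeFuture` only needs the four methods it defines:

```python
# Paraphrased from concurrent.futures.thread._WorkItem.run: the worker thread
# only calls set_running_or_notify_cancel, set_result, and set_exception on
# the future, which is precisely the surface FakeFuture implements.
class _WorkItem:
    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
        else:
            self.future.set_result(result)
```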
But the first half is still quite useful to test, and I do see a 2x performance gain with new_and_improved for that half:
```python
from asyncio import to_thread

async def test_timing():
    from timeit import timeit
    niters = 10000000
    time_new = timeit('to_thread_new_and_improved(int).send(None)', globals=globals(), number=niters)
    time_old = timeit('to_thread(int).send(None)', globals=globals(), number=niters)
    print(f'time to_thread_new_and_improved: {time_new}')
    print(f'time asyncio.to_thread: {time_old}')

if __name__ == '__main__':
    import asyncio
    asyncio.run(test_timing())
```
```
time to_thread_new_and_improved: 14.950462501496077
time asyncio.to_thread: 32.94412062317133
```
Side note: it was mentioned that this is usually used for long-running tasks, and therefore the timing doesn't matter since it is short in comparison to the task execution. I disagree, because (1) that's not always the case (e.g. quick file I/O operations), and (2) the actual task is offloaded to a different thread, so how long the main thread spends on this offloading and callback-ing is certainly relevant even when it is a small fraction of the time spent on the separate thread. For webservers etc., it is important to keep the main thread zipping along.
Side note #2: there is also something to be said for simplicity in and of itself. Currently, in order to understand how to_thread works in the first place, you have to sift through multiple layers of callback hell. What I'm proposing would greatly simplify the implementation by entirely bypassing one full layer of bidirectional callbacks. So not only would this proposal improve performance, it would also save reading and comprehension time for developers... that is valuable too!
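To make the "bidirectional callbacks" concrete, here is a simplified paraphrase of how `asyncio.futures._chain_future` links the two futures today (the real code handles more edge cases, such as mismatched loops):

```python
# Simplified paraphrase of asyncio.futures._chain_future: one callback copies
# state from the concurrent future into the asyncio future (hopping back to
# the event loop via call_soon_threadsafe), while a second one propagates
# cancellation in the opposite direction.
from asyncio.futures import _copy_future_state

def _chain_future_sketch(cf_future, aio_future, loop):
    def copy_state(_):
        # Runs in the worker thread, so it must hop back to the event loop.
        loop.call_soon_threadsafe(_copy_future_state, cf_future, aio_future)

    def check_cancel(_):
        if aio_future.cancelled():
            cf_future.cancel()

    cf_future.add_done_callback(copy_state)     # thread -> loop direction
    aio_future.add_done_callback(check_cancel)  # loop -> thread direction
```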
Hi @HansBrende and everyone,
This is an excellent proposal. Inspired by the discussion, I've implemented the change to measure the performance impact concretely.
To address the discussion about whether this overhead is noticeable, I benchmarked against realistic, short-lived blocking tasks where this matters most. To ensure the reliability of the results, I ran a large sample of 100,000 tasks for each scenario.
Below is the exact code used for the test runs.
```python
import asyncio
import time
import os
import tempfile

# --- Functions to be tested in threads ---
def blocking_io_task(path):
    """Simulates a fast, blocking I/O operation."""
    try:
        return os.stat(path).st_size
    except OSError:
        return -1

def blocking_cpu_task(n):
    """Simulates CPU work with an argument and a return value."""
    return sum(range(n))

# --- Benchmark Runner ---
async def run_benchmark(description, task_func, args_list, num_tasks):
    print(f"--- {description} ---")
    await asyncio.to_thread(task_func, *args_list[0])  # Warm-up
    start_time = time.perf_counter()
    results = await asyncio.gather(*(asyncio.to_thread(task_func, *args) for args in args_list))
    duration = time.perf_counter() - start_time
    assert len(results) == num_tasks
    print(f"Ran {num_tasks} tasks in: {duration:.4f} seconds")
    print(f"Average overhead per call: {duration / num_tasks * 1e6:.2f} microseconds\n")

async def main():
    NUM_TASKS = 100_000
    print(f"Starting large-scale benchmark for 'asyncio.to_thread' (Tasks: {NUM_TASKS})...")
    with tempfile.NamedTemporaryFile() as tmp:
        await run_benchmark("Scenario 1: Fast I/O (os.stat)", blocking_io_task, [(tmp.name,)] * NUM_TASKS, NUM_TASKS)
        await run_benchmark("Scenario 2: CPU Task (sum)", blocking_cpu_task, [(500,)] * NUM_TASKS, NUM_TASKS)

if __name__ == "__main__":
    asyncio.run(main())
```
Benchmark Results (100,000 iterations)
The results from this large-scale test strongly support the proposal.
Scenario 1: High-Concurrency Fast File I/O (os.stat)
| Version | Average Cost per Call | Performance Uplift |
|---|---|---|
| Before (Original) | 82.25 µs | (Baseline) |
| After (New Impl.) | 48.35 µs | ~41.2% |
Scenario 2: High-Concurrency CPU Task (with data transfer)
| Version | Average Cost per Call | Performance Uplift |
|---|---|---|
| Before (Original) | 76.25 µs | (Baseline) |
| After (New Impl.) | 42.38 µs | ~44.4% |
Conclusion
The data shows a consistent and significant 41-44% performance improvement. This confirms that reducing main-thread overhead for to_thread is highly beneficial, especially for high-throughput applications.
The full test_asyncio suite passes with this change, ensuring that the optimization is safe and introduces no regressions.
I am preparing a Pull Request with the implementation. Thanks again for the great suggestion.
One idea that would simplify the implementation would be (1) to add a submit_async method to the Executor interface (identical to submit, except returning a loop.create_future() instead of a concurrent.futures.Future) and (2) to modify the loop interface to add a get_default_executor method (to remove the annoying hack I did above).
E.g.:
```python
class ThreadPoolExecutor:  # Same could also be done for ProcessPoolExecutor just as easily!
    def submit_async(self, loop: AbstractEventLoop, fn, /, *args, **kwargs) -> asyncio.Future:
        # ... copy-and-paste from `submit` ...
        f = loop.create_future()
        w = _WorkItem(FakeFuture(loop, f), fn, args, kwargs)
        # ... copy-and-paste from rest of `submit`

# if we wanted to be clever, we could actually avoid a new method altogether by doing something like:
class ThreadPoolExecutor:
    def submit(self, fn, /, *args, **kwargs):
        loop = None
        if isinstance(fn, AbstractEventLoop):
            loop = fn
            fn, *args = args
        ...
        f = loop.create_future() if loop else _base.Future()
        # etc.

class BaseEventLoop:
    def get_default_executor(self):
        return self._default_executor or (self._default_executor := ...)
```
If both of those ideas were implemented, the resulting to_thread implementation would become extraordinarily simple:
```python
async def to_thread(func, /, *args, **kwargs):
    loop = get_running_loop()
    executor = loop.get_default_executor()
    return await executor.submit_async(loop, contextvars.copy_context().run, func, *args, **kwargs)
```
Of course, this idea would require modifying the public APIs for the executor and/or the event loop, so I'm not sure if that's cool or would require some kind of lengthy "official" proposal or some such.
Good idea, but maybe a breaking change. This is an excellent idea for a cleaner API, though I'm mindful that changing the public Executor interface is a significant step that would likely require a broader discussion. cc @serhiy-storchaka
One other alternative idea (last one, promise), if it turned out to be acceptable to make the Executor interface slightly more generic, would be to add the following method, which would encapsulate the ability to do both things simultaneously. If we look at Executor from a more abstract perspective, it really just needs a callback with 4 methods defined to work correctly. We could give it a lot more power by allowing one to pass in this callback explicitly instead of it always choosing c.f.Future in such an opinionated fashion.
```python
from typing import Protocol

class WriteOnlyFuture(Protocol):
    def set_running_or_notify_cancel(self) -> bool:
        ...

    def set_result(self, result):
        ...

    def set_exception(self, exception: BaseException):
        ...

    def cancel(self):
        ...

class ThreadPoolExecutor:  # would also work for ProcessPoolExecutor
    def send_to_future(self, future: WriteOnlyFuture, fn, /, *args, **kwargs):
        ...
```
That being said, if it is too much pain to add to the public API, my original implementation works great without changing any public-facing items; it's just not quite as clean. Perhaps @JelleZijlstra would have some insight into these pros and cons?
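For illustration, here is how to_thread could sit on top of such a hook (assuming the hypothetical `send_to_future` method and the `get_default_executor` accessor from my earlier comment, and reusing the `FakeFuture` adapter from the proof of concept):

```python
# Hypothetical sketch: send_to_future and get_default_executor do not exist
# today; FakeFuture is the asyncio-backed adapter defined in the proof of
# concept above.
import contextvars
from asyncio import get_running_loop

async def to_thread(func, /, *args, **kwargs):
    loop = get_running_loop()
    fut = loop.create_future()
    ctx = contextvars.copy_context()
    executor = loop.get_default_executor()  # hypothetical accessor
    executor.send_to_future(FakeFuture(loop, fut), ctx.run, func, *args, **kwargs)
    return await fut
```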