cpython icon indicating copy to clipboard operation
cpython copied to clipboard

`asyncio.to_thread` could be a lot more efficient

Open HansBrende opened this issue 6 months ago • 1 comments

Feature or enhancement

Proposal:

It seems like there is a lot of layers of overhead involved in asyncio.to_thread: extracting a concurrent.futures.Future via submission to the ThreadPoolExecutor, creating a corresponding asyncio.Future, and then synchronizing these two via callbacks, including translating the exceptions from concurrent-speak to asyncio-speak etc. Also involved in this process is a lot of (unnecessary-in-this-case) locking within the concurrent.futures.Future.

Why not simply have the ThreadPoolExecutor update the asyncio.Future directly? That would eliminate nearly all these layers of pointless overhead, skipping the concurrent.futures.Future altogether.

Here's a small proof of concept (including a couple hacks that could be smoothed over) to demonstrate how easily this could be achieved (even allows you to rip out the functools.partial as a bonus, since ThreadPoolExecutor allows kwargs submissions):

from asyncio import get_running_loop, AbstractEventLoop, Future
from asyncio.futures import _set_result_unless_cancelled
from concurrent.futures import thread
from typing import Callable

async def to_thread_new_and_improved[**P, T](fn: Callable[P, T], /, *args: P.args, **kwargs: P.kwargs) -> T:
    loop = get_running_loop()
    executor = loop._default_executor
    if not executor:
        # one-time hack to get the loop's default executor
        loop.run_in_executor(None, int)
        executor = loop._default_executor

    # Now copy-and-paste (mostly) from ThreadPoolExecutor.submit:
    with executor._shutdown_lock, thread._global_shutdown_lock:
          if executor._broken or executor._shutdown or thread._shutdown:
               raise TheRelevantException
          
          # The key change:
          f = loop.create_future()    # instead of f = concurrent.futures.Future()

          executor._work_queue.put(thread._WorkItem(FakeFuture(loop, f), fn, args, kwargs))
          executor._adjust_thread_count()
    
     del fn, args, kwargs, loop, executor  # don't know if all this is necessary but never hurts
     return await f


# where "FakeFuture" is defined as follows 
# (simply delegating to an asyncio.Future while "pretending" to be a concurrent.futures.Future):
class FakeFuture[T]:
    def __init__(self, loop: AbstractEventLoop, fut: Future[T]):
        self._loop = loop
        self._fut = fut

    def set_running_or_notify_cancel(self):
        return not (self._loop.is_closed() or self._fut.cancelled())

    def set_result(self, result: T):
        self._callback(_set_result_unless_cancelled, self._fut, result)

    def set_exception(self, exception: BaseException):
        self._callback(_set_exception_unless_cancelled, self._fut, exception)

    def cancel(self):
        self._callback(self._fut.cancel)

    def _callback(self, f, *args):
        try:
            self._loop.call_soon_threadsafe(f, *args)
        except RuntimeError:
            pass

Note: I did not handle contextvars here, but that would be a straightforward tweak; also, _set_exception_unless_cancelled doesn't exist but you can infer how that is written from _set_result_unless_cancelled.

Has this already been discussed elsewhere?

This is a minor feature, which does not need previous discussion elsewhere

Links to previous discussion of this feature:

No response

HansBrende avatar Jun 29 '25 03:06 HansBrende

asyncio.to_thread is used to run blocking or CPU bound task which takes long to complete, and is done to avoid blocking the event loop. I doubt that this overhead is noticeable against the actual work done by thread.

Do you have performance numbers for this?

kumaraditya303 avatar Jun 29 '25 11:06 kumaraditya303

Performance improvements are welcome! If you (or someone else reading this) are interested in getting this into CPython 3.15, you should create a PR implementing your proposed approach that passes the existing test cases, and create benchmarks showing the performance improvement.

(Edit: I didn't see Kumar's comment before writing this.)

JelleZijlstra avatar Jun 29 '25 14:06 JelleZijlstra

Regarding benchmarking: so far, I've only been able to figure out how to benchmark one half of this equation, which is the submission to the ThreadPoolExecutor and obtaining the initial asyncio.Future. But there is also the other half which is: c.f.thread._WorkItem signals c.f.Future, c.f.Future signals asyncio.Future with translated results (or in the "new and improved" case, c.f.thread._WorkItem signals asyncio.Future directly).

But the first half is still quite useful to test, and I do see a 2x performance gain with new_and_improved for that half:

from asyncio import to_thread

async def test_timing():
    from timeit import timeit

    niters = 10000000
    time_new = timeit('to_thread_new_and_improved(int).send(None)', globals=globals(), number=niters)
    time_old = timeit('to_thread(int).send(None)', globals=globals(), number=niters)
    print(f'time to_thread_new_and_improved: {time_new}')
    print(f'time asyncio.to_thread: {time_old}')


if __name__ == '__main__':
    import asyncio
    asyncio.run(test_timing())

time to_thread_new_and_improved: 14.950462501496077 time asyncio.to_thread: 32.94412062317133

Side note: there was the mention that this is usually used for long running tasks, and therefore the timing doesn't matter since it is short in comparison to the task execution. I disagree, because (1) that's not always the case (e.g. quick file IO operations) and (2) the timing of the actual task is offloaded to a different thread. So how long it takes on the main thread to do this offloading and callback-ing is certainly relevant even if the main thread's execution time is a small fraction of the time on a separate thread. For webservers etc. it is important to keep the main thread zipping along.

Side note #2: there is also something to be said for simplicity in-and-of-itself. Currently in order to understand how to_thread works in the first place, you have to sift through multiple layers of callback hell. What I'm proposing would greatly simplify this implementation by entirely bypassing one full layer of bidirectional callbacks. So not only would this proposal improve code performance, it would also save reading & comprehension time & effort for developers... that is valuable too!

HansBrende avatar Jun 29 '25 18:06 HansBrende

Hi @HansBrende and everyone,

This is an excellent proposal. Inspired by the discussion, I've implemented the change to measure the performance impact concretely.

To address the discussion about whether this overhead is noticeable, I benchmarked against realistic, short-lived blocking tasks where this matters most. To ensure the reliability of the results, I ran a large sample of 100,000 tasks for each scenario.

Below is the exact code used for the test runs.

import asyncio
import time
import os
import tempfile
import pathlib

# --- Functions to be tested in threads ---
def blocking_io_task(path):
    """Simulates a fast, blocking I/O operation."""
    try:
        return os.stat(path).st_size
    except OSError:
        return -1

def blocking_cpu_task(n):
    """Simulates CPU work with an argument and a return value."""
    return sum(range(n))

# --- Benchmark Runner ---
async def run_benchmark(description, task_func, args_list, num_tasks):
    print(f"--- {description} ---")
    await asyncio.to_thread(task_func, *args_list[0]) # Warm-up
    start_time = time.perf_counter()
    results = await asyncio.gather(*(asyncio.to_thread(task_func, *args) for args in args_list))
    duration = time.perf_counter() - start_time
    assert len(results) == num_tasks
    print(f"Ran {num_tasks} tasks in: {duration:.4f} seconds")
    print(f"Average overhead per call: {duration / num_tasks * 1e6:.2f} microseconds\n")

async def main():
    NUM_TASKS = 100_000
    print(f"Starting large-scale benchmark for 'asyncio.to_thread' (Tasks: {NUM_TASKS})...")
    with tempfile.NamedTemporaryFile() as tmp:
        await run_benchmark("Scenario 1: Fast I/O (os.stat)", blocking_io_task, [(tmp.name,)] * NUM_TASKS, NUM_TASKS)
    await run_benchmark("Scenario 2: CPU Task (sum)", blocking_cpu_task, [(500,)] * NUM_TASKS, NUM_TASKS)

if __name__ == "__main__":
    asyncio.run(main())

Benchmark Results (100,000 iterations)

The results from this large-scale test strongly support the proposal.

Scenario 1: High-Concurrency Fast File I/O (os.stat)

Version Average Cost per Call Performance Uplift
Before (Original) 82.25 µs (Baseline)
After (New Impl.) 48.35 µs ~41.2%

Scenario 2: High-Concurrency CPU Task (with data transfer)

Version Average Cost per Call Performance Uplift
Before (Original) 76.25 µs (Baseline)
After (New Impl.) 42.38 µs ~44.4%

Conclusion

The data shows a consistent and significant 41-44% performance improvement. This confirms that reducing main-thread overhead for to_thread is highly beneficial, especially for high-throughput applications.

The full test_asyncio suite passes with this change, ensuring that the optimization is safe and introduces no regressions.

I am preparing a Pull Request with the implementation. Thanks again for the great suggestion

heliang666s avatar Jun 29 '25 19:06 heliang666s

One idea that would simplify the implementation would be (1) to add a submit_async method to the Executor interface (identical to submit except simply returning a loop.create_future() instead of c.f.Future()) and (2) to modify the loop interface to add a get_default_executor method (to remove the annoying hack I did).

E.g.:

class ThreadPoolExecutor:  # Same could also be done for ProcessPoolExecutor just as easily!   
    def submit_async(loop: AbstractEventLoop, fn, /, *args, **kwargs) -> asyncio.Future:
         # ... copy-and-paste from `submit` ...
         f = loop.create_future()
         w = _WorkItem(FakeFuture(loop, f), fn, args, kwargs)
         # ... copy-and-paste from rest of `submit`

# if we wanted to be clever, we could actually avoid a new method altogether by doing something like:
class ThreadPoolExecutor:
   def submit(fn, /, *args, **kwargs):
        loop = None
        if isinstance(fn, AbstractEventLoop):
             loop = fn
             fn, *args = args
        ...
        f = loop.create_future() if loop else _base.Future()
        # etc.

class BaseEventLoop:
    def get_default_executor():
        return self._default_executor or (self._default_executor := ...)

If both of those ideas were implemented, the resulting to_thread implementation would become extraordinarily simple:




async def to_thread(func, /, *args, *kwargs):
    loop = get_running_loop()
    executor = loop.get_default_executor()
    return await executor.submit_async(loop, contextvars.copy_context().run, func, *args, **kwargs)

Of course, this idea would require modifying the public apis for executor and/or the event loop, so I'm not sure if that's cool or would require some kind of lengthy "official" proposal or some-such.

HansBrende avatar Jun 29 '25 20:06 HansBrende

一个可以简化实现的想法是(1)submit_async向接口添加一个方法Executor(与submit简单地返回一个loop.create_future()而不是 cfFuture() 相同)和(2)修改循环接口以添加一个get_default_executor方法(以删除我所做的烦人的黑客攻击)。

例如:

class ThreadPoolExecutor: # Same could also be done for ProcessPoolExecutor just as easily!
def submit_async(loop: AbstractEventLoop, fn, /, *args, **kwargs) -> asyncio.Future: # ... copy-and-paste from submit ... f = loop.create_future() w = _WorkItem(FakeFuture(loop, f), fn, args, kwargs) # ... copy-and-paste from rest of submit

if we wanted to be clever, we could actually avoid a new method altogether by doing something like:

class ThreadPoolExecutor: def submit(fn, /, *args, **kwargs): loop = None if isinstance(fn, AbstractEventLoop): loop = fn fn, *args = args ... f = loop.create_future() if loop else _base.Future() # etc.

class BaseEventLoop: def get_default_executor(): return self._default_executor or (self._default_executor := ...) 如果这两个想法都得到实施,最终的to_thread实现将变得_非常_简单:

async def to_thread(func, /, *args, *kwargs): loop = get_running_loop() executor = loop.get_default_executor() return await executor.submit_async(loop, contextvars.copy_context().run, func, *args, **kwargs) 当然,这个想法需要修改执行器和/或事件循环的公共 API,所以我不确定这是否很酷或者是否需要某种冗长的“官方”提案或类似的东西。

Good idea,but maybe a breaking change.This is an excellent idea for a cleaner API, though I'm mindful that changing the public Executor interface is a significant step that would likely require a broader discussion.cc @serhiy-storchaka

heliang666s avatar Jun 29 '25 21:06 heliang666s

One other alternative idea (last one, promise), if it turned out to be acceptable to make the Executor interface slightly more generic, would be to add the following method, which would encapsulate the ability to do both things simultaneously. If we look at Executor from a more abstract perspective, it really just needs a callback with 4 methods defined to work correctly. We could give it a lot more power by allowing one to pass in this callback explicitly instead of it always choosing c.f.Future in such an opinionated fashion.

class WriteOnlyFuture(Protocol):

    def set_running_or_notify_cancel(self) -> bool:
        ...

    def set_result(self, result):
        ...

    def set_exception(self, exception: BaseException):
        ...

    def cancel(self):
        ...

class ThreadPoolExecutor:   # would also work for ProcessPoolExecutor

    def send_to_future(future: WriteOnlyFuture, fn, /, *args, **kwargs)
        ...

That being said, if it is too much pain to add to the public API, my original implementation works great without changing any public-facing items; it's just not quite as clean. Perhaps @JelleZijlstra would have some insight into these pros and cons?

HansBrende avatar Jun 29 '25 22:06 HansBrende