Set ThreadPool as default executor
There are many workflows that require interweaving async and non-async (CPU-intensive) blocking code. These cannot each be split up into separate tasks because locally stored files are involved. The best solution is thus to offload the blocking work to the executor so as not to block the asyncio loop.
If I understand correctly, each worker process starts a ThreadPoolExecutor in which sync tasks are run. Being able to access this thread pool instead of making another one would be ideal. Currently, we work around this with a custom receiver that accesses the thread pool instance and stores the reference in the application state:
```python
from taskiq.receiver import Receiver


class CustomReceiver(Receiver):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Store the executor in the broker's state for global access.
        # This allows us to run CPU-heavy code on the workers
        # without blocking the asyncio loop.
        self.broker.state.executor = self.executor
```
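For completeness, a task can then reach the stored executor through the task context. A minimal sketch under this workaround, assuming `broker` is the broker instance the CustomReceiver runs with and `heavy_computation` is a placeholder for the blocking function:

```python
import asyncio

from taskiq import Context, TaskiqDepends


@broker.task
async def process_file(path: str, context: Context = TaskiqDepends()) -> str:
    loop = asyncio.get_running_loop()
    # Reuse the worker's existing thread pool instead of creating a new one.
    return await loop.run_in_executor(
        context.broker.state.executor,
        heavy_computation,  # placeholder: the blocking, CPU-heavy function
        path,
    )
```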
But I believe a much better solution would be to simply set the created thread pool as the default executor for the asyncio loop, so it can be used without passing the reference around:
```python
with ThreadPoolExecutor(args.max_threadpool_threads) as pool:
    loop = asyncio.get_event_loop()
    loop.set_default_executor(pool)

# Then, anywhere in a task, with no reference to pass around:
await asyncio.get_running_loop().run_in_executor(None, func)
```
Or, in addition / at a minimum, allow us to get the instance of the executor from the API.
In fact, it doesn't seem like a good idea to have more than one thread in that thread pool if all it is used for is genuinely CPU-intensive sync tasks, provided IO tasks are run with asyncio.
As far as I can tell, the default of having many threads in a thread pool predates the widespread use of asyncio. If threads are only used for blocking tasks then, due to the GIL, it is counter-productive to have more than one, and the --workers option (which uses multiprocessing) should instead be used to match the CPU count.
https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
I agree that adding the ability to reuse the pool is a good addition to the API. However, I disagree with the suggestion that the default thread pool size should be set to 1. The Global Interpreter Lock (GIL) in Python only prevents multiple Python bytecode instructions from running concurrently in the same process. In most cases, when performing calculations, you'll be using libraries like numpy or similar, which are written in C or C++ and operate in native extensions. Since these libraries perform their computations in C, the GIL is not a bottleneck during execution. Here's a small benchmark:
```python
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor


def my_sync_task():
    sum = 0
    for _ in range(50_000_000):
        sum += 1


async def my_async_task(tp: ThreadPoolExecutor):
    loop = asyncio.get_running_loop()
    task1 = loop.run_in_executor(tp, my_sync_task)
    task2 = loop.run_in_executor(tp, my_sync_task)
    await asyncio.gather(task1, task2)


async def test_run(workers: int, tasks: int):
    with ThreadPoolExecutor(max_workers=workers) as tp:
        start = time.monotonic()
        await asyncio.gather(*[my_async_task(tp) for _ in range(tasks)])
        print(f"Time taken: {time.monotonic() - start}")


async def main():
    await test_run(10, 5)
    await test_run(1, 5)


if __name__ == "__main__":
    asyncio.run(main())
```
When executing this code, the GIL will serialize the pure-Python work, so the execution time for both cases will be almost the same:
```
Time taken: 15.220620960004453 # With 10 workers and 10 sync tasks running.
Time taken: 14.688892211997882 # With 1 worker and 10 sync tasks running.
```
> [!NOTE]
> I have an AMD Ryzen 5 7530U @ 4.546GHz, so you might need to adjust some values to get similar execution speeds.
Now, let's modify the test to replace my_sync_task with something that offloads calculations to a native extension, like numpy:
```python
import numpy as np


def my_sync_task():
    rand = np.random.default_rng()
    a = rand.integers(1, 100, size=(1300, 1200))
    b = rand.integers(1, 100, size=(1200, 1300))
    c = a @ b  # Perform a big matrix multiplication
```
In that case, the GIL only impacts the parts of the library that are written in Python and the transfer of data between the interpreter and the native extension. Here are the results:
```
Time taken: 7.917032391997054 # With 10 workers and 10 sync tasks running.
Time taken: 20.60293159300636 # With 1 worker and 10 sync tasks running.
```
Although it's not as fast as one might expect (due to tasks sharing the CPU and the data transfer overhead between Python and C++), you can see a significant improvement when using multiple workers.
So, I would say that setting the worker count to 1 will generally decrease performance, especially for CPU-bound tasks that rely on native extensions. Even when using Python code, there's often little to no difference between having 1 or n threads in the pool.
But to give TaskIQ more flexibility, I would really like to add the ability to set custom executors.
To do so, I suggest the following:

- Add a broker field that holds a reference to an executor (`ThreadPoolExecutor` by default).
- Add a method like `with_executor` that can set the executor to something else.

In that case:

- It will be super easy to get the executor from a broker, because you can get it from a broker field like `broker.sync_executor`.
- You can set it explicitly during broker creation, like `broker = Broker().with_executor(ThreadPool...)`.
- You can set it to `None` to use the default (global) executor.
- You can set the global executor during the broker's startup.
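To make the proposal concrete, here is a minimal sketch of how it could look; `sync_executor`, `with_executor`, and the bare `Broker` are illustrative names for the proposal, not existing TaskIQ API:

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Optional


class Broker:
    def __init__(self) -> None:
        # Proposed field: the executor used for sync tasks.
        # None would mean "use the loop's default (global) executor".
        self.sync_executor: Optional[Executor] = ThreadPoolExecutor()

    def with_executor(self, executor: Optional[Executor]) -> "Broker":
        # Proposed builder-style method to swap the executor.
        self.sync_executor = executor
        return self


# Usage under these assumptions:
broker = Broker().with_executor(ThreadPoolExecutor(max_workers=4))
executor = broker.sync_executor  # easy to retrieve anywhere
broker.with_executor(None)       # fall back to the loop's default executor
```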
What do you think?
It is probably best to allow users to change/set and access the Executor, and document that as a feature. You could then have the default number of workers equal the number of CPUs on the system (e.g. via multiprocessing.cpu_count()). This would allow for the most flexibility in use-cases.
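Under the hypothetical `with_executor` API sketched above, that default could look like:

```python
import multiprocessing
from concurrent.futures import ThreadPoolExecutor

# Hypothetical: size the sync pool to the machine's CPU count by default,
# using the with_executor method proposed above (not existing API).
broker = Broker().with_executor(
    ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
)
```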
> when performing calculations, you'll be using libraries like numpy or similar, which are written in C or C++ and operate in native extensions
This is actually a big assumption and not really reflected in what one would expect from a worker library for Python. For a particular example I am working on, I am going through PDFs with pdfium, which does not release the GIL despite using an underlying C library, meaning it is effectively CPU-intensive to the main Python process. Within this task, I then call a bunch of OCR tasks using tesserocr, which does release the GIL. To best utilise the CPUs, I set the main Taskiq ThreadPool to 1 worker, and then made a separate ThreadPool with # CPUs as the # of workers, on which I schedule the tesserocr tasks, and then await all of them. This means there is only ever at most # CPUs' worth of Python threads running, since the main pool can only run one task, and it is waiting on results from the other pool before continuing.
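Roughly, the pattern looks like this; a sketch with placeholder functions, since the actual pdfium/tesserocr calls are beside the point:

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Separate pool for the GIL-releasing OCR calls, sized to the CPU count.
ocr_pool = ThreadPoolExecutor(max_workers=os.cpu_count())


def render_pdf_pages(path: str) -> list:
    # Placeholder: real code renders pages with pdfium,
    # which holds the GIL despite being backed by a C library.
    return ["page-image-1", "page-image-2"]


def ocr_page(image) -> str:
    # Placeholder: real code calls tesserocr, which releases the GIL.
    return f"text of {image}"


def process_pdf(path: str) -> list:
    # This sync task runs on Taskiq's main pool, sized to 1 worker,
    # so at most one GIL-holding render runs at a time.
    images = render_pdf_pages(path)
    # Fan the OCR out over the CPU-sized pool; the main pool's single
    # thread just blocks waiting for the results.
    futures = [ocr_pool.submit(ocr_page, img) for img in images]
    return [f.result() for f in futures]
```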
If one schedules a bunch of truly CPU-intensive sync tasks (ones that do not release the GIL) on the main Taskiq executor, it is very easy to end up with degraded performance as it keeps context-switching between the various threads. This is not obvious from a user's perspective and is a common pitfall. Additionally, depending on how the tasks get distributed to workers, it is easy for one worker to pick up multiple sync tasks that it then can't truly run in parallel, instead of them getting spread out over other workers which could do the sync work. Of course, none of this applies to pure asyncio tasks.
In light of this, I believe there is still a case for setting the default sync threads to 1, or at the very least documenting the trade-offs in detail.