aiomultiprocess
Pool not utilising available CPUs
Description
Hi, I'm sure this will turn out to be more of a question than a bug, but anyway... I'm trying to run a very basic script based on the example provided in the repo. I'm using aiomultiprocess.Pool to execute a task which sleeps and then does some CPU-intensive work. I've timed the execution, and it takes the same amount of time with aiomultiprocess.Pool whether it runs with 1, 2, 4, or 8 processes.
Further to this, I have added equivalent implementations using asyncio (which just runs on a single process, of course) and multiprocessing.Pool. The asyncio version takes the same amount of time as the aiomultiprocess.Pool version. The multiprocessing.Pool version takes less than half the time when running with 4 cores, which is in the range of my expectations.
As well as the timing results, I can also see from watching htop during the experiment that the aiomultiprocess and asyncio versions are maxing out a single CPU, whilst the multiprocessing version is using around 80% on 4 of the 8 available logical cores on my machine at any given moment. As an aside, is there a better way to check the CPU utilisation of Python scripts than eyeballing htop?
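On the aside about htop: one rough alternative is to compare CPU time against wall-clock time from inside the script. The sketch below is only an illustration, not something from the library; the helper names cpu_seconds and average_cores_used are made up here, and it assumes a Unix-like system where os.times() also reports CPU time for child processes that have already exited and been waited for. A ratio near 1 suggests one busy core, a ratio near 4 suggests four.

```python
import os
import time


def calc(iters: int) -> float:
    """CPU-bound loop with the same shape as calc() in the script."""
    x = 0.0
    for i in range(1, iters + 1):
        x = (x + i) / i
    return x


def cpu_seconds() -> float:
    """User + system CPU time of this process plus its reaped children.

    Assumption: on platforms without child accounting (e.g. Windows) the
    children_* fields are zero, so only this process is counted.
    """
    t = os.times()
    return t.user + t.system + t.children_user + t.children_system


def average_cores_used(fn, *args):
    """Run fn(*args) and return (result, CPU seconds / wall seconds)."""
    wall_start = time.monotonic()
    cpu_start = cpu_seconds()
    result = fn(*args)
    wall = time.monotonic() - wall_start
    cpu = cpu_seconds() - cpu_start
    return result, (cpu / wall if wall > 0 else 0.0)


if __name__ == "__main__":
    _, cores = average_cores_used(calc, 2_000_000)
    print(f"average cores used: {cores:.2f}")
```

In the script from this issue, the same helper could presumably wrap each of the three timed calls, for example average_cores_used(main_multiprocessing), as long as the pools have shut down their workers before the second reading is taken.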
Here's an example script showing the three implementations. As far as I can tell I've done everything correctly, but clearly I must be missing something! I like the look of aiomultiprocess and would like to use it in a project at work, so any help getting it working would be much appreciated!
#!/usr/bin/env python3
"""A simple example of multiprocessing + asyncio with the aiomultiprocess library.
"""
import asyncio
import multiprocessing
import time
from typing import Any, Dict, List

import aiomultiprocess

SLEEP_TIME = 0.1
CALC_ITERS = 100000000


def calc(iters=CALC_ITERS) -> float:
    """Performs FLOPS in a loop with specified number of iterations."""
    x = 0.0
    for i in range(1, iters + 1):
        x = (x + i) / i
    return x


async def exec_and_sleep(*args: List, **kwargs: Dict) -> float:
    """Sleeps and then runs a CPU bound function."""
    await asyncio.sleep(SLEEP_TIME)
    return calc()


def get_or_create_event_loop():
    try:
        return asyncio.get_event_loop()
    except RuntimeError as ex:
        if "There is no current event loop in thread" in str(ex):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return asyncio.get_event_loop()


def wrap_async(fn, args, kwargs) -> Any:
    loop = get_or_create_event_loop()
    result = loop.run_until_complete(fn(*args, **kwargs))
    return result


async def main_aiomultiprocess() -> None:
    async with aiomultiprocess.Pool(processes=4) as pool:
        results = await pool.map(exec_and_sleep, [() for _ in range(4)])
    print(results)


async def main_asyncio() -> None:
    results = await asyncio.gather(*[exec_and_sleep() for _ in range(4)])
    print(results)


def main_multiprocessing() -> None:
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.starmap(wrap_async, [(exec_and_sleep, (), {}) for _ in range(4)])
    print(results)


if __name__ == "__main__":
    t_start = time.monotonic()
    asyncio.run(main_aiomultiprocess())
    t_total = time.monotonic() - t_start
    print(f"Execution of {main_aiomultiprocess.__name__} took {t_total:.2f}s")

    t_start = time.monotonic()
    asyncio.run(main_asyncio())
    t_total = time.monotonic() - t_start
    print(f"Execution of {main_asyncio.__name__} took {t_total:.2f}s")

    t_start = time.monotonic()
    main_multiprocessing()
    t_total = time.monotonic() - t_start
    print(f"Execution of {main_multiprocessing.__name__} took {t_total:.2f}s")
Here are the results of running the script:
(.venv-dev) csk@aquila: PMF $ aiomultiprocess-simple-example.py
[1.0000000100000002, 1.0000000100000002, 1.0000000100000002, 1.0000000100000002]
Execution of main_aiomultiprocess took 23.48s
[1.0000000100000002, 1.0000000100000002, 1.0000000100000002, 1.0000000100000002]
Execution of main_asyncio took 23.36s
[1.0000000100000002, 1.0000000100000002, 1.0000000100000002, 1.0000000100000002]
Execution of main_multiprocessing took 10.52s
Details
- OS: Ubuntu 20.04 LTS
- Python version: 3.7.11
- aiomultiprocess version: 0.9.0
- Can you repro on master? Not checked
- Can you repro in a clean virtualenv? Yes
@jreese I have run into the same confusion, and I would be grateful if you could help resolve it.
@chrisk314 @fz-gaojian The default childconcurrency is 16, so a single worker process can pick up all four tasks and run them on its one event loop. Change it to 0 or 1 and the behavior should be as you expected.
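To illustrate why tasks that end up on one worker do not run in parallel, here is a minimal sketch using only asyncio. It is not aiomultiprocess code; the names run_on_one_loop, overlaps, and spans are made up for this example, and the iteration count is reduced so it finishes quickly. Each coroutine yields during the sleep, but the CPU-bound loop never yields, so the CPU sections of tasks sharing one event loop run one after another.

```python
import asyncio
import time
from typing import Dict, Tuple


def calc(iters: int) -> float:
    """CPU-bound loop with the same shape as calc() in the script."""
    x = 0.0
    for i in range(1, iters + 1):
        x = (x + i) / i
    return x


async def exec_and_sleep(task_id: int, spans: Dict[int, Tuple[float, float]]) -> float:
    """Sleep (yields to the loop), then do CPU work (blocks the loop)."""
    await asyncio.sleep(0.01)
    start = time.monotonic()
    result = calc(200_000)  # no await inside, so no other task can run here
    spans[task_id] = (start, time.monotonic())
    return result


async def run_on_one_loop(n_tasks: int) -> Dict[int, Tuple[float, float]]:
    """Run n_tasks coroutines concurrently on a single event loop."""
    spans: Dict[int, Tuple[float, float]] = {}
    await asyncio.gather(*(exec_and_sleep(i, spans) for i in range(n_tasks)))
    return spans


def overlaps(spans: Dict[int, Tuple[float, float]]) -> bool:
    """True if any two recorded CPU sections overlap in time."""
    ordered = sorted(spans.values())
    return any(
        next_start < prev_end
        for (_, prev_end), (next_start, _) in zip(ordered, ordered[1:])
    )


if __name__ == "__main__":
    recorded = asyncio.run(run_on_one_loop(4))
    print("CPU sections overlap:", overlaps(recorded))
```

In the script from this issue, the corresponding change would presumably be aiomultiprocess.Pool(processes=4, childconcurrency=1), so that each worker takes one task at a time; it is worth checking the parameter name and accepted values against the installed version.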