`simulate_for_sbi` not saturating CPU
Dear devs,
I have run into this unexpected behaviour while running the sbi.inference.simulate_for_sbi method.
It appears that, in a multi-processing scenario, the single-core saturation is capped at a value that decreases as the number of simulations to be produced grows. This inevitably leads to increased simulation times.
I have tried to circumvent the sbi.inference.simulate_for_sbi method both by manually implementing multiprocessing (with the straightforward Python multiprocessing.Pool) and with the p_map method from the p_tqdm library. The latter has the advantage of being extremely easy to use and of natively supporting tqdm progress bars. Both options correctly saturate all the CPUs involved in the computation.
I wonder if this behaviour comes from the choice of joblib as the multiprocessing library, and whether a switch to p_tqdm could improve the performance of the sbi library.
Below, I provide some code to test this behaviour. The requirements are the source version of sbi as well as the pip version of p_tqdm. The code contains three benchmarking methods (vanilla Pool, p_map and simulate_for_sbi) as well as two handles for the number of processes (workers) to be used and the number of simulations to be generated.
By looking at the ETA, the runtime difference between simulate_for_sbi and p_map is negligible for a low number of simulations (e.g. 100) but grows with it, reaching a factor of 10 for num_simulations equal to 1M.
I hope this report can be helpful, especially since the simulation runtime represents a bottleneck in many common use cases.
from multiprocessing import Pool
from time import time
from datetime import datetime, timedelta
import numpy as np
import torch
from p_tqdm import p_map
from sbi.utils import BoxUniform
from sbi.utils.user_input_checks import process_simulator, process_prior
from sbi.inference import simulate_for_sbi
def timeit(func):
    '''Decorator for printing the execution time of a function.'''
    def wrapper(*args, **kwargs):
        print(f'\n > Generation started:\t{datetime.now().strftime("%H:%M:%S")}')
        beg = time()
        func(*args, **kwargs)
        end = time()
        print(f' > Generation complete:\t{datetime.now().strftime("%H:%M:%S")}')
        tds = str(timedelta(seconds=end-beg)).split(':')
        print(f' > Total runtime:\t{tds[0]}h {tds[1]}min {tds[2]:.4}s')
    return wrapper


def simulation(theta):
    '''Some generic simulation function.
    You can change this to study, e.g., the impact of numpy's multithreading
    on the code's multiprocessing - np.linalg.eig
    '''
    # matrix = np.random.rand(200, 200) / theta
    # eigval, eigvec = np.linalg.eig(matrix)
    for _ in range(100):
        np.random.randn(10000)
    return 1


@timeit
def run_simulations_pool(prior, num_simulations, num_processes):
    '''Generates the joint using Python's native Pool multiprocessing.'''
    theta_low = prior.base_dist.low.cpu().numpy()
    theta_high = prior.base_dist.high.cpu().numpy()
    theta_range = theta_high - theta_low
    thetas = np.random.uniform(size=(num_simulations, 1)) * theta_range + theta_low
    with Pool(num_processes) as p:
        x = p.map(simulation, thetas)


@timeit
def run_simulations_p_map(prior, num_simulations, num_processes):
    '''Generates the joint using p_map from the p_tqdm library.'''
    theta_low = prior.base_dist.low.cpu().numpy()
    theta_high = prior.base_dist.high.cpu().numpy()
    theta_range = theta_high - theta_low
    thetas = np.random.uniform(size=(num_simulations, 1)) * theta_range + theta_low
    x = p_map(simulation, thetas, num_cpus=num_processes)


@timeit
def run_simulations_sim_for_sbi(prior, num_simulations, num_processes):
    '''Generates the joint using sbi.inference.simulate_for_sbi.'''
    prior, num_parameters, prior_returns_numpy = process_prior(prior)
    simulator_wrapper = process_simulator(simulation, prior, prior_returns_numpy)
    theta, x = simulate_for_sbi(simulator_wrapper,
                                prior,
                                num_simulations,
                                num_processes)


if __name__ == '__main__':
    prior = BoxUniform(low=torch.tensor([0.]),
                       high=torch.tensor([10.]),
                       device='cpu')
    num_simulations = 1000000
    num_processes = 16

    # Uncomment the benchmark that you want to run.
    # The native Pool benchmark just serves as a ground truth.

    # print('Benchmarking: sbi.inference.simulate_for_sbi')
    # run_simulations_sim_for_sbi(prior, num_simulations, num_processes)

    # print('Benchmarking: p_map (p_tqdm)')
    # run_simulations_p_map(prior, num_simulations, num_processes)

    # print('Benchmarking: native Pool')
    # run_simulations_pool(prior, num_simulations, num_processes)
Dear @janko-petkovic
Thanks a lot for looking into this and providing the benchmark. I can confirm your findings. This is indeed a very interesting behavior of simulate_for_sbi. The "embarrassingly parallel" for loops we used here seem to be embarrassingly slow 😬
I think the reason could be the progress-bar wrappers we use to show tqdm progress bars. They seem to block the underlying processes, leading to the reduced CPU usage. Combining joblib with tqdm has been an issue for some time, but there have been some improvements to joblib recently that make it possible to combine them more elegantly (and performantly). It is now possible to return the Parallel object as a generator, which can be passed to tqdm directly, see https://github.com/joblib/joblib/issues/972#issuecomment-1623366702.
Indeed, if I add the following minimal joblib function to the benchmark I get the same performance as with p_tqdm:
# additional imports needed for this snippet
from joblib import Parallel, delayed
from tqdm import tqdm


@timeit
def run_simulations_joblib(prior, num_simulations, num_processes):
    '''Generates the joint using joblib directly.'''
    theta_low = prior.base_dist.low.cpu().numpy()
    theta_high = prior.base_dist.high.cpu().numpy()
    theta_range = theta_high - theta_low
    thetas = np.random.uniform(size=(num_simulations, 1)) * theta_range + theta_low
    x = [
        r
        for r in tqdm(
            Parallel(return_as="generator", n_jobs=num_processes)(
                delayed(simulation)(theta) for theta in thetas
            ),
            total=num_simulations,
        )
    ]
Results:
Looking into the code for simulate_for_sbi and simulate_in_batches, I also see that it really needs some refactoring. I will make a PR. If you like you can contribute the joblib fix there as well.
@tomMoral might have a word or two to say about this?
Very glad to see this! Also, very interestingly, the fix you proposed seems to work better than p_tqdm and the current simulate_for_sbi in the case of
def simulation(theta):
    '''Some generic simulation function.
    You can change this to study, e.g., the impact of numpy's multithreading
    on the code's multiprocessing - np.linalg.eig'''
    matrix = np.random.rand(200, 200) / theta
    eigval, eigvec = np.linalg.eig(matrix)
    return 1
where the vectorization of np.linalg.eig is known to sometimes interfere with the higher level multiprocessing. Good that joblib takes care of this and that the fix can be implemented somewhat easily!
> Looking into the code for `simulate_for_sbi` and `simulate_in_batches`, I also see that it really needs some refactoring. I will make a PR. If you like you can contribute the `joblib` fix there as well.
Sure, I'd love to. How can we organize?
> @tomMoral might have a word or two to say about this?
yes please!
> Sure, I'd love to. How can we organize?
Please follow the instructions here: https://github.com/sbi-dev/sbi/blob/main/docs/docs/contribute.md In short: create a fork, work on your own feature branch, install pre-commit to run hooks for code style conventions, then make a PR. Let me know if anything is unclear.
Regarding the refactoring: I would

- get rid of `simulate_in_batches` entirely and just make a call to joblib wrapped in `tqdm` directly in `simulate_for_sbi` (a rough sketch is below),
- make sure to wrap the simulator with a seed when a seed is passed (see `simulate_in_batches`),
- make sure to pass batches of theta if `batch_size > 1` (see `simulate_in_batches`).
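Roughly, I imagine it could look something like the following sketch. This is illustrative only, not the final API: the helper names, the simple `seed + i` per-batch seeding, and the assumption that the simulator returns a tensor per batch are all placeholders, and `return_as="generator"` requires a recent joblib.

```python
import numpy as np
import torch
from joblib import Parallel, delayed
from tqdm import tqdm


def _seeded_call(simulator, theta_batch, batch_seed):
    # Wrap the simulator with a seed when a seed is passed.
    if batch_seed is not None:
        torch.manual_seed(batch_seed)
        np.random.seed(batch_seed)
    return simulator(theta_batch)


def simulate_in_parallel(simulator, theta, num_workers=1, batch_size=1, seed=None, show_progress=True):
    # Pass batches of theta if batch_size > 1.
    batches = torch.split(theta, batch_size)
    seeds = [None] * len(batches) if seed is None else [seed + i for i in range(len(batches))]

    # joblib returns a generator which is wrapped directly in tqdm.
    xs = list(
        tqdm(
            Parallel(return_as="generator", n_jobs=num_workers)(
                delayed(_seeded_call)(simulator, b, s) for b, s in zip(batches, seeds)
            ),
            total=len(batches),
            disable=not show_progress,
        )
    )
    return torch.cat(xs)
```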
One strange thing I noticed and haven't understood yet: in the benchmark, you sampled thetas using a numpy.random call inside each function. You could have just sampled from the prior with prior.sample((num_simulations,)).
However, if I do so, the following call to joblib is much slower 🤔, e.g., 600 it/s vs. 150 it/s. Can you confirm this?
Great! I will start working on it then.
Concerning your observation: yes, I can confirm that, and it seems to happen because, again, the CPU saturation is reduced. From what I see, it is connected to the fact that prior.sample outputs a torch.Tensor. Indeed, with
thetas = prior.sample((num_simulations,)).tolist()
# [... joblib code ...]
everything goes back to normal, and the CPU returns to full saturation, with equal runtimes across the board. Why torch conflicts with joblib I have, unfortunately, no idea yet :laughing:
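For completeness, the full workaround is just the joblib snippet from above with the `thetas` line changed (sketch):

```python
# Same joblib call as above; only the way thetas are produced changes.
# Converting the prior samples (a torch.Tensor) to plain Python lists
# restores full CPU saturation on my machine.
thetas = prior.sample((num_simulations,)).tolist()

x = [
    r
    for r in tqdm(
        Parallel(return_as="generator", n_jobs=num_processes)(
            delayed(simulation)(theta) for theta in thetas
        ),
        total=num_simulations,
    )
]
```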
Here is another short benchmark to check how the simulator changes the parallelization with joblib. It's really brute force, but I wanted a clear picture for the strange behavior from above.
I test four different simulators:

- one that takes and returns `numpy` arrays and uses `numpy.random.randn`,
- one that takes and returns `numpy` arrays but internally uses `torch.randn`,
- and the same two combinations for `torch` arguments and returns.

Additionally, I also test whether passing a list of args to joblib changes the timings.
Here are the results:
To summarize: It seems that torch.randn is just much faster than np.random.randn and that the type of the inputs or returns does not matter.
ChatGPT even created a histogram from it, which shows it quite clearly: argument type does not matter, but torch vs numpy does, i.e., internally using torch is much faster.
import time
import numpy as np
import pytest
import torch
from joblib import Parallel, delayed
from torch import Tensor
from tqdm import tqdm
def numpy_simulator(theta: np.ndarray) -> np.ndarray:
    if isinstance(theta, Tensor):
        theta = theta.numpy()
    for _ in range(100):
        np.random.randn(10000) + np.random.randn(*theta.shape)
    return theta


def numpy_torch_simulator(theta: np.ndarray) -> np.ndarray:
    if isinstance(theta, np.ndarray):
        theta = torch.from_numpy(theta)
    for _ in range(100):
        torch.randn(10000) + torch.randn_like(theta)
    return theta.numpy()


def torch_simulator(theta: Tensor) -> Tensor:
    if isinstance(theta, np.ndarray):
        theta = torch.from_numpy(theta)
    for _ in range(100):
        torch.randn(10000) + torch.randn_like(theta)
    return theta


def torch_numpy_simulator(theta: Tensor) -> Tensor:
    if isinstance(theta, Tensor):
        theta = theta.numpy()
    for _ in range(100):
        np.random.randn(10000) + np.random.randn(*theta.shape)
    return torch.from_numpy(theta)


@pytest.mark.parametrize("type", ["numpy", "torch", "list"])
@pytest.mark.parametrize(
    "simulator",
    [numpy_simulator, numpy_torch_simulator, torch_simulator, torch_numpy_simulator],
)
def test_joblib_benchmark(simulator, type):
    num_simulations = 10000
    if type == "torch":
        theta = torch.distributions.Uniform(-1, 1).sample((num_simulations,))
    elif type == "numpy":
        theta = np.random.uniform(size=(num_simulations, 1))
    elif type == "list":
        theta = [np.random.uniform(size=(1,)) for _ in range(num_simulations)]

    num_processes = 10

    tic = time.time()
    x = [
        r
        for r in tqdm(
            Parallel(return_as="generator", n_jobs=num_processes)(  # type: ignore
                delayed(simulator)(batch) for batch in theta
            ),
            total=num_simulations,
            disable=not True,
        )
    ]
    toc_joblib = time.time() - tic

    # print the time for the given simulator
    print(f"{simulator.__name__}; arg type: {type}: {toc_joblib:.2f} seconds \n")
To run it, do `pytest benchmark_file.py::test_joblib_benchmark -s`.
I am able to reproduce what you find on my machine; indeed, torch.randn seems to be much faster.
One important thing, however: 10000 runs do not trigger the unexpected behaviour; to observe it clearly, 1M runs are necessary. In that case, one can see that:

- `theta` of type `numpy` and `list` produces benchmarks with 300 - 1000 it/s, with the expected difference between `torch.randn` and `np.random.randn`;
- `theta` of type `torch` produces benchmarks around 100 it/s, together with a reduced CPU saturation (on Linux one can see it directly in `htop` while running the tests).

I think that this behaviour does not depend on `torch.randn` but on some "conflict" between joblib and torch. I still need to read more into this, however.
OK, thanks for checking this. It would be interesting to get @tomMoral's take on this.
Also, this https://pytorch.org/docs/stable/notes/multiprocessing.html might be a relevant read.
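One quick experiment along those lines (just a guess on my side, not a confirmed cause): cap torch's intra-op threads inside the simulator and check whether the CPU saturation recovers, e.g.

```python
import torch


def torch_simulator_single_threaded(theta):
    # Hypothetical variant of the torch simulator above: limit torch's own
    # intra-op thread pool inside each joblib worker so that process-level
    # parallelism and torch's threading do not compete for the same cores.
    torch.set_num_threads(1)
    theta = torch.as_tensor(theta)
    for _ in range(100):
        torch.randn(10000) + torch.randn_like(theta)
    return theta
```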
Hello guys. A quick update on the state of the issue. After discussing with @janfb, we decided to proceed as follows:
- a first PR with an additional test will be submitted (`tests/cpu_saturation_test.py`). This test is meant to let the tester observe the CPU usage (via the process manager or `htop`) while a large number of simulations is running; a rough sketch is below.
- a second PR, with the actual refactoring and hotfix of `simulate_for_sbi`. Once (if) accepted, one should run `cpu_saturation_test.py` again and compare the results, hopefully noticing a considerable improvement.
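To make the first point concrete, the test could look roughly like this (a sketch only; the final name and shape of `tests/cpu_saturation_test.py` may differ):

```python
# Sketch of the planned CPU-saturation test (hypothetical). It simply runs a
# large number of simulations so that the tester can watch htop / the process
# manager and check whether all workers are actually saturated.
import torch

from sbi.inference import simulate_for_sbi
from sbi.utils import BoxUniform
from sbi.utils.user_input_checks import process_prior, process_simulator


def dummy_simulator(theta):
    # Cheap but non-trivial work, so that CPU usage is clearly visible.
    return theta + torch.randn_like(theta)


def test_cpu_saturation():
    num_simulations = 1_000_000
    num_workers = 16

    prior = BoxUniform(low=torch.zeros(1), high=10 * torch.ones(1))
    prior, _, prior_returns_numpy = process_prior(prior)
    simulator = process_simulator(dummy_simulator, prior, prior_returns_numpy)

    # Watch the CPU usage while this runs: ideally all workers saturate.
    simulate_for_sbi(simulator, prior, num_simulations, num_workers)
```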
I will now do the first PR. Please let me know if you have any concerns.