Call `numba.set_num_threads` when `n_jobs` is specified (e.g. in `scanpy.tl.rank_genes_groups`)
- [x] Additional function parameters / changed functionality / changed defaults?
Hi ScanPy team
I emailed @ivirshup, but I think others should be involved.
It would be useful if this function let us specify the number of threads to use: https://scanpy.readthedocs.io/en/stable/generated/scanpy.tl.rank_genes_groups.html
Based on the number of items in the "groupby" field, we could use a basic split-merge approach here: each thread takes several of these items (the calculations are entirely independent of one another), and once each is completed we join and concatenate the results.
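To make that concrete, here is a rough sketch of the split-merge idea (hypothetical helper names, using a plain group-vs-rest Mann–Whitney test on a dense matrix rather than scanpy's internals):

```python
import concurrent.futures as cf

import numpy as np
import pandas as pd
from scipy.stats import mannwhitneyu

def rank_one_group(X, labels, group, var_names):
    # one independent "group vs rest" comparison: one test per gene
    mask = labels == group
    stat, pval = mannwhitneyu(X[mask], X[~mask], axis=0)
    return pd.DataFrame({"names": var_names, "statistic": stat, "pvals": pval, "group": group})

def rank_genes_split_merge(X, labels, var_names, n_jobs=4):
    groups = np.unique(labels)
    # split: each thread handles some of the groups
    with cf.ThreadPoolExecutor(max_workers=n_jobs) as pool:
        parts = list(pool.map(lambda g: rank_one_group(X, labels, g, var_names), groups))
    # merge: join/concatenate the per-group results
    return pd.concat(parts, ignore_index=True)
```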
I'm happy to help write up a PR (or participate), but I'd like to hear whether this is something you'd be willing to prioritize. (It's related to a project where Fabian is the PI.)
Best, Evan
cc: @Zethson @grst
Hey,
In principle this sounds good, but I'd like to hear a little bit more about the use case.
For context on our side, there are some other paths available for speeding up DE (probably some form of calculating statistics via https://github.com/scverse/anndata/pull/564). There's also increased momentum toward more featureful DE in the scverse ecosystem.
If you are specifically looking for faster scanpy DE, this makes sense, though there may be some easier paths forward (at least as I see it).
If you need anything fancier or even just different, it could be good to check in with other efforts. E.g.
- https://github.com/theislab/pertpy/issues/189
Hi @ivirshup
Thanks for the help
In terms of the use cases here:
(1) Any user doing data processing or interactive analysis could benefit from multithreading here. Consider the two big for-loops that run through all of the genes being compared between the samples, and the for-loop that automatically does this for each "group" in the ScanPy object.
I'm a bit confused why Seurat or ScanPy never did this... but then I realized that Pagoda2 didn't either: https://github.com/kharchenkolab/pagoda2/blob/main/R/Pagoda2.R#L900
(There's a bit of multithreading there at the end...)
Given the file sizes nowadays and the number of "groups", this is getting fairly computationally intensive. It's one of those simple things your biologists will love ("this is so fast now!").
(2) In terms of our use case, running DE interactively via the client is too slow. We've just started to implement the above ourselves.
RE: pertpy
How does this relate to @davidsebfischer and diffxpy?
Best, Evan
Given the file sizes nowadays and the number of "groups", this is getting fairly computationally intensive. It's one of those simple things your biologists will love ("this is so fast now!").
I agree it doesn't hurt to have rank_genes_groups parallelized (given that it should be straightforward to implement).
What @ivirshup was referring to, though, is that rank_genes_groups on single cells is generally no longer seen as best practice for DE analysis because it doesn't account for pseudoreplication bias. Please take a look at @Zethson's book chapter.
RE: pertpy
How does this relate to @davidsebfischer and diffxpy?
Diffxpy is currently being reimplemented. Once it is released, it would likely be included in pertpy as an additional method. I.e. pertpy is more general and strives to provide a consistent interface to multiple methods.
(1) Any user doing data processing or interactive analysis could benefit from multithreading here. Consider the two big for-loops that run through all of the genes being compared between the samples, and the for-loop that automatically does this for each "group" in the ScanPy object.
My concern is that there will be issues if you keep the current calculations but parallelize over the groups. Within that loop, I believe large amounts of memory can be allocated. If it's "group vs rest", at least one X worth of memory is allocated per loop iteration from matrix subsetting, since there's an X_group = X[mask]; X_rest = X[~mask].
If you parallelize over groups, now the max memory usage can be ~ min(n_procs, n_groups) * X as opposed to ~2X. For large X (probably where you want to see the speed up most), this can make you run out of memory.
https://github.com/scverse/scanpy/blob/d26be443373549f26226de367f0213f153556915/scanpy/tools/_rank_genes_groups.py#L164-L178
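A rough illustration of the allocation pattern being described (hypothetical helper, dense X assumed): each in-flight task materializes both subsets, so it holds roughly one extra copy of X on top of the shared input.

```python
import numpy as np

def one_vs_rest_stats(X: np.ndarray, mask: np.ndarray):
    X_group = X[mask]   # copy of the group's rows
    X_rest = X[~mask]   # copy of all remaining rows -> ~1x X of extra memory per task
    return X_group.mean(axis=0), X_rest.mean(axis=0)
```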
Another memory-related concern comes from multiprocessing (mentioned in your email). I think there has recently been some improvement here, but my impression was that it's difficult/impossible to share memory with multiprocessing – so everything that goes into or out of a subprocess has to be copied.
So while I think we can absolutely make use of more processing power here, I think we need to consider the approach.
- The link I mentioned above should reduce copies, and could potentially use a parallelized BLAS for compute
- Much of the looping over "all of the genes being compared between the samples" is already in compiled code, but could be parallelized
In terms of our use case, running DE interactively via the client is too slow.
What is the interface here? Scanpy computes results for all groups at once, but in most interfaces I've used you can only really "ask" for one comparison at a time. This could also be much faster, if you can just reduce total computation.
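For example, if the client only ever needs one comparison at a time, restricting the call already cuts the work (the groupby key and group labels here are placeholders):

```python
import scanpy as sc

# only test cluster "3" against cluster "5", instead of every group vs rest
sc.tl.rank_genes_groups(
    adata, groupby="leiden", groups=["3"], reference="5", method="wilcoxon"
)
df = sc.get.rank_genes_groups_df(adata, group="3")
```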
What @ivirshup was referring to, though, is that rank_genes_groups on single cells is generally no longer seen as best practice for DE analysis because it doesn't account for pseudoreplication bias.
Partially, I'm not sure what comparisons are actually being run. I was also wondering if you'd benefit from something fancy like a covariate.
Diffxpy is currently being reimplemented.
As a heads-up, I'm unaware of a timeline here.
Sorry for reviving such an old topic, but aside from wanting to check up on this, I also wanted to offer some advice to anyone who might stumble upon this in their search for faster marker gene extraction.
(1) Any user doing data processing or interactive analysis could benefit from multithreading here. Consider the two big for-loops that run through all of the genes being compared between the samples, and the for-loop that automatically does this for each "group" in the ScanPy object.
I have been having the same problem, and the best solution I ran into is lvm-DE from scvi-tools (don't let the name fool you, it also ranks genes). It does require you to use a neural-network model and only yields a significant speedup if you use a GPU. Also, if the model is big, GPU memory usage gets very high. On the plus side, it does address pseudoreplication bias.
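For anyone curious, the workflow looks roughly like this (a sketch based on the scvi-tools docs; the counts layer and the groupby key are assumptions about your data):

```python
import scvi

# assumes `adata` has raw counts in a "counts" layer and clusters in obs["leiden"]
scvi.model.SCVI.setup_anndata(adata, layer="counts")
model = scvi.model.SCVI(adata)
model.train()  # much faster on a GPU; a big model also needs a lot of GPU memory

# lvm-DE style ranking: each cluster is compared against the rest
de_results = model.differential_expression(groupby="leiden")
```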
I agree it doesn't hurt to have rank_genes_groups parallelized (given that it should be straightforward to implement). What @ivirshup was referring to, though, is that rank_genes_groups on single cells is generally no longer seen as best practice for DE analysis because it doesn't account for pseudoreplication bias. Please take a look at @Zethson's book chapter.
I agree. I have had to write a custom script that computes the pairwise Wilcoxon values and then averages over them, as the link you pointed to suggests. I also tried to parallelize it, but ran into the same problems that @ivirshup mentioned. So the compute time to get ranked genes for a large dataset with lots of groups remains very large.
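The core of that script looks roughly like this (a simplified sketch with hypothetical names, not my actual code): compute a Wilcoxon/Mann–Whitney AUC per gene for every ordered pair of groups, then average each group's AUCs over its comparisons.

```python
import itertools

import numpy as np
from scipy.stats import mannwhitneyu

def pairwise_wilcoxon_scores(X, labels):
    """X: cells x genes (dense); labels: per-cell group labels."""
    groups = np.unique(labels)
    aucs = {g: [] for g in groups}
    for g1, g2 in itertools.permutations(groups, 2):
        n1, n2 = (labels == g1).sum(), (labels == g2).sum()
        u, _ = mannwhitneyu(X[labels == g1], X[labels == g2], axis=0)
        aucs[g1].append(u / (n1 * n2))  # AUC of g1 vs g2 for every gene
    # average each group's AUCs over all of its pairwise comparisons
    return {g: np.mean(aucs[g], axis=0) for g in groups}
```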
The link I mentioned above should reduce copies, and could potentially use a parallelized BLAS for compute
Any advances on this? Or maybe a suggestion for another tool?
Thank you for your time.
Any advances on this? Or maybe a suggestion for another tool?
Sorry, I just realized that, nowadays, scanpy does use parallelization through np.aggregate when calculating marker genes. It is just that the server I was testing things on limited this behavior automatically.
Though I ran into another problem (specifying the number of cores) that I managed to solve. The n_jobs parameter doesn't seem to do the trick, and setting the conventional env vars doesn't help either.
The correct form is with numba.set_num_threads(num).
Thanks for investigating! I agree, there are too many screws to turn here; all the different parallelization libraries we use have their own way.
Reading numba’s docs, it’s clear that numba tries to be smart here:
`numba.config.NUMBA_DEFAULT_NUM_THREADS`: The number of usable CPU cores on the system (as determined by `len(os.sched_getaffinity(0))`, if supported by the OS, or `multiprocessing.cpu_count()` if not). This is the default value for `numba.config.NUMBA_NUM_THREADS` unless the `NUMBA_NUM_THREADS` environment variable is set.
We should probably call numba.set_num_threads(n_jobs) when n_jobs is set explicitly; I'm renaming this issue accordingly.
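Something along these lines, i.e. a small helper that the relevant functions call when an explicit n_jobs is passed (a sketch of the proposed wiring, not the actual implementation):

```python
import numba

def _apply_n_jobs(n_jobs: int | None) -> None:
    """Forward an explicit n_jobs to numba; otherwise leave numba's default alone."""
    if n_jobs is not None and n_jobs > 0:
        numba.set_num_threads(n_jobs)

# hypothetical: called at the top of rank_genes_groups(adata, ..., n_jobs=None),
# falling back to sc.settings.n_jobs when the argument isn't given
_apply_n_jobs(8)
```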
Hi there! Based on the latest update of this issue, is there already a multithreaded implementation of sc.tl.rank_genes_groups? When I run this command using scanpy v1.11.0, it still uses a single thread. Am I understanding the discussion correctly?
```python
import numba
import scanpy as sc

numba.set_num_threads(8)
sc.tl.rank_genes_groups(
    adata,
    groupby="leiden",
    method="wilcoxon",
    n_jobs=8,
)
```
Any advances on this? Or maybe a suggestion for another tool?
Sorry, I just realized that, nowadays, scanpy does use parallelization through np.aggregate when calculating marker genes. It is just that the server I was testing things on limited this behavior automatically. Though I ran into another problem (specifying the number of cores) that I managed to solve. The n_jobs parameter doesn't seem to do the trick, and setting the conventional env vars doesn't help either. The correct form is with numba.set_num_threads(num).
I found that when using the wilcoxon method, the most time-consuming function is df.rank() at: https://github.com/scverse/scanpy/blob/1.11.0/src/scanpy/tools/_rank_genes_groups.py#L81
This pd.DataFrame.rank() call doesn't use parallel computation and accounts for a large proportion of the total computation time.
Meanwhile, I found issue #2060 very useful: just replace df.rank() with ranks = pd.DataFrame(rankdata(df.values)), using the njit-compiled rankdata function from https://gist.github.com/jamestwebber/38ab26d281f97feb8196b3d93edeeb7b.
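For reference, a tie-averaged, njit-parallel ranking in the same spirit would look something like this (an illustrative sketch, not the linked gist itself):

```python
import numba
import numpy as np

@numba.njit(parallel=True, cache=True)
def rankdata(values):
    """Column-wise average ranks (ties get the mean rank), like scipy.stats.rankdata."""
    n_obs, n_var = values.shape
    ranks = np.empty((n_obs, n_var), dtype=np.float64)
    for j in numba.prange(n_var):  # genes are ranked independently -> parallel loop
        col = np.ascontiguousarray(values[:, j])
        order = np.argsort(col, kind="mergesort")
        i = 0
        while i < n_obs:
            k = i
            while k + 1 < n_obs and col[order[k + 1]] == col[order[i]]:
                k += 1  # walk over a run of tied values
            avg_rank = 0.5 * (i + k) + 1.0  # mean of ranks i+1 .. k+1
            for m in range(i, k + 1):
                ranks[order[m], j] = avg_rank
            i = k + 1
    return ranks
```

Then ranks = pd.DataFrame(rankdata(df.values), index=df.index, columns=df.columns) drops in as described above.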
I benchmarked the time performance on a 50k-cell dataset on my PC: the run time decreased from 3m43s to 1m43s, and the results are also robust.
Considering that using the wilcoxon method to find marker genes is very common in practical analysis, and there seem to be no conflicts at all, I think this change or something similar should be merged quickly; it would save people a great deal of time.
Can I create a PR based on this, @jamestwebber? It seems that #2060 hasn't made progress in three years. @flying-sheep
Happy for you to do so! I only opened the issue as a suggestion as I didn't have time to make a PR myself.
See also:
- joblib has a nice list of all the ways a max number of threads could be set
- Dask sets several `*_NUM_THREADS` variables in some situations, but not the numba one: https://github.com/dask/dask/issues/5229
OK, findings by @Intron7 and me (see the sketch after this list):
- For `LocalCluster(processes=True)` (the default), do this before running things: `dask.config.set({"distributed.nanny.pre-spawn-environ.NUMBA_NUM_THREADS": "1"})` (this should be upstreamed into dask: https://github.com/dask/distributed/pull/9076)
- In any case, running `numba.set_num_threads(1)` inside of a worker works. This setting seems to be a thread-local one, but the docs also say this: "Numba always launches numba.config.NUMBA_NUM_THREADS threads, but set_num_threads() causes it to mask out unused threads so they aren't used in computations."
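Putting those two findings together, usage would look roughly like this (the config key is the one quoted above; the cluster setup is just an example):

```python
import dask
from dask.distributed import Client, LocalCluster

# must be set before the nanny spawns the worker processes
dask.config.set({"distributed.nanny.pre-spawn-environ.NUMBA_NUM_THREADS": "1"})
cluster = LocalCluster(processes=True)
client = Client(cluster)

# alternatively, from inside code that runs on a worker:
# import numba
# numba.set_num_threads(1)
```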
OK so ideally all our dependencies are just implementation details, and we take care of mapping a user’s sc.settings.n_jobs to whatever the dependencies need.
Unless we find a better solution, I’d say
- we find out what other multithreading solutions (apart from numba) we use
- we add a decorator to all our functions that – when encountering a dask array – uses `.map_blocks` to prepend `numba.set_num_threads` and so on to everything that actually does computation (let's hope the thread masking is enough)
would look something like
```python
from functools import wraps

import numba
import scanpy as sc
from dask.array import Array as DaskArray
from dask.distributed import get_worker

def set_thread_limit(array):
    w = get_worker()
    # no idea how to get N_WORKERS …
    numba.set_num_threads(max(1, sc.settings.n_jobs // (w.nthreads * N_WORKERS)))
    return array

def limit_threads_in_dask(fn):
    @wraps(fn)
    def wrapper(array, *args, **kw):
        if isinstance(array, DaskArray):
            array = array.map_blocks(set_thread_limit)
        return fn(array, *args, **kw)
    return wrapper

@limit_threads_in_dask
def some_func(arr, *args, **kw): ...
```
but of course we’d have to be careful how the settings singleton interacts with multiprocessing and so on.