joblib icon indicating copy to clipboard operation
joblib copied to clipboard

Is it possible to return generator of generators in joblib.Parallel?

Open Schefflera-Arboricola opened this issue 2 years ago • 2 comments

In the below code, I'm applying the single_source_bellman_ford_path to individual nodes in the subset helper function(_calculate_shortest_paths_subset) and then using it in delayed to compute shortest paths for all nodes in parallel on all CPU cores.

def all_pairs_bellman_ford_path(G, weight="weight"):
    """The parallel computation is implemented by computing the
    shortest paths for each node concurrently.

    networkx.all_pairs_bellman_ford_path : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.weighted.all_pairs_bellman_ford_path.html#all-pairs-bellman-ford-path
    """

    def _calculate_shortest_paths_subset(source):
        return (source, single_source_bellman_ford_path(G, source, weight=weight))

    nodes = G.nodes

    paths = Parallel(n_jobs=os.cpu_count(), return_as="generator")(
        delayed(_calculate_shortest_paths_subset)(source) for source in nodes
    )
    return paths

I want to be able to do the same thing but in such a way that I divide nodes into chunks and then process the chunks in parallel. Also, I want the returned object to be the same(i.e. generator - (source, dictionary); dictionary keyed by target and shortest path as the key value.). I wanted to know if joblib allows this.

def all_pairs_bellman_ford_path_chunk(G, weight="weight"):
    def _calculate_shortest_paths_subset(nodes):
        d=[]
        for source in nodes:
            d+=[(source, single_source_bellman_ford_path(G, source, weight=weight))]
        return d
        # here, using "yield" gives type error and returning a list(or dict) like this changes the return type

    def chunks(iterable, n):
        """Divides an iterable into chunks of size n"""
        it = iter(iterable)
        while True:
            x = tuple(itertools.islice(it, n))
            if not x:
                return
            yield x

    nodes = G.nodes

    cpu_count = os.cpu_count()
    num_in_chunk = max(len(nodes) // cpu_count, 1)
    node_chunks = nxp.chunks(nodes, num_in_chunk)

    paths = Parallel(n_jobs=cpu_count, return_as="generator")(
        delayed(_calculate_shortest_paths_subset)(nodes) for nodes in node_chunks
    )
    return paths

Any kind of guidance on how to tackle this would be great! Thank you :)

ref. https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.weighted.all_pairs_bellman_ford_path.html#all-pairs-bellman-ford-path

nx_parallel/algorithms/shortest_paths/weighted.py

https://github.com/networkx/nx-parallel/pull/42

Schefflera-Arboricola avatar Feb 14 '24 12:02 Schefflera-Arboricola

~~You're trying to parallelize a task on chunks over all available cpu processes, and you want that each task created this way also run some function in parallel over all available cpu processes. So your code is effectively scheduling (cpu_count * cpu_count) processes. This might not be the best way to make the best use of the cpu cores, since your processes are going to compete on way less available resources than you schedule processes. (NB: and in theory joblib refuses to do that, you're probably seeing this warning ).~~

~~I think you need to redesign how your grid of tasks is programmatically scheduled, in such a way that it is entirely defined in the main process, rather than having a dispatch strategy that relies on callbacks in child processes ~ from the look of your snippet, this is possible there, put differently what you're trying to achieve with two nested Parallel calls, looks in fact achievable with only one Parallel call.~~

edit: not quite, I misread the snippet. Would be easier to get with minimal reproducer ;-)

fcharras avatar Feb 21 '24 13:02 fcharras

Second try: now my understanding is that the issue you mention is purely because of return types, you can add this instruction just before returning paths:

paths = (path for paths_chunk in paths for path in paths_chunk)

(The _calculate_shortest_paths_subset should indeed be a return and not a yield, and on top of that it should return something that inter-process communication can digest.)

fcharras avatar Feb 21 '24 13:02 fcharras

Thanks @fcharras , this helped a lot!

PS: I actually initially wanted to yield(and not return a generator) but I didn't know if that was possible in joblib. But, your response inspired me to think further and I eventually did the following. (ref. https://github.com/networkx/nx-parallel/pull/49).

    paths_chunk_generator = (
        delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks
    ) # a lazy generator

    for path_chunk in Parallel(n_jobs=cpu_count)(paths_chunk_generator):
        for path in path_chunk:
            yield path

And using yield and adding chunking also made the algo perform better!

Thanks!

Schefflera-Arboricola avatar Feb 23 '24 14:02 Schefflera-Arboricola