flowpipe
Multiprocessing optimizations
Currently, every node is evaluated in its own process. There are, however, situations where chains of nodes could be grouped together and evaluated in a single process, which limits the number of processes and saves the startup time of the unnecessary ones.
Consider this graph:
```python
from flowpipe.graph import Graph
from flowpipe.node import Node


@Node(outputs=['out'])
def MyNode(in_):
    return {'out': 1}


graph = Graph()
a = MyNode(name="Grp1-a", graph=graph)
b = MyNode(name="Grp1-b", graph=graph)
c = MyNode(name="Grp2-c", graph=graph)
d = MyNode(name="Grp2-d", graph=graph)
e = MyNode(name="Grp3-e", graph=graph)
a.outputs["out"] >> b.inputs["in_"]
b.outputs["out"] >> e.inputs["in_"]["0"]
c.outputs["out"] >> d.inputs["in_"]
d.outputs["out"] >> e.inputs["in_"]["1"]
```
```
+-------------+          +-------------+          +--------------------+
|   Grp1-a    |          |   Grp1-b    |          |       Grp3-e       |
|-------------|          |-------------|          |--------------------|
o in_<>       |     +--->o in_<>       |          % in_                |
|         out o-----+    |         out o--------->o  in_.0<>           |
+-------------+          +-------------+     +--->o  in_.1<>           |
+-------------+          +-------------+     |    |                out o
|   Grp2-c    |          |   Grp2-d    |     |    +--------------------+
|-------------|          |-------------|     |
o in_<>       |     +--->o in_<>       |     |
|         out o-----+    |         out o-----+
+-------------+          +-------------+
```
We have 5 nodes, resulting in 5 processes. If we grouped the nodes, we could lower the number of processes to 3 while keeping the benefits of multiprocessing, since the two independent chains could still run in parallel.
Consider this (naive) pseudo-code for how the grouping could work:
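```python
groups_counter = 0
groups = {}
visited_nodes = []

for node in graph.nodes:
    if node.name in visited_nodes:
        continue
    if len(node.upstream_nodes) < 2:
        # Start a new group and follow the chain downstream while it stays linear.
        grp = str(groups_counter)
        groups[grp] = {
            "nodes": [node.name]
        }
        groups_counter += 1
        visited_nodes.append(node.name)
        parent = node
        while True:
            # Stop at the end of the chain, at a branch, or at a join.
            if len(parent.downstream_nodes) != 1 or len(parent.upstream_nodes) > 1:
                break
            down = parent.downstream_nodes[0]
            # Stop at joins and at nodes that already belong to a group, so a
            # node is never assigned to two groups if graph.nodes is unordered.
            if len(down.upstream_nodes) != 1 or down.name in visited_nodes:
                break
            groups[grp]["nodes"].append(down.name)
            visited_nodes.append(down.name)
            parent = down
    else:
        # Nodes with two or more upstream nodes get a group of their own.
        groups[str(groups_counter)] = {
            "nodes": [node.name]
        }
        groups_counter += 1
        visited_nodes.append(node.name)
```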
For the above example, the result would look like this:
```json
{
    "0": {
        "nodes": [
            "Grp1-a",
            "Grp1-b"
        ]
    },
    "1": {
        "nodes": [
            "Grp2-c",
            "Grp2-d"
        ]
    },
    "2": {
        "nodes": [
            "Grp3-e"
        ]
    }
}
```
To make this possible, we'd need to solve these issues:
- Handle dependencies between groups; a group would probably just accumulate the dependencies of the nodes it handles.
- Create a function similar to `graph.evaluate_node_in_process` that handles multiple nodes.
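The first point could be handled roughly like this: every node-level edge that crosses a group boundary becomes a dependency between the two groups, and edges inside a group are dropped. This is a minimal sketch on plain node names rather than flowpipe node objects; `EDGES`, `GROUPS`, and `group_dependencies` are hypothetical names, not part of flowpipe.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical input: edges of the example graph as (upstream, downstream) names.
EDGES: List[Tuple[str, str]] = [
    ("Grp1-a", "Grp1-b"),
    ("Grp1-b", "Grp3-e"),
    ("Grp2-c", "Grp2-d"),
    ("Grp2-d", "Grp3-e"),
]

# Hypothetical input: the grouping from the result above.
GROUPS: Dict[str, List[str]] = {
    "0": ["Grp1-a", "Grp1-b"],
    "1": ["Grp2-c", "Grp2-d"],
    "2": ["Grp3-e"],
}


def group_dependencies(
    groups: Dict[str, List[str]], edges: List[Tuple[str, str]]
) -> Dict[str, Set[str]]:
    """Return, for every group, the set of groups it has to wait for."""
    # Map each node to the group that evaluates it.
    group_of = {node: grp for grp, nodes in groups.items() for node in nodes}
    deps: Dict[str, Set[str]] = {grp: set() for grp in groups}
    for upstream, downstream in edges:
        src, dst = group_of[upstream], group_of[downstream]
        # Edges inside a group are covered by evaluating its nodes in order.
        if src != dst:
            deps[dst].add(src)
    return deps


print(group_dependencies(GROUPS, EDGES))
```

For the example, groups "0" and "1" depend on nothing, and group "2" depends on both, which is exactly the accumulation of the dependencies of `Grp3-e`.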
The grouping could then be an option when multiprocessing:
```python
graph.evaluate_multiprocessed(optimized_evaluation=True)
```
I have not tested whether this would actually be beneficial, but it made sense to me.
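Such an option could drive a scheduler that submits each group as a single task once all groups it depends on have finished, evaluating the group's nodes one after another inside that task. This is a minimal sketch under assumptions: plain functions stand in for node computations, the group dependencies are already known, and `node_value`, `evaluate_group`, `evaluate_grouped`, and the `UPSTREAM`/`GROUPS`/`DEPS`/`FUNCS` tables are hypothetical, not flowpipe API.

```python
from concurrent.futures import FIRST_COMPLETED, Executor, ThreadPoolExecutor, wait
from typing import Callable, Dict, List, Set

# Hypothetical tables for the example graph (not flowpipe API).
UPSTREAM: Dict[str, List[str]] = {
    "Grp1-b": ["Grp1-a"],
    "Grp2-d": ["Grp2-c"],
    "Grp3-e": ["Grp1-b", "Grp2-d"],
}
GROUPS: Dict[str, List[str]] = {
    "0": ["Grp1-a", "Grp1-b"],
    "1": ["Grp2-c", "Grp2-d"],
    "2": ["Grp3-e"],
}
DEPS: Dict[str, Set[str]] = {"0": set(), "1": set(), "2": {"0", "1"}}


def node_value(*inputs: int) -> int:
    """Stand-in for a node's computation: 1 plus the sum of its inputs."""
    return 1 + sum(inputs)


FUNCS: Dict[str, Callable[..., int]] = {
    name: node_value for nodes in GROUPS.values() for name in nodes
}


def evaluate_group(nodes, funcs, upstream, known):
    """Evaluate all nodes of one group, in order, within a single task."""
    outputs = dict(known)
    for node in nodes:
        args = [outputs[up] for up in upstream.get(node, [])]
        outputs[node] = funcs[node](*args)
    # Report only the values computed by this group.
    return {node: outputs[node] for node in nodes}


def evaluate_grouped(groups, deps, funcs, upstream, executor: Executor):
    """Submit each group as one task once the groups it depends on are done."""
    results: Dict[str, int] = {}
    remaining = dict(groups)
    done_groups: Set[str] = set()
    pending = {}  # future -> group name
    while remaining or pending:
        for grp in [g for g in remaining if deps[g] <= done_groups]:
            nodes = remaining.pop(grp)
            future = executor.submit(evaluate_group, nodes, funcs, upstream, dict(results))
            pending[future] = grp
        if not pending:
            raise ValueError("cyclic dependencies between groups")
        finished, _ = wait(pending, return_when=FIRST_COMPLETED)
        for future in finished:
            results.update(future.result())
            done_groups.add(pending.pop(future))
    return results


with ThreadPoolExecutor() as pool:
    print(evaluate_grouped(GROUPS, DEPS, FUNCS, UPSTREAM, pool))
```

The executor is passed in, so the same scheduling code could run with a `concurrent.futures.ProcessPoolExecutor`. The functions are defined at module level so they can be pickled, and that call would belong under an `if __name__ == "__main__":` guard. With the example graph, groups "0" and "1" run in parallel while group "2" waits for both.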
I like this idea a lot.
A simple and probably very powerful way to move forward is to restrict the focus to the simple case you present in your example: linear chains of nodes. They have three advantages:
- They are easy to find.
- By definition, there are no dependencies between different chains (because a node with more than one input can only occur as the first node of such a chain).
- They are guaranteed not to waste any optimization potential from multiprocessing: the nodes of a chain depend on each other, so they have to run one after another anyway.
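This definition of a chain (a node with more than one input may only start a chain) translates into a single pass over the nodes in topological order. A minimal sketch on plain node names, assuming the graph is given as a hypothetical node → upstream-nodes mapping rather than a flowpipe `Graph`; `find_chains` and `EXAMPLE` are illustrative names, and the ordering comes from the standard library's `graphlib.TopologicalSorter` (Python 3.9+).

```python
from graphlib import TopologicalSorter
from typing import Dict, List


def find_chains(upstream: Dict[str, List[str]]) -> List[List[str]]:
    """Split a DAG into linear chains (hypothetical helper, not flowpipe API).

    A node continues the chain of its upstream node only if it has exactly one
    upstream node and that node has exactly one downstream node; otherwise it
    starts a new chain. Joins can therefore only be the first node of a chain.
    """
    downstream: Dict[str, List[str]] = {node: [] for node in upstream}
    for node, ups in upstream.items():
        for up in ups:
            downstream.setdefault(up, []).append(node)
    chain_of: Dict[str, List[str]] = {}
    chains: List[List[str]] = []
    # Visit upstream nodes first, so the chain a node might extend already exists.
    for node in TopologicalSorter(upstream).static_order():
        ups = upstream.get(node, [])
        if len(ups) == 1 and len(downstream[ups[0]]) == 1:
            chain = chain_of[ups[0]]  # ups[0] is the tail of this chain
            chain.append(node)
        else:
            chain = [node]
            chains.append(chain)
        chain_of[node] = chain
    return chains


EXAMPLE = {
    "Grp1-a": [],
    "Grp1-b": ["Grp1-a"],
    "Grp2-c": [],
    "Grp2-d": ["Grp2-c"],
    "Grp3-e": ["Grp1-b", "Grp2-d"],
}

for chain in find_chains(EXAMPLE):
    print(chain)
```

On the example this yields the same three groups as the pseudo-code. The difference shows up with a hypothetical node `Grp3-f` downstream of `Grp3-e`: here it extends the last chain to `["Grp3-e", "Grp3-f"]`, whereas the naive pseudo-code keeps the join node `Grp3-e` in a group of its own.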
As for your second concern, would it be possible to do the following?
- Identify all linear chains of nodes (there might be chains of length one, or the entire graph might be one chain, which is all fine).
- For each chain, instantiate a new `Graph` and add the nodes assigned to this chain to that graph.
- Instead of having a `graph.evaluate_node_in_process` method, introduce a `graph.evaluate_subgraph_in_process` method and spawn a process for each chain (which might contain only a single node).
So, basically I suggest collapsing the graph into subgraphs in such a way that each remaining component has at least two inputs and at most one output (with the exceptions of the entry and exit nodes of the original graph).
I hope I managed to express myself clearly. If I am being confusing, don't hesitate to ask what the hell I mean :-)
- Limiting to chains only for now: Agreed!
- Subgraphs: I like the subgraph idea, but I think we should treat it in a separate ticket and decide how to deal with subgraphs properly (#84).
We might actually be able to achieve the subgraph solution without subgraphs, though, by using a dedicated node instead. The node would have one input and one output, just for handling the relationships, and would evaluate the node chain assigned to it. Hope that makes sense; I can add an example later.
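A rough sketch of the dedicated-node idea, in plain Python so it runs without flowpipe: `ChainNode`, `add_one`, and `double` are hypothetical names, and wiring the node into a real `Graph` is left out. It assumes every chained function takes a single input and returns `{"out": value}`, as `MyNode` does above.

```python
from typing import Any, Callable, Dict, Sequence


class ChainNode:
    """Hypothetical stand-in (not a flowpipe class) for a node that evaluates
    a linear chain of node functions inside one process.

    It has one input and one output for wiring it into the outer graph; the
    chain's internal connections are implied by the order of the functions.
    """

    def __init__(self, name: str, chain: Sequence[Callable[[Any], Dict[str, Any]]]):
        self.name = name
        self.chain = list(chain)

    def compute(self, in_: Any) -> Dict[str, Any]:
        value = in_
        for func in self.chain:
            # Each function returns a dict of outputs; its "out" value
            # becomes the input of the next function in the chain.
            value = func(value)["out"]
        return {"out": value}


def add_one(in_):
    return {"out": in_ + 1}


def double(in_):
    return {"out": in_ * 2}


chain = ChainNode("Grp1", [add_one, double])
print(chain.compute(3))  # {'out': 8}
```

Because the chain's internal connections never leave the node, only `in_` and `out` would need to be connected in the outer graph, and evaluating one such node per process would evaluate the whole chain in that process.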