pygraphistry
[FEA] Node reductions
Node reductions are great!
What
This is an important case of graph reductions. There are requests in both the UI and APIs to do reductions at the level of node, multiedge, and multiple nodes / subgraphs. Instead of simply filtering out the selected entities, the idea is they'd be partitioned into subgraphs, and different kinds of operations would replace them with new topologies and attributes representing them. They seem like some sort of continuous derivative operators, and may even be invertible.
Example
One of the most common cases is dropping a node and propagating its attributes and edges/attributes through its neighborhood.
In hypergraph:
g = graphistry.hypergraph(
    pd.DataFrame([
        {'x': 'a', 'y': 'm'},
        {'x': 'b', 'y': 'm'}
    ]),
    direct=True)
We'd get a 3-node graph (a)-[e1]->(m)<-[e2]-(b)
However, we really just want (a)-[m]-(b) ... so we need a way to drop node (m) and synthesize edge -[m]->. This gets weirder in the cases of multiedges, cliques, and different kinds of nodes:
g2 = g.replace_node_with_edges(
    select_nodes=g._nodes.query('type == "m"'),
    edge_reducer=g.reducers.pair_unique,
    edge_attr_reducer={'weight': 'max'})
g2 = g.replace_node_with_edges(select_nodes='m')
g2 = g.replace_node_with_edges(select_nodes=[1])
g2 = g.replace_node_with_edges(select_nodes=[False, True, False])
g2 = g.replace_node_with_edges(select_nodes=g._nodes.query('type == "m"'))
g2 = g.replace_node_with_edges(select_nodes='m', edge_reducer=g.reducers.pair_unique_undirected, edge_attr_reducer={'weight': 'max', ...})
Sample reducers
This gets at patterns of graph reducers, where we want to take nodes/edges, remove them, and replace them with other nodes/edges. For example, we can replace all type='m' nodes with (a: x)-[e]->(b: x) via a node reduction driven by a selector predicate:
1. Node reducers: Drop a node by converting to edges
g2.replace_node_with_edges(selected=g._nodes['type'] == 'm', reducer=lambda edges: strong_component(edges_to_nodes(edges)))
The idea is to pull out nodes matching some predicate
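def replace_node_with_edges(g, selected, reducer):
    # selected: boolean mask over g._nodes marking the nodes to remove
    nodes = g._nodes[selected]
    removed_ids = nodes[g._node]
    new_edges = []
    for node in removed_ids:
        in_edges = g._edges[g._edges[g._destination] == node]
        out_edges = g._edges[g._edges[g._source] == node]
        # reducer turns one removed node's neighborhood into replacement edges
        # (assumed here to return an edges DataFrame)
        new_edges.append(reducer(pd.concat([in_edges, out_edges])))
    # keep the edges that never touched a removed node
    untouched = ~g._edges[g._source].isin(removed_ids) & ~g._edges[g._destination].isin(removed_ids)
    new_nodes = g._nodes[~selected]
    g2 = g.nodes(new_nodes).edges(pd.concat([g._edges[untouched]] + new_edges))
    return g2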
where
def strong_component(edges, s='src', d='dst'):
    # connect every pair of distinct node ids seen in the edges (a clique)
    node_ids = pd.concat([edges[s], edges[d]]).unique()
    pairs = []
    for x in range(len(node_ids)):
        for y in range(x + 1, len(node_ids)):
            pairs.append([node_ids[x], node_ids[y]])
    return pd.DataFrame(pairs, columns=[s, d])
2. Sugar for common cases
g2 = g.replace_node_with_edges(select_key='type', select_value='m', reducer=reducers.pair_unique)
3. Build into the hypergraph transform, such as during or post-process
g2 = graphistry.hypergraph(
    pd.DataFrame([
        {'x': 'a', 'y': 'm'},
        {'x': 'b', 'y': 'm'}
    ]),
    edges={ },  # no direct edges
    reducer_subgraphs=[ {'reducer_type': 'node_reducer', 'node_selector': ['y'], 'output_nodes': ['x'], 'reducer': 'pair_unique'} ]
)
Or as a post-process:
g2 = graphistry.hypergraph(
    pd.DataFrame([
        {'x': 'a', 'y': 'm'},
        {'x': 'b', 'y': 'm'}
    ]),
    direct=True,
    reduce=[ {'reducer_type': 'node_reducer', 'node_selector': ['y'], 'reducer': 'pair_unique'} ]
)
Plan
I think it's worth doing some principled experiments of different shaping use cases first (replace_node_with_edges()) as the API is not at all obvious:
- selecting nodes: by index, type, ... ?
- reducing a neighborhood to new edges: generic handler + typical cases
- handling attribute reductions as part of that: generic handler + typical cases
Ideally we come to something simple + extensible in PyGraphistry, and use that to drive subsequent incorporation in hypergraphs, UI, and new preprocessing APIs.
Common scenarios
- Turn hypergraph (event {...e})->(attribute) into (attribute)-{...e}->(attribute)
- Turn bipartite (a)->(b) graph into graph of (a)-[e {...b}]->(a) or (b)-[e {...a}]->(b)
  - Can combine with edge reductions to collapse multiedge (a1)-[e1]->(a2), (a1)-[e2]->(a2), ... into singular (a)-[e {...}]->(b)
- If removed edges + nodes have props, should be available to generator of new edges
- Collapse some hierarchy into summary nodes:
  - Rigid: Ex - collapse chain of country -> state -> city -> street into country -> state { street_count: 100 }
  - Loose: Ex - collapse communities like user { community: 1 } --[friend]--> user {community: 1} into community { id: 1, user_count: 2 }
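As a rough illustration of the "Loose" case above, community collapse can be sketched with plain pandas groupby calls. This is only a sketch on bare DataFrames, not a PyGraphistry API: the helper name `collapse_by_group` and the column names (`id`, `community`, `src`, `dst`, `edge_count`) are assumptions made up for the example.

```python
import pandas as pd

def collapse_by_group(nodes, edges, node_col, group_col, src, dst):
    """Hypothetical helper: replace each group of nodes with one summary node."""
    # One summary node per group, carrying a member count
    summary_nodes = (
        nodes.groupby(group_col)
        .agg(user_count=(node_col, 'count'))
        .reset_index()
        .rename(columns={group_col: 'id'})
    )
    # Reroute every edge endpoint from the original node id to its group id
    node_to_group = nodes.set_index(node_col)[group_col]
    rerouted = edges.assign(**{
        src: edges[src].map(node_to_group),
        dst: edges[dst].map(node_to_group),
    })
    # Edges inside a single group turn into self-loops: drop them, count the rest
    rerouted = rerouted[rerouted[src] != rerouted[dst]]
    summary_edges = rerouted.groupby([src, dst]).size().reset_index(name='edge_count')
    return summary_nodes, summary_edges

nodes = pd.DataFrame({'id': ['u1', 'u2', 'u3'], 'community': [1, 1, 2]})
edges = pd.DataFrame({'src': ['u1', 'u1', 'u2'], 'dst': ['u2', 'u3', 'u3']})
summary_nodes, summary_edges = collapse_by_group(nodes, edges, 'id', 'community', 'src', 'dst')
```

Everything here is a groupby or a map, so the same shape should carry over to a GPU dataframe with few changes, though that is untested in this sketch.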
I am interested in working on this issue. I am currently reading through this and collecting some thoughts around it, do you mind assigning it to me?
Awesome, this has been a long time coming!
I'm guessing the idea here will turn out to be not super well-formed once you work through some examples. The idea of collapsing nodes/edges and then replacing them with simpler nodes/edges with summary stats is a common request, but I'm sure there are a lot of edge cases. So even the basic API definition seems up for grabs too.
My intuition is this will be used similarly to df.groupby(..).agg(...) in pandas, except needs to generalize to grouping nodes: removing them + their edges; reconnecting the new summary nodes to their old neighborhoods (accounting for replaced parts); and enriching with aggregate stats both on the new resulting nodes and edges. So probably a lot of APIs we can learn from + reuse!
Oh, I forgot to mention: we're starting to do both CPU (pandas) + GPU (cudf/dask_cudf) flavors of implementations.
Totally fine to just start with CPU and I can help on GPU. To make the process easier, it helps a lot to write as vectorized pandas. E.g., s = df['some_col'].sum() vs for x in df['some_col']: s += x. A lot of code ends up fully reusable or just needs swapping a few pd for cudf calls.
My ideas are still incomplete. I would like to pick your brains on what you think the output should look like for simple use cases. Discussing specific use cases can help build a bigger picture. If you take the following dataframe
g = graphistry.edges(pd.DataFrame([{'x': 'a', 'y': 'm'},
{'x': 'b', 'y': 'm'},
{'x': 'c', 'y': 'n'},
{'x': 'd', 'y': 'm'}]), 'x', 'y')
Replacing all the nodes (m) with an edge [m] results in a clique (I guess) consisting of nodes (a), (b) and (d). Let's say we have a naive implementation of replace_node_with_edges('m'), which takes a node that needs to be replaced with edges, like you described above. Let's treat the function as a blackbox for now and ignore the implementation details. Would you be able to illustrate what you imagine the output to look like for this function call?
g.replace_node_with_edges('m')
a->m<-b, c->n, d->m
Yes, I think you've got it: (a), (b), and (d) would now form a clique, so it'd be equiv to writing:
g2 = graphistry.edges(pd.DataFrame([
    # the clique
    {'x': 'a', 'y': 'b'},
    {'x': 'a', 'y': 'd'},
    {'x': 'b', 'y': 'd'},  # we will treat all edges as undirected, even though the UI rendering will be directed
    # the rest
    {'x': 'c', 'y': 'n'}]),
    'x', 'y')
We may want to further include provenance information:
g2 = graphistry.edges(pd.DataFrame([
    # the clique
    {'x': 'a', 'y': 'b', 'reduced_from': 'm', 'type': m['type']},  # assumes original graphistry.nodes(df2) had col `type`
    {'x': 'a', 'y': 'd', 'reduced_from': 'm', 'type': m['type']},
    {'x': 'b', 'y': 'd', 'reduced_from': 'm', 'type': m['type']},
    # the rest
    {'x': 'c', 'y': 'n'}]),
    'x', 'y')
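One way to get the provenance-tagged clique above out of the 4-row example, using only pandas on the edges table. This is a sketch under assumptions: `replace_node_with_clique` is a hypothetical name, it handles a single removed node, treats edges as undirected, and relies on node ids being orderable (here, strings) so each unordered pair is emitted once.

```python
import pandas as pd

def replace_node_with_clique(edges, node_id, src='x', dst='y'):
    """Hypothetical helper: drop `node_id` and connect its neighbors pairwise."""
    touches = (edges[src] == node_id) | (edges[dst] == node_id)
    kept = edges[~touches]
    incident = edges[touches]
    # Neighbors of the removed node, ignoring direction and self-loops
    endpoints = pd.Series(incident[[src, dst]].to_numpy().ravel())
    neighbors = endpoints[endpoints != node_id].drop_duplicates().sort_values()
    # Cross join via a constant key, then keep each unordered pair once
    n = neighbors.to_frame('n').assign(_key=1)
    pairs = n.merge(n, on='_key', suffixes=('_l', '_r'))
    pairs = pairs[pairs['n_l'] < pairs['n_r']].sort_values(['n_l', 'n_r'])
    new_edges = pd.DataFrame({
        src: pairs['n_l'].to_numpy(),
        dst: pairs['n_r'].to_numpy(),
        'reduced_from': node_id,  # provenance: which node this edge replaced
    })
    return pd.concat([new_edges, kept], ignore_index=True)

edges = pd.DataFrame([
    {'x': 'a', 'y': 'm'},
    {'x': 'b', 'y': 'm'},
    {'x': 'c', 'y': 'n'},
    {'x': 'd', 'y': 'm'}])
g2_edges = replace_node_with_clique(edges, 'm')
```

Untouched edges such as (c)->(n) come back with a missing `reduced_from`, which makes it easy to tell synthesized edges from original ones.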
Likewise, we can imagine updating node/edge based on aggregates like:
# node props
(a { f }) = f(a, [ (a->m), m for (a, m) in edges])
# edge props
(a)-[e { f }]->(b) = f(a, a->m, m, m->b, b)
My intuition is a mix of pandas/r's group/apply agg pattern for table reductions, and how tools like pregel/graphx do iterative maps on labeled <src_node, edge, dst_node> triples.
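To make the triple idea concrete, here is a small pandas sketch of a `<src_node, edge, dst_node>` view: node columns are joined onto each edge so a later groupby/agg can see both endpoints. The helper name `edge_triples` and the `src_` / `dst_` prefixes are assumptions for illustration, not an existing API.

```python
import pandas as pd

def edge_triples(nodes, edges, node_col, src, dst):
    """Hypothetical helper: one row per edge with both endpoints' node columns attached."""
    src_nodes = nodes.add_prefix('src_')
    dst_nodes = nodes.add_prefix('dst_')
    return (
        edges
        .merge(src_nodes, left_on=src, right_on=f'src_{node_col}', how='left')
        .merge(dst_nodes, left_on=dst, right_on=f'dst_{node_col}', how='left')
    )

nodes = pd.DataFrame({'id': ['a', 'b', 'm'], 'type': ['attrib', 'attrib', 'event']})
edges = pd.DataFrame({'s': ['a', 'b'], 'd': ['m', 'm'], 'w': [30, 15]})
triples = edge_triples(nodes, edges, 'id', 's', 'd')

# Example reduction over triples: total edge weight per destination node type
weight_by_dst_type = triples.groupby('dst_type')['w'].sum()
```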
I would like to go through the suggested function signature. Please correct me if you sense any misunderstanding on my part.
g2 = g.replace_node_with_edges(
    select_nodes=g._nodes.query('type == "m"'),
    edge_reducer=g.reducers.pair_unique,
    edge_attr_reducer={'weight': 'max'})
select_nodes are the nodes which will become edges. select_nodes can be of type str, list(str), Series or a single-columned DataFrame.
edge_reducer probably determines the combination of the newly created edges. In the following example
df = pd.DataFrame({'x': ['a', 'b'], 'y': ['m', 'm'], 'w': [30, 15]})
g = graphistry.edges(df, 'x', 'y').nodes(df, 'y').bind(edge_weight='w')
for select_nodes=['m'], g.reducers.pair_unique results in edges (a,b) and (b,a) but g.reducers.pair_unique_undirected should only give edge (a,b)
edge_attr_reducer sets the attributes of the newly created edges. In the following example
df = pd.DataFrame({'x': ['a', 'b', 'c','d','e'], 'y': ['m', 'm', 'n', 'm','o'], 'w':[30,15,40,20,50]})
g = graphistry.edges(df, 'x', 'y').nodes(df, 'y').bind(edge_weight='w')
for select_nodes=['m','n'], edge_attr_reducer={'weight': 'max'} will set the edge_weight of the newly created edges alone to 40. weight can probably take other reducers as well, like min, sum, mean etc.
Clarifying it here will help me with the PR, thanks :)
select_nodes seems close! For select_nodes=g._nodes.query(...) to "just work", we'd also take a df where we slice on g._node
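A possible way to accept those input forms is to coerce them all to one Series of node ids up front. The helper below is a hypothetical sketch (the name `normalize_select_nodes` is made up); it covers str, list of ids, Series, and DataFrame, and deliberately leaves out positional-index and boolean-mask lists since those are ambiguous with lists of ids.

```python
import pandas as pd

def normalize_select_nodes(select_nodes, node_col):
    """Hypothetical helper: coerce the accepted select_nodes forms to a Series of node ids."""
    if isinstance(select_nodes, pd.DataFrame):
        if node_col in select_nodes.columns:
            # e.g. a slice of g._nodes: pick out the node id column
            ids = select_nodes[node_col]
        elif len(select_nodes.columns) == 1:
            ids = select_nodes.iloc[:, 0]
        else:
            raise ValueError(f'DataFrame needs column {node_col!r} or exactly one column')
    elif isinstance(select_nodes, pd.Series):
        ids = select_nodes
    elif isinstance(select_nodes, str):
        ids = pd.Series([select_nodes])
    else:
        ids = pd.Series(list(select_nodes))
    return ids.drop_duplicates().reset_index(drop=True).rename(node_col)

nodes = pd.DataFrame({'n': ['a', 'b', 'm'], 'type': ['x', 'x', 'm']})
from_str = normalize_select_nodes('m', 'n')
from_list = normalize_select_nodes(['m', 'm', 'a'], 'n')
from_query = normalize_select_nodes(nodes.query('type == "m"'), 'n')
```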
--
For reducers, I'm not sure of the right signatures, so feel free to propose :) If we can keep these minimal and push additional reductions to complementary operators, that'd make this simpler.
Working backwards from some sample use cases:
- Hypergraph example: (a:attrib)<-[]-(m:event { props })-[]->(b:attrib) => (a:attrib)-[m:event{props}]->(b:attrib)
  - want to propagate the m:event node attributes to the new m:event edge
  - Out of scope: playing with multiedge vs single edge on the result is interesting, but we can defer to a complementary Edge reduction call for now to keep things simple
  - Out of scope: likewise, we might want to record a removed event count for each attrib... but that can be done w/ a subsequent neighborhood mapper that counts later
- Bipartite -> single type:
  - Ex: If (u1:user)<-[e1:event {time1}]-(ip1:IP)-[e2:event {time2}]->(u2:user) => (u1)-[ip1 { minTime, maxTime}]-(u2)
  - So for every (a), (m), (b), we want new edge with properties (e { ...f( edge a-m, edge m-b) })
  - => for homogeneous edges, that's mostly agg operators on grouped edge pairs..
  - for heterogeneous edges, like if multiple event types, less clear?
- Collapsing groups:
  - Maybe easier as a separate discussion once we have the single-node case ironed out
--
So after writing that out, maybe for the case of removing one node at a time, it's just about computing new edge props (edge_attr_reducer), and we don't need edge_reducer because it's fine (so far) as a separate outside postprocessing call.
A super general contract would be taking functions like:
def my_edge_attr_reducer(removed_node: dict, new_src: dict, new_dst: dict, old_src_node_edge: dict, old_node_dst_edge: dict) -> dict:
    new_edge_props = dict(...)
    return new_edge_props
g.replace_node_with_edges(edge_attr_reducer=my_edge_attr_reducer)
It is close to apply_rows, which is vectorizable
A similar vectorizable general contract can be pandas' .agg() on each group g1=[(a)-(m1), (m1)-(b)], g2=... where the resulting cols go to (a)-[agg(g1)]-(b), .... <-- I'm leaning more to this...
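A sketch of what the .agg()-per-group contract could look like for the bipartite IP -> user case: a self-merge pairs up the two edges around each removed node, the pair is stacked into a two-row group, and a named-aggregation spec produces the new edge columns. The function name `reduce_shared_source`, the `pair_id` column, and the output column names are assumptions for this example; it assumes orderable node ids and only handles the (n1)<-(removed)->(n2) shape.

```python
import pandas as pd

def reduce_shared_source(edges, src, dst, agg):
    """Hypothetical sketch: drop every `src` node and link the `dst` nodes it pointed at."""
    # Self-merge on the removed node: one row per (removed, n1, n2) combination
    pairs = edges.merge(edges, on=src, suffixes=('_1', '_2'))
    # Keep each unordered pair once, which also skips self-pairs
    pairs = (pairs[pairs[f'{dst}_1'] < pairs[f'{dst}_2']]
             .sort_values([src, f'{dst}_1', f'{dst}_2'])
             .reset_index(drop=True))
    pairs.insert(0, 'pair_id', range(len(pairs)))
    keys = ['pair_id', src, f'{dst}_1', f'{dst}_2']
    attr_cols = [c for c in edges.columns if c not in (src, dst)]
    # Stack the two replaced edges of each pair into one group of two rows
    halves = [
        pairs[keys + [f'{c}_{i}' for c in attr_cols]]
        .rename(columns={f'{c}_{i}': c for c in attr_cols})
        for i in (1, 2)
    ]
    grouped = pd.concat(halves, ignore_index=True).groupby(keys)
    return (
        grouped.agg(**agg)
        .reset_index()
        .drop(columns=['pair_id'])
        .rename(columns={src: 'reduced_from', f'{dst}_1': 'src', f'{dst}_2': 'dst'})
    )

events = pd.DataFrame({
    'ip': ['ip1', 'ip1', 'ip2', 'ip2', 'ip2'],
    'user': ['u1', 'u2', 'u1', 'u2', 'u3'],
    'time': [10, 20, 5, 7, 9],
})
user_edges = reduce_shared_source(
    events, 'ip', 'user',
    agg={'min_time': ('time', 'min'), 'max_time': ('time', 'max')})
```

The result is still a multigraph (u1 and u2 are linked once per shared IP), which fits the idea of leaving multiedge collapsing to a separate edge reduction call.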
--
Fun edge case: If there's a chain like (a)-(m1)-(m2)-(b) with multiple reduced nodes, unclear what my_edge_attr_reducer should do! Especially if we want this vectorizable!
I've been playing with a variant for a project.
I'm thinking next step is:
- adding a few toggles for if/which directions to merge + default to dropping self-loops, .... Ex: One user might want n1->a->n2 to turn into n1->n2 but n1->a<-n3 to generate no edge, while another will want the former + n1->n3.
- Something around smarter overlapping property coalescing/reductions. There might be some lessons from the pandas .merge() api, like suffixes, indicator, validate, and sort. This is basically a funny self-merge for edges, so those seem to generalize and have a reason for existing
- For bipartite graphs, like turning "event -> user" into "event -> event", this seems fine-ish. However, for chains like a->remove_1->remove_2->b, this imp might be unstable: running to fixedpoint might not ever finish. A more guaranteed way is to remove all remove_1, then remove all remove_2, etc. However, that is asymptotically broken, while the current version tries to do a full 1-step elimination in bulk via a few joins... Unclear if smarter handling viable.
def collapse_nodes_by_id(g, nodes_df):
    """
    - drop any nodes in g from nodes_df[g._node]
    - drop corresponding edges
    - ... and reroute as new edges:
    - n1->a->n2 => n1->n2
    - n1->a<-n2 => n1<-n2 (but skip self-edges)
    - n1<-a->n2 => n1<-n2 (but skip self-edges)
    """

    #remove g._nodes in nodes_df
    nodes2 = g._nodes.set_index(g._node).drop(index=nodes_df[g._node]).reset_index()

    ####

    #remove g._edges[g._source] in nodes_df
    edges_indexed = g._edges[[g._source]].reset_index()
    edges_labeled_src = edges_indexed.merge(nodes_df[[g._node]].rename(columns={g._node: g._source}).assign(drop=1), on=g._source, how='left').fillna({'drop': 0})
    edges_bad_src_index = edges_labeled_src[ edges_labeled_src['drop'] == 1.0 ]['index']
    edges_good_src = g._edges.drop(index=edges_bad_src_index)

    #remove g._edges[g._destination] in nodes_df
    edges_indexed = edges_good_src.reset_index()
    edges_labeled_dst = edges_indexed.merge(nodes_df[[g._node]].rename(columns={g._node: g._destination}).assign(drop=1), on=g._destination, how='left').fillna({'drop': 0})
    edges_bad_dst_index = edges_labeled_dst[ edges_labeled_dst['drop'] == 1.0 ]['index']
    edges_good_dst = edges_good_src.drop(index=edges_bad_dst_index)

    ####

    #for each removed node:
    # n1->a->n2 => n1->n2
    # n1->a<-n2 => n1<-n2 (but skip self-edges)
    # n1<-a->n2 => n1<-n2 (but skip self-edges)

    #focus just on rebuilding removed edges
    edges_with_matching_src_tagged = g._edges.merge(nodes_df[[g._node]].rename(columns={g._node: g._source}).assign(drop=1), on=g._source, how='left').fillna({'drop': 0})
    edges_with_matching_src = edges_with_matching_src_tagged[ edges_with_matching_src_tagged['drop'] == 1.0 ].drop(columns=['drop'])
    edges_with_matching_dst_tagged = g._edges.merge(nodes_df[[g._node]].rename(columns={g._node: g._destination}).assign(drop=1), on=g._destination, how='left').fillna({'drop': 0})
    edges_with_matching_dst = edges_with_matching_dst_tagged[ edges_with_matching_dst_tagged['drop'] == 1.0 ].drop(columns=['drop'])
    edges_with_matching_src_dst = g._edges[ (edges_with_matching_src_tagged['drop'] == 1.0) | (edges_with_matching_dst_tagged['drop'] == 1.0) ]

    # n1->a->n2 => n1->n2
    directed_edges_raw = edges_with_matching_dst.merge(
        edges_with_matching_src,
        how='left',
        left_on=g._destination,
        right_on=g._source,
        suffixes=('', '_y'))
    directed_edges = (directed_edges_raw
        .drop(columns=[g._destination, f'{g._source}_y'])
        .rename(columns={f'{g._destination}_y': g._destination}))
    #print('directed_edges', directed_edges)

    # n1->a<-n2 => n1<-n2
    # FIXME skip self-edges
    inwards_edges_raw = edges_with_matching_dst.merge(
        edges_with_matching_dst,
        how='left',
        on=g._destination,
        suffixes=('', '_y'))
    inwards_edges = (inwards_edges_raw
        .drop(columns=[g._destination])
        .rename(columns={f'{g._source}_y': g._destination}))
    #print('inwards_edges', inwards_edges)

    # n1<-a->n2 => n1<-n2
    # FIXME skip self-edges
    outwards_edges_raw = edges_with_matching_src.merge(
        edges_with_matching_src,
        how='left',
        on=g._source,
        suffixes=('', '_y'))
    outwards_edges = (outwards_edges_raw
        .drop(columns=[g._source])
        .rename(columns={f'{g._destination}_y': g._source}))
    #print('outwards_edges', outwards_edges)

    new_edges = pd.concat([
        edges_good_dst,
        directed_edges,
        inwards_edges,
        outwards_edges], ignore_index=True)

    return g.nodes(nodes2).edges(new_edges)
Ex:
collapse_nodes_by_id(
graphistry.nodes(pb_with_pb[:5], 'event')
.edges(pd.DataFrame({'s': [0, 0, 0, 1, 1, 2, 2, 3, 3, 4], 'd': [2, 1, 1, 1, 2, 2, 2, 3, 0, 5], 'v': ['aa', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']}), 's', 'd'),
pd.DataFrame({'event': [1, 3]}))._edges
Ex:
g2 = collapse_nodes_by_id(g, g._nodes[ g._nodes['category'] == 'event' ])
g2.plot()
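For chains like a->remove_1->remove_2->b, the "remove all remove_1, then remove all remove_2" ordering mentioned above can be sketched as a loop that bypasses one node per pass. This is an illustrative sketch on a bare edges DataFrame, not the variant above: `bypass_node` and `collapse_sequentially` are hypothetical names, only the directed n1->a->n2 pattern is rerouted, self-loops are dropped, and edge attributes are ignored.

```python
import pandas as pd

def bypass_node(edges, node_id, src, dst):
    """Remove one node: each in-edge n1->node paired with each out-edge node->n2 becomes n1->n2."""
    ins = edges.loc[edges[dst] == node_id, [src]]
    outs = edges.loc[edges[src] == node_id, [dst]]
    ins = ins[ins[src] != node_id]      # ignore self-loops on the removed node
    outs = outs[outs[dst] != node_id]
    # Cross join of in-neighbors and out-neighbors via a constant key
    bridged = ins.assign(_key=1).merge(outs.assign(_key=1), on='_key').drop(columns='_key')
    bridged = bridged[bridged[src] != bridged[dst]]  # default to dropping self-loops
    kept = edges.loc[(edges[src] != node_id) & (edges[dst] != node_id), [src, dst]]
    return pd.concat([kept, bridged], ignore_index=True).drop_duplicates(ignore_index=True)

def collapse_sequentially(edges, remove_ids, src, dst):
    """One pass per removed node, so it always stops after len(remove_ids) passes."""
    out = edges[[src, dst]]
    for node_id in remove_ids:
        out = bypass_node(out, node_id, src, dst)
    return out

chain = pd.DataFrame({'s': ['a', 'remove_1', 'remove_2'],
                      'd': ['remove_1', 'remove_2', 'b']})
collapsed = collapse_sequentially(chain, ['remove_1', 'remove_2'], 's', 'd')
```

Termination is guaranteed because each pass eliminates exactly one node, but the cost is one set of joins per removed node, which is the asymptotic concern raised above compared with a bulk 1-step elimination.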
Related, label prop WIP:
def propagate_edge_labels(g, incoming=None, outgoing=None, both=None, combine=None, fillna=None, groupby_deduplicate_key=None):
    """
    incoming: optional .agg(**kwargs) for a node over its incoming edges
    outgoing: optional .agg(**kwargs) for a node over its outgoing edges
    both: reuse the same calc for both incoming + outgoing
    combine: if setting both incoming+outgoing (or via both shorthand), how to then combine the results. Otherwise, will propagate the single aggregate provided.
    fillna: nodes.fillna(...) if provided
    groupby_deduplicate_key: incoming (outgoing) edges are likely to have a key to deduplicate on before reducing, such as 'event' in hypergraphs, which would otherwise be counted multiple times
    """

    g = g.materialize_nodes()

    if both is not None:
        incoming = both
        outgoing = both

    dst_labels = None
    if incoming is not None:
        if groupby_deduplicate_key is not None:
            dst_labels = g._edges.drop_duplicates([g._destination, groupby_deduplicate_key]).groupby(g._destination).agg(**incoming)
        else:
            dst_labels = g._edges.groupby(g._destination).agg(**incoming)

    src_labels = None
    if outgoing is not None:  # was `incoming`, which broke the outgoing-only case
        if groupby_deduplicate_key is not None:
            src_labels = g._edges.drop_duplicates([g._source, groupby_deduplicate_key]).groupby(g._source).agg(**outgoing)
        else:
            src_labels = g._edges.groupby(g._source).agg(**outgoing)

    nodes2 = None
    if combine is not None:
        nodes2 = g._nodes.set_index(g._node).assign(**{c: combine[c](dst_labels[c], src_labels[c]) for c in combine}).reset_index()
    else:
        nodes_d = g._nodes
        if incoming is not None:
            nodes_d = g._nodes.merge(dst_labels.reset_index().rename(columns={g._destination: g._node}), how='left', on=g._node)
        nodes2 = nodes_d
        if outgoing is not None:
            nodes2 = nodes_d.merge(src_labels.reset_index().rename(columns={g._source: g._node}), how='left', on=g._node)

    if fillna is not None:
        nodes2 = nodes2.fillna(fillna)

    return g.nodes(nodes2)
propagate_edge_labels(
graphistry.edges(
pd.DataFrame({
's': ['a', 'b', 'b', 'c', 'c', 'c'],
'd': ['a', 'b', 'c', 'a', 'b', 'c'],
'b': [True, True, False, False, False, False],
'v': [1, 2, 4, 6, 8, 10]
}),
's', 'd'),
both={
'sum_v': ('v', 'sum'),
'any_b': ('b', 'any'),
'count_b': ('b', 'sum'),
},
combine={
'sum_v': (lambda c1, c2: c1 + c2),
'any_b': (lambda c1, c2: c1 | c2),
'count_b': (lambda c1, c2: c1 + c2)
})._nodes
Node reductions came up again in the topology-aware .collapse: https://github.com/graphistry/pygraphistry/issues/336 + https://github.com/graphistry/pygraphistry/issues/337
The optimization thinking shows it may help to separate out primitives. A marking phase can identify collapse equiv classes in diff ways, while reduction phases can compute aggregates in structured ways.
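A toy sketch of that split, under assumptions: the marking phase here uses a small union-find to group selected nodes that are directly connected to each other (just one of many possible marking rules), and the reduction phase is a plain named-aggregation groupby. The names `mark_collapse_classes`, `reduce_classes`, and `collapse_class` are made up for the example, and the marking loop is plain Python rather than vectorized.

```python
import pandas as pd

def mark_collapse_classes(nodes, edges, node_col, src, dst, selected):
    """Marking phase (hypothetical): label each selected node with its connected group
    of selected nodes; unselected nodes stay in a class of their own."""
    parent = {n: n for n in nodes[node_col]}

    def find(n):
        while parent[n] != n:
            parent[n] = parent[parent[n]]  # path halving
            n = parent[n]
        return n

    chosen = set(nodes.loc[selected, node_col])
    for s, d in zip(edges[src], edges[dst]):
        if s in chosen and d in chosen:
            rs, rd = find(s), find(d)
            if rs != rd:
                parent[max(rs, rd)] = min(rs, rd)  # smallest id becomes the class label
    return nodes.assign(collapse_class=[find(n) for n in nodes[node_col]])

def reduce_classes(marked, **agg):
    """Reduction phase (hypothetical): one row per class with named aggregates."""
    return marked.groupby('collapse_class').agg(**agg).reset_index()

nodes = pd.DataFrame({'id': ['a', 'b', 'c', 'd', 'e'],
                      'level': [1, 2, 2, 2, 1],
                      'v': [1, 2, 3, 4, 5]})
edges = pd.DataFrame({'s': ['a', 'b', 'd'], 'd': ['b', 'c', 'e']})
marked = mark_collapse_classes(nodes, edges, 'id', 's', 'd', nodes['level'] == 2)
summary = reduce_classes(marked, members=('id', 'count'), total_v=('v', 'sum'))
```

Because the marking output is just an extra column, different marking strategies can be swapped in without touching the reduction step.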