
[FEA] Node reductions

Open lmeyerov opened this issue 4 years ago • 10 comments

Node reductions are great!

What

This is an important case of graph reductions. There are requests in both the UI and APIs to do reductions at the level of a node, multiedge, and multiple nodes / subgraphs. Instead of simply filtering out the selected entities, the idea is they'd be partitioned into subgraphs, and different kinds of operations would replace them with new topologies and attributes representing them. They seem like some sort of continuous derivative operators, and may even be invertible.

Example

One of the most common cases is dropping a node and propagating its attributes and edges/attributes through its neighborhood, so:

In hypergraph:

g = graphistry.hypergraph(
    pd.DataFrame([
       {'x': 'a', 'y': 'm'},
       {'x': 'b', 'y': 'm'}
    ]),
    direct=True)

We'd get a 3-node graph (a)-[e1]->(m)<-[e2]-(b)

However, we really just want (a)-[m]-(b) ... so we need a way to drop node (m) and synthesize edge -[m]->. This gets weirder in the cases of multiedges, cliques, and different kinds of nodes:

g2 = g.replace_node_with_edges(
  select_nodes=g._nodes.query('type == "m"'),
  edge_reducer=g.reducers.pair_unique,
  edge_attr_reducer={'weight': 'max'})

g2 = g.replace_node_with_edges(select_nodes='m')
g2 = g.replace_node_with_edges(select_nodes=[1])
g2 = g.replace_node_with_edges(select_nodes=[False, True, False])
g2 = g.replace_node_with_edges(select_nodes=g._nodes.query('type == "m"'))

g2 = g.replace_node_with_edges(select_nodes='m', edge_reducer=g.reducers.pair_unique_undirected, edge_attr_reducer={'weight': 'max', ...})

Sample reducers

This gets at patterns of graph reducers, where we want to take nodes/edges, remove them, and replace them with other nodes/edges. For example, we can replace all type='m' nodes with (a: x)-[e]->(b: x) via a node reduction driven by a selector predicate:

1. Node reducers: Drop a node by converting to edges

g2.replace_node_with_edges(selected=g._nodes['type'] == 'm', reducer=lambda edges: strong_component(edges_to_nodes(edges)))

The idea is to pull out nodes matching some predicate:

def replace_node_with_edges(g, selected, reducer):
  nodes = g._nodes[ selected ]
  new_edges = []
  for node in nodes[g._node]:
    in_edges = g._edges[ g._edges[g._source] == node ]
    out_edges = g._edges[ g._edges[g._destination] == node ]
    new_edges.append( reducer(pd.concat([in_edges, out_edges])) )
  new_nodes = g._nodes[ ~selected ]
  g2 = g.nodes(new_nodes).edges(pd.concat(new_edges))
  return g2

where

def strong_component(edges, s='src', d='dst'):
    node_ids = pd.concat([edges[s], edges[d]]).unique()
    out = []
    for x in range(len(node_ids)):
        for y in range(x + 1, len(node_ids)):
            out.append([node_ids[x], node_ids[y]])
    return out

2. Sugar for common cases

g2 = g.replace_node_with_edges(select_key='type', select_value='m', reducer=reducers.pair_unique)

3. Build into the hypergraph transform, such as during or post-process

g2 = graphistry.hypergraph(
    pd.DataFrame([
       {'x': 'a', 'y': 'm'},
       {'x': 'b', 'y': 'm'}      
    ]),
  edges={ }, # no direct edges
  reducer_subgraphs=[ {'reducer_type': 'node_reducer', 'node_selector': ['y'], 'output_nodes': ['x'],  'reducer': 'pair_unique'} ]
)

Or as a post-process:

g2 = graphistry.hypergraph(
    pd.DataFrame([
       {'x': 'a', 'y': 'm'},
       {'x': 'b', 'y': 'm'}      
    ]),
   direct=True,
   reduce=[ {'reducer_type': 'node_reducer', 'node_selector': ['y'], 'reducer': 'pair_unique'} ]
)

Plan

I think it's worth doing some principled experiments on different shaping use cases first (replace_node_with_edges()), as the API is not at all obvious:

  • selecting nodes: by index, type, ... ?
  • reducing a neighborhood to new edges: generic handler + typical cases
  • handling attribute reductions as part of that: generic handler + typical cases

Ideally we come to something simple + extensible in PyGraphistry, and use that to drive subsequent incorporation in hypergraphs, UI, and new preprocessing APIs.

Common scenarios

  • Turn hypergraph (event {...e} )->(attribute) into (attribute)-{...e}->(attribute)
  • Turn bipartite (a)->(b) graph into graph of (a)-[e {...b}]->(a) or (b)-[e {...a}]->(b)
    • Can combine with edge reductions to collapse multiedge (a1)-[e1]->(a2), (a1)-[e2]->(a2), ... into singular (a)-[e {...}]->(b)
    • If removed edges + nodes have props, should be available to generator of new edges
  • Collapse some hierarchy into summary nodes:
    • Rigid: Ex - collapse chain of country -> state -> city -> street into country -> state { street_count: 100 }
    • Loose: Ex - collapse communities like user { community: 1 } --[friend]--> user {community: 1} into community { id: 1, user_count: 2 }
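The loose community-collapse case above can be sketched with a plain pandas groupby (hypothetical column names; this shows the shape of the reduction, not a proposed API):

```python
import pandas as pd

# Hypothetical user table: fold nodes sharing a community into summary nodes
users = pd.DataFrame({
    'node': ['u1', 'u2', 'u3'],
    'community': [1, 1, 2],
})

# One summary node per community, carrying an aggregate user_count
communities = (users
    .groupby('community', as_index=False)
    .agg(user_count=('node', 'count'))
    .assign(node=lambda df: 'community_' + df['community'].astype(str)))
# community 1 -> user_count 2, community 2 -> user_count 1
```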

lmeyerov avatar Jan 07 '21 21:01 lmeyerov

I am interested in working on this issue. I am currently reading through this and collecting some thoughts around it, do you mind assigning it to me?

pradkrish avatar Dec 02 '21 19:12 pradkrish

Awesome, this has been a long time coming!

I'm guessing the idea here won't turn out to be super well-formed until you work through some examples. Collapsing nodes/edges and replacing them with simpler nodes/edges carrying summary stats is a common request, but I'm sure there are a lot of edge cases. So even the basic API definition seems up for grabs too.

My intuition is this will be used similarly to df.groupby(..).agg(...) in pandas, except it needs to generalize to grouping nodes: removing them + their edges; reconnecting the new summary nodes to their old neighborhoods (accounting for replaced parts); and enriching both the resulting nodes and edges with aggregate stats. So probably a lot of APIs we can learn from + reuse!

lmeyerov avatar Dec 02 '21 22:12 lmeyerov

Oh, I forgot to mention: we're starting to do both CPU (pandas) + GPU (cudf/dask_cudf) flavors of implementations.

Totally fine to just start with CPU and I can help on GPU. To make the process easier, it helps a lot to write vectorized pandas. E.g., s = df['some_col'].sum() vs for x in df['some_col']: s += x. A lot of code ends up fully reusable, or just needs a few pd calls swapped for cudf ones.
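A toy illustration of the loop vs vectorized point: same result, but the vectorized form ports cleanly to GPU dataframes.

```python
import pandas as pd

df = pd.DataFrame({'some_col': [1, 2, 3, 4]})

# Loop form: slow, and harder to move to cudf/dask_cudf
s_loop = 0
for x in df['some_col']:
    s_loop += x

# Vectorized form: one call, typically runs unchanged on cudf
s_vec = df['some_col'].sum()

assert s_loop == s_vec == 10
```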

lmeyerov avatar Dec 03 '21 20:12 lmeyerov

My ideas are still incomplete. I would like to pick your brains on what you think the output should look like for simple use cases. Discussing specific use cases can help build a bigger picture. If you take the following dataframe

g = graphistry.edges(pd.DataFrame([{'x': 'a', 'y': 'm'},
                                   {'x': 'b', 'y': 'm'},
                                   {'x': 'c', 'y': 'n'},
                                   {'x': 'd', 'y': 'm'}]), 'x', 'y')

Replacing all the nodes (m) with an edge [m] results in a clique (I guess) consisting of nodes (a), (b) and (d). Let's say we have a naive implementation of replace_node_with_edges('m'), which takes a node that needs to be replaced with edges, like you described above. Let's treat the function as a black box for now and ignore the implementation details. Would you be able to illustrate what you imagine the output looking like for this function call?

g.replace_node_with_edges('m') 

pradkrish avatar Dec 04 '21 14:12 pradkrish

a->m<-b, c->n, d->m

Yes, I think you've got it: (a), (b), and (d) would now form a clique, so it'd be equiv to writing:

g2 = graphistry.edges(pd.DataFrame([
  # the clique
  {'x': 'a', 'y': 'b'},
  {'x': 'a', 'y': 'd'},
  {'x': 'b', 'y': 'd'},  # we will treat all edges as undirected, even though the UI rendering will be directed
  # the rest
  {'x': 'c', 'y': 'n'}]),
  'x', 'y')

We may want to further include provenance information:

g2 = graphistry.edges(pd.DataFrame([
  # the clique
  {'x': 'a', 'y': 'b', 'reduced_from': 'm', 'type': m['type']}, # assumes original graphistry.nodes(df2) had col `type`
  {'x': 'a', 'y': 'd', 'reduced_from': 'm', 'type': m['type']},
  {'x': 'b', 'y': 'd', 'reduced_from': 'm', 'type': m['type']},
  # the rest
  {'x': 'c', 'y': 'n'}]),
  'x', 'y')

Likewise, we can imagine updating node/edge based on aggregates like:

# node props
(a { f }) = f(a, [ (a->m), m for (a, m) in edges]) 

# edge props
(a)-[e { f }]->(b) = f(a, a->m, m, m->b, b)

My intuition is a mix of pandas/r's group/apply agg pattern for table reductions, and how tools like pregel/graphx do iterative maps on labeled <src_node, edge, dst_node> triples.
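That groupby/agg intuition can be sketched for one removed middle node: pair its in-edges with its out-edges, then aggregate each group's properties into the new edge (column names and the 'max' reducer are illustrative, not a proposed API):

```python
import pandas as pd

# Removed middle node 'm': in-edges a->m, b->m and out-edges m->c, m->d
edges = pd.DataFrame({
    'src': ['a', 'b', 'm', 'm'],
    'dst': ['m', 'm', 'c', 'd'],
    'w':   [1, 2, 3, 4],
})

in_e = edges[edges['dst'] == 'm']
out_e = edges[edges['src'] == 'm']

# Every (a->m, m->b) combination becomes a candidate new edge a->b
pairs = in_e.merge(out_e, left_on='dst', right_on='src', suffixes=('_in', '_out'))

# Aggregate per new (src, dst) pair; 'max' stands in for a user-chosen reducer
new_edges = (pairs
    .groupby(['src_in', 'dst_out'], as_index=False)
    .agg(w=('w_in', 'max')))
# new_edges has 4 rows: a->c, a->d, b->c, b->d
```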

lmeyerov avatar Dec 06 '21 08:12 lmeyerov

I would like to go through the suggested function signature. Please correct me if you sense any misunderstanding on my part.

g2 = g.replace_node_with_edges(
  select_nodes=g._nodes.query('type == "m"'),
  edge_reducer=g.reducers.pair_unique,
  edge_attr_reducer={'weight': 'max'})

select_nodes are the nodes which will become edges. select_nodes can be of type str, list(str), Series, or a single-column DataFrame.

edge_reducer probably determines the combination of the newly created edges. In the following example

df = pd.DataFrame({'x': ['a', 'b'], 'y': ['m', 'm'], 'w': [30, 15]})
g = graphistry.edges(df, 'x', 'y').nodes(df, 'y').bind(edge_weight='w')

for select_nodes=['m'], g.reducers.pair_unique results in edges (a,b) and (b,a) but g.reducers.pair_unique_undirected should only give edge (a,b)
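A hypothetical sketch of those two reducers' semantics over a removed node's neighbor set (names and signatures are guesses, not an existing pygraphistry API):

```python
from itertools import combinations, permutations

def pair_unique(neighbors):
    # Directed: emit both (a, b) and (b, a) for each distinct pair
    return list(permutations(sorted(set(neighbors)), 2))

def pair_unique_undirected(neighbors):
    # Undirected: canonical order, so each pair appears once
    return list(combinations(sorted(set(neighbors)), 2))

assert pair_unique(['a', 'b']) == [('a', 'b'), ('b', 'a')]
assert pair_unique_undirected(['a', 'b']) == [('a', 'b')]
```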

edge_attr_reducer sets the attributes of the newly created edges. In the following example

df = pd.DataFrame({'x': ['a', 'b', 'c','d','e'], 'y': ['m', 'm', 'n', 'm','o'], 'w':[30,15,40,20,50]})
g = graphistry.edges(df, 'x', 'y').nodes(df, 'y').bind(edge_weight='w')

for select_nodes=['m','n'], edge_attr_reducer={'weight': 'max'} will set the edge_weight of the newly created edges alone to 40. weight can probably take other reducers as well, like min, sum, mean etc.
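One plausible reading of edge_attr_reducer={'weight': 'max'} aggregates per removed node (a sketch of the semantics only, not the API):

```python
import pandas as pd

df = pd.DataFrame({
    'x': ['a', 'b', 'c', 'd', 'e'],
    'y': ['m', 'm', 'n', 'm', 'o'],
    'w': [30, 15, 40, 20, 50],
})

# Aggregate each removed node's old edge weights; its replacement edges
# would then carry that aggregate ('max' here; min/sum/mean would also fit)
max_w = df[df['y'].isin(['m', 'n'])].groupby('y')['w'].max()
# m -> 30, n -> 40
```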

Clarifying it here will help me with the PR, thanks :)

pradkrish avatar Dec 12 '21 19:12 pradkrish

select_nodes seems close! For select_nodes=g._nodes.query(...) to "just work", we'd also take a df where we slice on g._node

--

For reducers, I'm not sure of the right signatures, so feel free to propose :) If we can keep these minimal and push additional reductions to complementary operators, that'd make this simpler.

Working backwards from some sample use cases:

  • Hypergraph example: (a:attrib)<-[]-(m:event { props })-[]->(b:attrib) => (a:attrib)-[m:event{props}]->(b:attrib)

    • want to propagate the m:event node attributes to the new m:event edge
    • Out of scope: playing with multiedge vs single edge on the result is interesting, but we can defer to a complementary Edge reduction call for now to keep things simple
    • Out of scope: likewise, we might want to record a removed event count for each attrib... but that can be done w/ a subsequent neighborhood mapper that counts later
  • Bipartite -> single type:

    • Ex: If (u1:user)<-[e1:event {time1}]-(ip1:IP)-[e2:event {time2}]->(u2:user)=> (u1)-[ip1 { minTime, maxTime}]-(u2)
    • So for every (a), (m), (b), we want new edge with properties (e { ...f( edge a-m, edge m-b) })
    • => for homogeneous edges, that's mostly agg operators on grouped edge pairs..
    • for heterogeneous edges, like if multiple event types, less clear?
  • Collapsing groups:

    • Maybe easier as a separate discussion once we have the single-node case ironed out

--

So after writing that out, maybe for the case of removing one node at a time, it's just about computing new edge props (edge_attr_reducer), and we don't need edge_reducer because it's fine (so far) as a separate outside postprocessing call.

A super general contract would be taking functions like:

def my_edge_attr_reducer(removed_node: dict, new_src: dict, new_dst: dict, old_src_node_edge: dict, old_node_dst_edge: dict) -> List[dict]:
    new_edge_props = dict(...)
    return new_edge_props

g.replace_node_with_edges(edge_attr_reducer=my_edge_attr_reducer)

It is close to cudf's apply_rows, which is vectorizable

A similar vectorizable general contract can be pandas' .agg() on each group g1=[(a)-(m1), (m1)-(b)], g2=..., where the resulting cols go to (a)-[agg(g1)]-(b), .... <-- I'm leaning more to this...

--

Fun edge case: If there's a chain like (a)-(m1)-(m2)-(b) with multiple reduced nodes, unclear what my_edge_attr_reducer should do! Especially if we want this vectorizable!

lmeyerov avatar Dec 13 '21 09:12 lmeyerov

I've been playing with a variant for a project.

I'm thinking next step is:

  • adding a few toggles for if/which directions to merge + default to dropping self-loops, .... Ex: One user might want n1->a->n2 to turn into n1->n2 but n1->a<-n3 to generate no edge, while another will want the former + n1->n3.

  • Something around smarter overlapping property coalescing/reductions. There might be some lessons from the pandas .merge() api, like suffix, indicator, validate, and sort. This is basically a funny self-merge for edges, so those seem to generalize and have a reason for existing
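Those .merge() knobs (suffixes, indicator, validate) can be shown on a toy edge self-merge, which is the shape this collapse performs:

```python
import pandas as pd

edges = pd.DataFrame({'src': ['n1', 'n3'], 'dst': ['a', 'a'], 'w': [1, 2]})

merged = edges.merge(
    edges,
    on='dst',
    how='left',
    suffixes=('', '_y'),   # disambiguate the overlapping src/w property columns
    indicator=True,        # adds a _merge column recording match provenance
    validate='m:m')        # assert the expected join cardinality

# 2 left rows x 2 matches each = 4 rows, all marked 'both'
assert len(merged) == 4
assert (merged['_merge'] == 'both').all()
```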

  • For bipartite graphs, like turning "event -> user" into "event -> event", this seems fine-ish. However, for chains like a->remove_1->remove_2->b, this implementation might be unstable: running to a fixed point might never finish. A more guaranteed way is to remove all remove_1, then all remove_2, etc. However, that is asymptotically broken, while the current version tries to do a full 1-step elimination in bulk via a few joins... Unclear if smarter handling is viable.

def collapse_nodes_by_id(g, nodes_df):
    """
        - drop any nodes in g from nodes_df[g._node]
        - drop corresponding edges
        - ... and reroute as new edges:
          - n1->a->n2 => n1->n2
          - n1->a<-n2 => n1<-n2 (but skip self-edges)
          - n1<-a->n2 => n1<-n2 (but skip self-edges)
    """

    #remove g._nodes in nodes_df
    nodes2 = g._nodes.set_index(g._node).drop(index=nodes_df[g._node]).reset_index()

    ####

    #remove g._edges[g._source] in nodes_df
    edges_indexed = g._edges[[g._source]].reset_index()
    edges_labeled_src = edges_indexed.merge(nodes_df[[g._node]].rename(columns={g._node: g._source}).assign(drop=1), on=g._source, how='left').fillna({'drop': 0})
    edges_bad_src_index = edges_labeled_src[ edges_labeled_src['drop'] == 1.0 ]['index']
    edges_good_src = g._edges.drop(index=edges_bad_src_index)

    #remove g._edges[g._destination] in nodes_df
    edges_indexed = edges_good_src.reset_index()
    edges_labeled_dst = edges_indexed.merge(nodes_df[[g._node]].rename(columns={g._node: g._destination}).assign(drop=1), on=g._destination, how='left').fillna({'drop': 0})
    edges_bad_dst_index = edges_labeled_dst[ edges_labeled_dst['drop'] == 1.0 ]['index']
    edges_good_dst = edges_good_src.drop(index=edges_bad_dst_index)

    ####

    #for each removed node:
    #  n1->a->n2 => n1->n2
    #  n1->a<-n2 => n1<-n2 (but skip self-edges)
    #  n1<-a->n2 => n1<-n2 (but skip self-edges)

    #focus just on rebuilding removed edges
    edges_with_matching_src_tagged = g._edges.merge(nodes_df[[g._node]].rename(columns={g._node: g._source}).assign(drop=1), on=g._source, how='left').fillna({'drop': 0})
    edges_with_matching_src = edges_with_matching_src_tagged[ edges_with_matching_src_tagged['drop'] == 1.0 ].drop(columns=['drop'])
    edges_with_matching_dst_tagged = g._edges.merge(nodes_df[[g._node]].rename(columns={g._node: g._destination}).assign(drop=1), on=g._destination, how='left').fillna({'drop': 0})
    edges_with_matching_dst = edges_with_matching_dst_tagged[ edges_with_matching_dst_tagged['drop'] == 1.0 ].drop(columns=['drop'])
    edges_with_matching_src_dst = g._edges[ (edges_with_matching_src_tagged['drop'] == 1.0) | (edges_with_matching_dst_tagged['drop'] == 1.0) ]

    #  n1->a->n2 => n1->n2
    directed_edges_raw = edges_with_matching_dst.merge(
        edges_with_matching_src,
        how='left',
        left_on=g._destination,
        right_on=g._source,
        suffixes=('', '_y'))
    directed_edges = (directed_edges_raw
      .drop(columns=[g._destination, f'{g._source}_y'])
      .rename(columns={f'{g._destination}_y': g._destination}))
    #print('directed_edges', directed_edges)

    #  n1->a<-n2 => n1<-n2
    # FIXME skip self-edges
    inwards_edges_raw = edges_with_matching_dst.merge(
        edges_with_matching_dst,
        how='left',
        on=g._destination,
        suffixes=('', '_y'))
    inwards_edges = (inwards_edges_raw
        .drop(columns=[g._destination])
        .rename(columns={f'{g._source}_y': g._destination}))
    #print('inwards_edges', inwards_edges)

    #  n1<-a->n2 => n1<-n2
    # FIXME skip self-edges
    outwards_edges_raw = edges_with_matching_src.merge(
        edges_with_matching_src,
        how='left',
        on=g._source,
        suffixes=('', '_y'))
    outwards_edges = (outwards_edges_raw
        .drop(columns=[g._source])
        .rename(columns={f'{g._destination}_y': g._source}))
    #print('outwards_edges', outwards_edges)

    new_edges = pd.concat([
        edges_good_dst,
        directed_edges,
        inwards_edges,
        outwards_edges], ignore_index=True)

    return g.nodes(nodes2).edges(new_edges)

Ex:

collapse_nodes_by_id(
    graphistry.nodes(pb_with_pb[:5], 'event')
        .edges(pd.DataFrame({'s': [0, 0, 0, 1, 1, 2, 2, 3, 3, 4], 'd': [2, 1, 1, 1, 2, 2, 2, 3, 0, 5], 'v': ['aa', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']}), 's', 'd'),
    pd.DataFrame({'event': [1, 3]}))._edges

Ex:

g2 = collapse_nodes_by_id(g, g._nodes[ g._nodes['category'] == 'event' ])
g2.plot()

lmeyerov avatar Feb 03 '22 08:02 lmeyerov

Related, label prop WIP:

def propagate_edge_labels(g, incoming=None, outgoing=None, both=None, combine=None, fillna=None, groupby_deduplicate_key=None):
    """
        incoming: optional .agg(**kwargs) for a node over its incoming edges
        outgoing: optional .agg(**kwargs) for a node over its outgoing edges
        both: reuse the same calc for both incoming + outgoing
        combine: if setting both incoming+outgoing (or via both shorthand), how to then combine the results. Otherwise, will propagate the single aggregate provided.
        fillna: nodes.fillna(...) if provided
        groupby_deduplicate_key: incoming (outgoing) edges often have a key to deduplicate on before reducing, such as 'event' in hypergraphs, which would otherwise be counted multiple times
    """

    g = g.materialize_nodes()

    if both is not None:
        incoming = both
        outgoing = both

    dst_labels = None
    if incoming is not None:
        if groupby_deduplicate_key is not None:
            dst_labels = g._edges.drop_duplicates([g._destination, groupby_deduplicate_key]).groupby(g._destination).agg(**incoming)
        else:
            dst_labels = g._edges.groupby(g._destination).agg(**incoming)

    src_labels = None
    if outgoing is not None:
        if groupby_deduplicate_key is not None:
            src_labels = g._edges.drop_duplicates([g._source, groupby_deduplicate_key]).groupby(g._source).agg(**outgoing)
        else:
            src_labels = g._edges.groupby(g._source).agg(**outgoing)

    nodes2 = None
    if combine is not None:
        nodes2 = g._nodes.set_index(g._node).assign(**{c: combine[c](dst_labels[c], src_labels[c]) for c in combine}).reset_index()
    else:
        nodes_d = g._nodes
        if incoming is not None:
            nodes_d = g._nodes.merge(dst_labels.reset_index().rename(columns={g._destination: g._node}), how='left', on=g._node)
        nodes2 = nodes_d
        if outgoing is not None:
            nodes2 = nodes_d.merge(src_labels.reset_index().rename(columns={g._source: g._node}), how='left', on=g._node)

    if fillna is not None:
        nodes2 = nodes2.fillna(fillna)

    return g.nodes(nodes2)

propagate_edge_labels(
    graphistry.edges(
        pd.DataFrame({
            's': ['a', 'b', 'b', 'c', 'c', 'c'],
            'd': ['a', 'b', 'c', 'a', 'b', 'c'],
            'b': [True, True, False, False, False, False],
            'v': [1, 2, 4, 6, 8, 10]
        }),
        's', 'd'),
    both={
        'sum_v': ('v', 'sum'),
        'any_b': ('b', 'any'),
        'count_b': ('b', 'sum'),
    },
    combine={
        'sum_v': (lambda c1, c2: c1 + c2),
        'any_b': (lambda c1, c2: c1 | c2),
        'count_b': (lambda c1, c2: c1 + c2)
    })._nodes

lmeyerov avatar Feb 03 '22 11:02 lmeyerov

Node reductions came up again in the topology-aware .collapse: https://github.com/graphistry/pygraphistry/issues/336 + https://github.com/graphistry/pygraphistry/issues/337

The optimization thinking shows it may help to separate out primitives: a marking phase can identify collapse equiv classes in diff ways, while reduction phases can compute aggregates in structured ways.
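A hedged sketch of that separation in pandas, with a toy marking rule (real marking would be topology-aware as in #336/#337):

```python
import pandas as pd

nodes = pd.DataFrame({
    'node': ['u1', 'u2', 'u3', 'u4'],
    'degree': [3, 1, 2, 5],
})

# Marking phase: any rule that emits a collapse equivalence class per node
# (toy parity-of-degree rule standing in for topology-aware marking)
nodes['collapse_class'] = nodes['degree'] % 2

# Reduction phase: structured aggregates per equivalence class
summary = nodes.groupby('collapse_class', as_index=False).agg(
    member_count=('node', 'count'),
    max_degree=('degree', 'max'))
# class 0: 1 member, max degree 2; class 1: 3 members, max degree 5
```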

lmeyerov avatar Apr 23 '22 15:04 lmeyerov