gnn icon indicating copy to clipboard operation
gnn copied to clipboard

Problem of graph_sampler when using latent node

Open mhsong21 opened this issue 1 year ago • 0 comments

Hello, I found a problem when running tfgnn_graph_sampler on a graph that has a latent node set (nodes with only an id and no features).

The error message was `AttributeError: 'list' object has no attribute 'get'`, raised when accessing edges here: https://github.com/tensorflow/gnn/blob/main/tensorflow_gnn/sampler/subgraph.py#L330

Code snippet

def _create_empty_node_features(
    schema: tfgnn.GraphSchema, node_set_name: tfgnn.NodeSetName,
    edges: Mapping[tfgnn.EdgeSetName, Iterable[Node]]
) -> List[Tuple[NodeId, Features]]:
  """Create empty features for all latent nodes referenced by edges.

  NOTE: this is the defective version quoted in the bug report; the loop
  overwrites the `edges` argument (see the comment inside the loop).

  Args:
    schema: Graph schema; its `edge_sets` mapping is iterated to find edge
      sets whose `source` or `target` equals `node_set_name`.
    node_set_name: Name of the node set to create placeholder entries for.
    edges: Mapping from edge set name to the nodes of that edge set. Each
      node is read for its `id` and for the `neighbor_id` of every entry in
      its `outgoing_edges`.

  Returns:
    A list with one `(node_id, Features())` pair per distinct node id found.
    All pairs share the same empty `Features` instance.
  """
  unique_node_ids = set()
  for edge_set_name, edge_set in schema.edge_sets.items():
    # BUG: this assignment rebinds the `edges` parameter (a Mapping) to the
    # result of the lookup (an Iterable[Node] or None). On the next loop
    # iteration `edges.get` is therefore called on that value instead of the
    # mapping, which raises AttributeError once the schema has more than one
    # edge set.
    edges = edges.get(edge_set_name, None) # <-- here
    if not edges:
      continue
    if edge_set.source == node_set_name:
      # The nodes themselves belong to the requested node set.
      for node in edges:
        unique_node_ids.add(node.id)
    if edge_set.target == node_set_name:
      # The neighbors reached via outgoing edges belong to the requested set.
      for node in edges:
        unique_node_ids.update(
            [edge.neighbor_id for edge in node.outgoing_edges])
  # One empty Features object is reused for every returned pair.
  dummy_example = Features()
  return [(node_id, dummy_example) for node_id in unique_node_ids]

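This happens because the argument edges (a mapping from edge set name to nodes) is overwritten in the first loop iteration with the result of edges.get(...), which is an Iterable[Node] (here a list). The second iteration then calls .get on that list instead of on the mapping. The problem should be reproducible whenever the graph has a latent node set and multiple edge sets. I suggest renaming the loop-local variable so that the argument edges remains unchanged. In my case, the error was fixed after making this change, and the Dataflow job succeeded.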

Like this:

def _create_empty_node_features(
    schema: tfgnn.GraphSchema, node_set_name: tfgnn.NodeSetName,
    edges: Mapping[tfgnn.EdgeSetName, Iterable[Node]]
) -> List[Tuple[NodeId, Features]]:
  """Builds featureless placeholder entries for a latent node set.

  Walks every edge set declared in `schema` and collects the ids of all nodes
  of `node_set_name` that the sampled edges mention, either as the node that
  owns the outgoing edges or as a neighbor those edges point to.

  Args:
    schema: Graph schema whose `edge_sets` are inspected.
    node_set_name: Node set for which placeholder entries are produced.
    edges: Mapping from edge set name to the nodes of that edge set; it is
      only read, never rebound or modified.

  Returns:
    One `(node_id, Features())` pair per distinct referenced node id. The
    same empty `Features` instance is shared by all pairs.
  """
  referenced_ids = set()
  for set_name, set_schema in schema.edge_sets.items():
    # Keep the lookup result in its own name so `edges` stays the mapping.
    nodes_in_set = edges.get(set_name, None)
    if nodes_in_set:
      if set_schema.source == node_set_name:
        # Nodes listed for this edge set are members of the requested set.
        referenced_ids.update(node.id for node in nodes_in_set)
      if set_schema.target == node_set_name:
        # Neighbors of the listed nodes are members of the requested set.
        referenced_ids.update(
            edge.neighbor_id
            for node in nodes_in_set
            for edge in node.outgoing_edges)
  placeholder = Features()
  return [(node_id, placeholder) for node_id in referenced_ids]

Full error log

Traceback (most recent call last):
  File "/opt/venv/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 297, in _execute
    response = task()
  File "/opt/venv/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 372, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/opt/venv/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625, in do_instruction
    return getattr(self, request_type)(
  File "/opt/venv/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 663, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/opt/venv/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1051, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/opt/venv/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 232, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 572, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1436, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1524, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1434, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1619, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1732, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1436, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1524, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1434, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1619, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1732, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1436, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1545, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1434, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 637, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/opt/venv/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 2040, in <lambda>
    wrapper = lambda x: [fn(*x)]
  File "/opt/venv/lib/python3.9/site-packages/tensorflow_gnn/sampler/graph_sampler.py", line 240, in convert_to_tf_example
    return subgraph.encode_subgraph_pieces_to_example(
  File "/opt/venv/lib/python3.9/site-packages/tensorflow_gnn/sampler/subgraph.py", line 183, in encode_subgraph_pieces_to_example
    nodes = _create_empty_node_features(schema, node_set_name, edge_sets)
  File "/opt/venv/lib/python3.9/site-packages/tensorflow_gnn/sampler/subgraph.py", line 330, in _create_empty_node_features
    edges = edges.get(edge_set_name, None)
AttributeError: 'list' object has no attribute 'get' [while running 'CreateGraphTensors/ConvertToTfExample-ptransform-51']

mhsong21 avatar Nov 10 '23 22:11 mhsong21