graphstorm icon indicating copy to clipboard operation
graphstorm copied to clipboard

Potential issue with edge filtering during GConstruct

Open thvasilo opened this issue 6 months ago • 1 comments

Ran into this error during CI:

https://github.com/awslabs/graphstorm/actions/runs/15907555054/job/44866847860?pr=1288

[2025-06-26T16:53:30.453Z] =================================== FAILURES ===================================
[2025-06-26T16:53:30.453Z] _____________________________ test_parse_edge_data _____________________________
[2025-06-26T16:53:30.453Z]     def test_parse_edge_data():
[2025-06-26T16:53:30.453Z]         np.random.seed(1)
[2025-06-26T16:53:30.453Z]         with tempfile.TemporaryDirectory() as tmpdirname:
[2025-06-26T16:53:30.453Z]             str_src_ids = np.array([str(i) for i in range(10)])
[2025-06-26T16:53:30.453Z]             str_dst_ids = np.array([str(i) for i in range(15)])
[2025-06-26T16:53:30.453Z]             node_id_map = {"src": IdMap(str_src_ids),
[2025-06-26T16:53:30.453Z]                            "dst": IdMap(str_dst_ids)}
[2025-06-26T16:53:30.453Z]     
[2025-06-26T16:53:30.453Z]             src_ids = np.array([str(random.randint(0, 20)) for _ in range(15)])
[2025-06-26T16:53:30.453Z]             dst_ids = np.array([str(random.randint(0, 25)) for _ in range(15)])
[2025-06-26T16:53:30.453Z]             feat = np.random.rand(15, 10)
[2025-06-26T16:53:30.453Z]             data = {
[2025-06-26T16:53:30.453Z]                 "src_id": src_ids,
[2025-06-26T16:53:30.453Z]                 "dst_id": dst_ids,
[2025-06-26T16:53:30.453Z]                 "feat": feat,
[2025-06-26T16:53:30.453Z]             }
[2025-06-26T16:53:30.453Z]     
[2025-06-26T16:53:30.453Z]             feat_ops = [Noop("feat", "feat", None)]
[2025-06-26T16:53:30.453Z]             label_ops = [
[2025-06-26T16:53:30.453Z]                 LinkPredictionProcessor(None, None, [0.7,0.1,0.2], None)]
[2025-06-26T16:53:30.453Z]             data_file = os.path.join(tmpdirname, "data.parquet")
[2025-06-26T16:53:30.453Z]             write_data_parquet(data, data_file)
[2025-06-26T16:53:30.453Z]     
[2025-06-26T16:53:30.453Z]             conf = {
[2025-06-26T16:53:30.453Z]                 "source_id_col": "src_id",
[2025-06-26T16:53:30.453Z]                 "dest_id_col": "dst_id",
[2025-06-26T16:53:30.453Z]                 "relation": ("src", "rel", "dst"),
[2025-06-26T16:53:30.453Z]                 "features": [
[2025-06-26T16:53:30.453Z]                     {"feature_col": "feat", "feature_name": "feat", "feature_dim": [10]}
[2025-06-26T16:53:30.453Z]                 ]
[2025-06-26T16:53:30.453Z]             }
[2025-06-26T16:53:30.453Z]             keys = ["src_id", "dst_id", "feat"]
[2025-06-26T16:53:30.453Z]             src_ids, dst_ids, feat_data, _ = \
[2025-06-26T16:53:30.453Z] >               parse_edge_data(data_file, feat_ops, label_ops, node_id_map,
[2025-06-26T16:53:30.453Z]                                 partial(read_data_parquet, data_fields=keys),
[2025-06-26T16:53:30.453Z]                                 conf, skip_nonexist_edges=True)
[2025-06-26T16:53:30.453Z] tests/unit-tests/gconstruct/test_construct_graph.py:2031: 
[2025-06-26T16:53:30.453Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2025-06-26T16:53:30.453Z] python/graphstorm/gconstruct/construct_graph.py:211: in parse_edge_data
[2025-06-26T16:53:30.453Z]     feat_data = {key: feat[dst_exist_locs] \
[2025-06-26T16:53:30.453Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2025-06-26T16:53:30.453Z] .0 = <dict_itemiterator object at 0x7f04fd70aca0>
[2025-06-26T16:53:30.453Z] >   feat_data = {key: feat[dst_exist_locs] \
[2025-06-26T16:53:30.453Z]                  for key, feat in feat_data.items()}
[2025-06-26T16:53:30.453Z] E   IndexError: arrays used as indices must be of integer (or boolean) type
[2025-06-26T16:53:30.453Z] python/graphstorm/gconstruct/construct_graph.py:211: IndexError
[2025-06-26T16:53:30.453Z] ------------------------------ Captured log call -------------------------------
[2025-06-26T16:53:30.453Z] WARNING  root:id_map.py:299 source nodes of src do not exist. Skip 10 edges
[2025-06-26T16:53:30.453Z] WARNING  root:id_map.py:314 dest nodes of dst do not exist. Skip 5 edges
[2025-06-26T16:53:30.453Z] =============================== warnings summary ===============================
[2025-06-26T16:53:30.453Z] ../usr/local/lib/python3.10/dist-packages/torchdata/datapipes/__init__.py:18
[2025-06-26T16:53:30.453Z]   /usr/local/lib/python3.10/dist-packages/torchdata/datapipes/__init__.py:18: UserWarning: 
[2025-06-26T16:53:30.453Z]   ################################################################################
[2025-06-26T16:53:30.453Z]   WARNING!
[2025-06-26T16:53:30.453Z]   The 'datapipes', 'dataloader2' modules are deprecated and will be removed in a
[2025-06-26T16:53:30.453Z]   future torchdata release! Please see https://github.com/pytorch/data/issues/1196
[2025-06-26T16:53:30.453Z]   to learn more and leave feedback.
[2025-06-26T16:53:30.453Z]   ################################################################################
[2025-06-26T16:53:30.453Z]   
[2025-06-26T16:53:30.453Z]     deprecation_warning()
[2025-06-26T16:53:30.453Z] tests/unit-tests/test_gsf.py:681
[2025-06-26T16:53:30.453Z]   /graphstorm/tests/unit-tests/test_gsf.py:681: DeprecationWarning: invalid escape sequence '\g'
[2025-06-26T16:53:30.453Z]     graph_name = "\graph_name"
[2025-06-26T16:53:30.453Z] tests/unit-tests/gconstruct/test_construct_graph.py::test_process_features_fp16
[2025-06-26T16:53:30.453Z]   /graphstorm/python/graphstorm/gconstruct/utils.py:633: UserWarning: Creating a tensor from a list of numpy.ndarrays is extremely slow. Please consider converting the list to a single numpy.ndarray with numpy.array() before converting to a tensor. (Triggered internally at ../torch/csrc/utils/tensor_new.cpp:274.)
[2025-06-26T16:53:30.453Z]     arr = th.tensor(self._arr)
[2025-06-26T16:53:30.453Z] tests/unit-tests/gconstruct/test_construct_graph.py::test_label
[2025-06-26T16:53:30.453Z]   /graphstorm/python/graphstorm/gconstruct/transform.py:1932: RuntimeWarning: invalid value encountered in cast
[2025-06-26T16:53:30.453Z]     res[self.label_name] = np.int32(label)
[2025-06-26T16:53:30.453Z] -- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
[2025-06-26T16:53:30.453Z] =========================== short test summary info ============================
[2025-06-26T16:53:30.453Z] FAILED tests/unit-tests/gconstruct/test_construct_graph.py::test_parse_edge_data
[2025-06-26T16:53:30.453Z] !!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!
[2025-06-26T16:53:30.453Z] =================== 1 failed, 24 passed, 4 warnings in 9.92s ===================
[2025-06-26T16:53:31.344Z] Exit code: 1

Potential cause (from GenAI analysis)

The problem is in how the feature data is being indexed in parse_edge_data after filtering out non-existent edges.

In map_node_ids, when skip_nonexist_edges=True, it returns src_exist_locs and dst_exist_locs to indicate which edges should be kept. However, in parse_edge_data, the code is trying to apply these filters sequentially:

if src_exist_locs is not None:
    feat_data = {key: feat[src_exist_locs] \
                 for key, feat in feat_data.items()}
if dst_exist_locs is not None:
    feat_data = {key: feat[dst_exist_locs] \
                 for key, feat in feat_data.items()}

The issue is that after applying src_exist_locs, the feature data is already filtered, but then it tries to apply dst_exist_locs which still contains indices based on the original unfiltered data. This causes the indexing error because dst_exist_locs contains indices that are now out of bounds.

thvasilo avatar Jun 26 '25 17:06 thvasilo

Potential fix from Claude:

The key is to combine both location masks before filtering the features, rather than applying them sequentially. Here's the proposed fix for the parse_edge_data function:

if src_ids is not None:
    src_ids, dst_ids, src_exist_locs, dst_exist_locs = \
        map_node_ids(src_ids, dst_ids, edge_type, node_id_map,
                                    skip_nonexist_edges)
    # Create a combined mask for filtering features
    if src_exist_locs is not None or dst_exist_locs is not None:
        # Start with all True if no src filtering needed
        combined_mask = np.ones(len(feat_data[next(iter(feat_data))]), dtype=bool) \
            if src_exist_locs is None else np.zeros(len(feat_data[next(iter(feat_data))]), dtype=bool)
        
        # Apply source filter if it exists
        if src_exist_locs is not None:
            combined_mask[src_exist_locs] = True
            
        # Apply destination filter if it exists
        if dst_exist_locs is not None:
            # Only keep locations that passed the src filter (if any) and are in dst_exist_locs
            combined_mask = combined_mask & np.isin(np.arange(len(combined_mask)), dst_exist_locs)
        
        # Apply the combined mask to all features at once
        feat_data = {key: feat[combined_mask] for key, feat in feat_data.items()}
        
        # Verify lengths match
        for key, feat in feat_data.items():
            assert len(src_ids) == len(feat), \
                f"Expecting the edge feature {key} has the same length " \
                f"as num existing edges {len(src_ids)}, but get {len(feat)}"

The key changes are:

  1. Create a combined boolean mask instead of filtering sequentially
  2. If src_exist_locs exists, start with all False and set True for those locations
  3. If dst_exist_locs exists, use numpy's isin to find indices that are in dst_exist_locs
  4. Apply the combined mask once to all features

This approach avoids the issue of applying dst_exist_locs to already filtered data, and ensures that both filters are applied correctly regardless of order.

thvasilo avatar Jun 26 '25 21:06 thvasilo