Potential issue with edge filtering during GConstruct
Ran into this error during CI:
https://github.com/awslabs/graphstorm/actions/runs/15907555054/job/44866847860?pr=1288
```
=================================== FAILURES ===================================
_____________________________ test_parse_edge_data _____________________________

    def test_parse_edge_data():
        np.random.seed(1)
        with tempfile.TemporaryDirectory() as tmpdirname:
            str_src_ids = np.array([str(i) for i in range(10)])
            str_dst_ids = np.array([str(i) for i in range(15)])
            node_id_map = {"src": IdMap(str_src_ids),
                           "dst": IdMap(str_dst_ids)}

            src_ids = np.array([str(random.randint(0, 20)) for _ in range(15)])
            dst_ids = np.array([str(random.randint(0, 25)) for _ in range(15)])
            feat = np.random.rand(15, 10)
            data = {
                "src_id": src_ids,
                "dst_id": dst_ids,
                "feat": feat,
            }

            feat_ops = [Noop("feat", "feat", None)]
            label_ops = [
                LinkPredictionProcessor(None, None, [0.7,0.1,0.2], None)]
            data_file = os.path.join(tmpdirname, "data.parquet")
            write_data_parquet(data, data_file)

            conf = {
                "source_id_col": "src_id",
                "dest_id_col": "dst_id",
                "relation": ("src", "rel", "dst"),
                "features": [
                    {"feature_col": "feat", "feature_name": "feat", "feature_dim": [10]}
                ]
            }
            keys = ["src_id", "dst_id", "feat"]
            src_ids, dst_ids, feat_data, _ = \
>               parse_edge_data(data_file, feat_ops, label_ops, node_id_map,
                                partial(read_data_parquet, data_fields=keys),
                                conf, skip_nonexist_edges=True)

tests/unit-tests/gconstruct/test_construct_graph.py:2031:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
python/graphstorm/gconstruct/construct_graph.py:211: in parse_edge_data
    feat_data = {key: feat[dst_exist_locs] \
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

.0 = <dict_itemiterator object at 0x7f04fd70aca0>

>   feat_data = {key: feat[dst_exist_locs] \
                 for key, feat in feat_data.items()}
E   IndexError: arrays used as indices must be of integer (or boolean) type

python/graphstorm/gconstruct/construct_graph.py:211: IndexError
------------------------------ Captured log call -------------------------------
WARNING  root:id_map.py:299 source nodes of src do not exist. Skip 10 edges
WARNING  root:id_map.py:314 dest nodes of dst do not exist. Skip 5 edges
=========================== short test summary info ============================
FAILED tests/unit-tests/gconstruct/test_construct_graph.py::test_parse_edge_data
!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!
=================== 1 failed, 24 passed, 4 warnings in 9.92s ===================
Exit code: 1
```
**Potential cause (from GenAI analysis)**

The problem is in how the feature data is indexed in `parse_edge_data` after non-existent edges are filtered out.

When `skip_nonexist_edges=True`, `map_node_ids` returns `src_exist_locs` and `dst_exist_locs` to indicate which edges should be kept. However, in `parse_edge_data`, the code applies these filters sequentially:
```python
if src_exist_locs is not None:
    feat_data = {key: feat[src_exist_locs] \
                 for key, feat in feat_data.items()}
if dst_exist_locs is not None:
    feat_data = {key: feat[dst_exist_locs] \
                 for key, feat in feat_data.items()}
```
After `src_exist_locs` is applied, the feature data is already filtered, yet `dst_exist_locs` still contains indices computed against the original, unfiltered data. Applying it to the filtered arrays therefore indexes out of bounds, which surfaces as the `IndexError` above.
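Here is a minimal numpy sketch of the suspected failure mode (the array sizes and loc values are invented for illustration, and it assumes `dst_exist_locs` holds positions relative to the original, unfiltered rows):

```python
import numpy as np

feat = np.random.rand(15, 10)                # features for 15 edges, as in the failing test
src_exist_locs = np.array([0, 2, 3, 7, 11])  # hypothetical: 5 edges survive the src filter
dst_exist_locs = np.array([2, 7, 12])        # hypothetical: positions in the ORIGINAL 15 rows

feat = feat[src_exist_locs]                  # feat is now down to 5 rows
feat = feat[dst_exist_locs]                  # IndexError: index 12 is out of bounds for axis 0
```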
**Potential fix from Claude**

The key is to combine both location masks before filtering the features, rather than applying them sequentially. Here is the proposed change to `parse_edge_data`:
```python
if src_ids is not None:
    src_ids, dst_ids, src_exist_locs, dst_exist_locs = \
        map_node_ids(src_ids, dst_ids, edge_type, node_id_map,
                     skip_nonexist_edges)

    # Create a combined mask for filtering features
    if src_exist_locs is not None or dst_exist_locs is not None:
        num_edges = len(feat_data[next(iter(feat_data))])
        # Start with all True if no src filtering is needed, otherwise all False
        combined_mask = np.ones(num_edges, dtype=bool) \
            if src_exist_locs is None else np.zeros(num_edges, dtype=bool)

        # Apply the source filter if it exists
        if src_exist_locs is not None:
            combined_mask[src_exist_locs] = True

        # Apply the destination filter if it exists
        if dst_exist_locs is not None:
            # Only keep locations that passed the src filter (if any)
            # and are also in dst_exist_locs
            combined_mask = combined_mask & np.isin(np.arange(num_edges), dst_exist_locs)

        # Apply the combined mask to all features at once
        feat_data = {key: feat[combined_mask] for key, feat in feat_data.items()}

    # Verify lengths match
    for key, feat in feat_data.items():
        assert len(src_ids) == len(feat), \
            f"Expecting the edge feature {key} has the same length " \
            f"as num existing edges {len(src_ids)}, but get {len(feat)}"
```
The key changes are:
- Build one combined boolean mask instead of filtering sequentially.
- If `src_exist_locs` exists, start with all `False` and set those locations to `True`.
- If `dst_exist_locs` exists, use numpy's `isin` to keep only the indices it contains.
- Apply the combined mask to all features in a single pass.

This avoids applying `dst_exist_locs` to already-filtered data and ensures both filters are applied correctly regardless of order; a standalone sketch of the masking logic follows.
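As a quick sanity check of the masking logic in isolation (the helper `combine_exist_locs` and all loc values here are hypothetical, and this assumes both loc arrays index into the original, unfiltered edges):

```python
import numpy as np

def combine_exist_locs(num_edges, src_exist_locs, dst_exist_locs):
    """Fold the two (possibly None) loc arrays into one boolean mask
    over the original edges, mirroring the proposed fix."""
    mask = np.ones(num_edges, dtype=bool) if src_exist_locs is None \
        else np.zeros(num_edges, dtype=bool)
    if src_exist_locs is not None:
        mask[src_exist_locs] = True
    if dst_exist_locs is not None:
        mask &= np.isin(np.arange(num_edges), dst_exist_locs)
    return mask

# 15 edges; 5 survive the src filter and 2 of those also pass the dst filter.
mask = combine_exist_locs(15, np.array([0, 2, 3, 7, 11]), np.array([2, 7]))
assert mask.sum() == 2

feat = np.random.rand(15, 10)
assert feat[mask].shape == (2, 10)  # filtered features stay aligned with surviving edges
```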