DSL algorithm library extension: cluster_coefficient
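[Requirement Description] DSL algorithm library extension:
- cluster_coefficient
[Expected Deliverables]
- Implement the DSL algorithm above, and provide test cases that run and pass.
If you have any questions, please contact: @puuuuug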
Design Proposal
1. Core Algorithm Implementation
Based on GeaFlow's UDGA framework, the algorithm needs to implement the AlgorithmUserFunction interface.
Implementation Path: geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/ClusterCoefficient.java
Core Implementation Points:
- Implement the AlgorithmUserFunction interface, following existing algorithm implementation patterns
- Algorithm consists of 3 iteration phases:
- First round: Collect neighbor information for each node
- Second round: Calculate connection count between neighbors
- Third round: Output final clustering coefficient results
Parameter Design:
- Support optional vertex type filtering parameter
- Support minimum degree threshold parameter (nodes with degree < 2 have clustering coefficient 0)
2. Algorithm Registration
Need to register the new algorithm function in BuildInSqlFunctionTable to enable SQL CALL syntax invocation
Registration Location:
- Add cluster_coefficient function registration in existing graph algorithm registration code
- Follow existing algorithm registration patterns like PageRank and TriangleCount
3. Test Case Design
SQL Test File: geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_algorithm_cluster_coefficient.sql
Test Content Includes:
-- Basic test: Calculate clustering coefficient for all nodes
CALL cluster_coefficient() YIELD (vid, coefficient)
-- Parameterized test: Calculate clustering coefficient for specified vertex type
CALL cluster_coefficient('person') YIELD (vid, coefficient)
-- Degree threshold test
CALL cluster_coefficient('person', 2) YIELD (vid, coefficient)
Java Test Class: Add testAlgorithmClusterCoefficient test method in GQLAlgorithmTest
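To derive expected values for the test cases above, a small reference computation outside GeaFlow can serve as an oracle. The sketch below is plain Java with hypothetical names (ReferenceClusterCoefficient, addEdge, coefficient); it assumes an undirected simple graph given as an adjacency map and does not use any GeaFlow API.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of GeaFlow: brute-force local clustering
// coefficient for an undirected simple graph, intended only as a test oracle.
class ReferenceClusterCoefficient {

    // Adds an undirected edge a-b to the adjacency map.
    static void addEdge(Map<Long, Set<Long>> adj, long a, long b) {
        adj.computeIfAbsent(a, k -> new HashSet<>()).add(b);
        adj.computeIfAbsent(b, k -> new HashSet<>()).add(a);
    }

    // Returns 2 * E_N / (deg * (deg - 1)), or 0.0 when deg < 2 or deg < minDegree.
    static double coefficient(Map<Long, Set<Long>> adj, long v, int minDegree) {
        Set<Long> neighbors = adj.getOrDefault(v, new HashSet<>());
        int deg = neighbors.size();
        if (deg < 2 || deg < minDegree) {
            return 0.0;
        }
        long twiceEdges = 0; // each neighbor-neighbor edge is seen from both endpoints
        for (long u : neighbors) {
            for (long w : adj.get(u)) {
                if (w != v && neighbors.contains(w)) {
                    twiceEdges++;
                }
            }
        }
        return (double) twiceEdges / ((double) deg * (deg - 1));
    }
}
```

For a triangle 1-2-3 with an extra edge 3-4, this gives 1.0 for vertex 1, 1/3 for vertex 3, and 0.0 for vertex 4, which could be used as expected output rows if the test graph has that shape.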
4. Detailed Algorithm Implementation Design
Message Type Design:
- Use composite message type containing neighbor ID lists and connection count information
- First round passes neighbor IDs, second round passes neighbor connection confirmation information
Output Type Design:
public StructType getOutputType() {
    return new StructType(
        new TableField("vid", LongType.INSTANCE, false),
        new TableField("coefficient", DoubleType.INSTANCE, false)
    );
}
Processing Logic Design:
- 1st iteration: Each node sends its neighbor list to all neighbors
- 2nd iteration: Calculate actual connections between neighbors
- 3rd iteration: Calculate and output clustering coefficient
5. Performance Optimization Considerations
Memory Optimization:
- For high-degree nodes, use BitSet or Bloom filters to optimize neighbor lookup
- Batch process neighbor calculations for large-degree nodes
Computation Optimization:
- Directly output coefficient 0 for nodes with degree < 2, skip complex calculations
- Use incremental computation to avoid repeated neighbor connection checks
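As a rough illustration of the BitSet idea, the sketch below counts common neighbors with a bitwise AND. It assumes vertex IDs have already been mapped to dense, non-negative int indices (an assumption not stated in the proposal), and the class and method names are hypothetical.

```java
import java.util.BitSet;

// Hypothetical sketch: neighbor sets stored as BitSets over dense vertex indices.
class BitSetNeighbors {

    // Builds a BitSet with one bit set per neighbor index.
    static BitSet of(int... neighborIndices) {
        BitSet bits = new BitSet();
        for (int idx : neighborIndices) {
            bits.set(idx);
        }
        return bits;
    }

    // Counts common neighbors without mutating either input.
    static int commonNeighbors(BitSet a, BitSet b) {
        BitSet copy = (BitSet) a.clone();
        copy.and(b);
        return copy.cardinality();
    }
}
```

A BitSet only saves memory when the index range is dense. A Bloom filter can report false positives, so a coefficient computed with one would be approximate rather than exact.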
6. Deployment and Integration
Function Registration:
- Register algorithm as built-in function supporting standard SQL call syntax
- Follow existing algorithm registration patterns for integration
Documentation Updates:
- Add cluster_coefficient algorithm usage instructions in UDGA documentation
- Provide complete parameter descriptions and usage examples
@Loognqiang @kitalkuyo-gita I have implemented this algorithm on my side. Could you take a look and check whether it meets expectations?
Hello, @watchMaker-sylar. Thank you for your reply. I read your code. Your solution computes the Local Clustering Coefficient as a UDGA on a vertex-centric iterative model. The goal is to count the edges among the neighbors of each vertex v, ultimately outputting (vid, coefficient).
The implementation involves vertices exchanging neighbor information by sending messages to each other, comparing intersections in subsequent iterations, and finally summarizing and outputting the coefficients.
Algorithm Flowchart
flowchart TD
Start["Start"] --> Init["Initialize graph data<br/>Set source vertex ID=11342"]
Init --> Iter1{"Iteration 1"}
Iter1 --> CheckSrc1{"Is current vertex<br/>source vertex?"}
CheckSrc1 -->|Yes| SendMsg1["Send message 1<br/>to all neighbors"]
CheckSrc1 -->|No| Wait1["Wait"]
SendMsg1 --> Iter2{"Iteration 2"}
Wait1 --> Iter2
Iter2 --> RecvMsg2{"Received message?"}
RecvMsg2 -->|Yes| Forward["Forward message 1<br/>to all neighbors"]
RecvMsg2 -->|No| Wait2["Wait"]
Forward --> Iter3{"Iteration 3"}
Wait2 --> Iter3
Iter3 --> CheckSrc3{"Is current vertex<br/>source vertex?"}
CheckSrc3 -->|No| CountMsg["Count received<br/>messages"]
CheckSrc3 -->|Yes| Wait3["Wait"]
CountMsg --> SendCount["Send count back<br/>to source vertex"]
SendCount --> Iter4{"Iteration 4"}
Wait3 --> Iter4
Iter4 --> CheckSrc4{"Is current vertex<br/>source vertex?"}
CheckSrc4 -->|Yes| CalcCoef["Calculate clustering coefficient<br/>coefficient = edgeNum / (degree * (degree - 1))"]
CheckSrc4 -->|No| Wait4["Wait"]
CalcCoef --> UpdateValue["Update vertex value"]
UpdateValue --> End["End"]
Wait4 --> End
Algorithm Sequence Diagram
sequenceDiagram
participant SV as Source Vertex<br/>(ID=11342)
participant N1 as Neighbor Vertex 1
participant N2 as Neighbor Vertex 2
participant Nx as Other Neighbors
Note over SV,Nx: Iteration 1: Source vertex sends messages
SV->>N1: Send message(1)
SV->>N2: Send message(1)
SV->>Nx: Send message(1)
Note over SV,Nx: Iteration 2: Neighbors forward messages
N1->>N2: Forward message(1)
N1->>Nx: Forward message(1)
N2->>N1: Forward message(1)
N2->>Nx: Forward message(1)
Nx->>N1: Forward message(1)
Nx->>N2: Forward message(1)
Note over SV,Nx: Iteration 3: Count and send back
N1->>N1: Count received messages = count1
N2->>N2: Count received messages = count2
Nx->>Nx: Count received messages = countX
N1->>SV: Send count(count1)
N2->>SV: Send count(count2)
Nx->>SV: Send count(countX)
Note over SV,Nx: Iteration 4: Calculate clustering coefficient
SV->>SV: Sum all counts = edgeNum
SV->>SV: Get degree = degree
SV->>SV: Calculate coefficient = edgeNum / (degree * (degree - 1))
SV->>SV: Update vertex value
I will walk through the implementation one iteration at a time.
- Iteration 1 — Broadcast vertex identifier and type to all neighbors
Purpose: To let each vertex's neighbors know the vertex's ID and label (for subsequent neighbor discovery and optional type filtering).
In the vertex-centric model, information can only be sent to neighbors via message passing. Identification information must be sent first to collect "who are my neighbors" information in the next round.
- Iteration 2 — Collect the neighbor list, perform type filtering, send its own neighbor list back to each neighbor, and write the neighbor list to the vertex value (persistence)
Each vertex collects messages from its neighbors in this round, thus constructing its own first-order neighbor list N(v).
For vertices with degree < threshold, directly emit coefficient = 0 (early termination, saving subsequent computation).
For vertices with degree >= threshold, send its own neighbor list to each neighbor (so that the neighbors know the set of neighbors of v), and save its neighbor set to the vertex state via updateVertexValue for subsequent iterations.
Note: updateVertexValue is not "immediately readable" in this framework—it will be visible in the updatedValues parameter of the next iteration. Therefore, another round is needed to use this persistent value.
- Iteration 3 — Receive the neighbor list sent by each neighbor, compare it with the neighbor list saved by the vertex itself (calculate the intersection/connectivity with the neighbor set), and write the statistics back to the vertex state.
This round retrieves the previously saved N(v) from updatedValues and iterates through the N(u) received from the neighbors.
Calculate the "connectivity metric between neighbors" (represented by some kind of count in the implementation), and then save (count, degree) via updateVertexValue for final calculation in the next round.
Similarly, since the final output also reads updatedValues in another iteration, the vertex sends itself a message to trigger the next round.
- Iteration 4 — Read (count, degree) and calculate the final clustering coefficient, emit the result.
Substitute count and degree into the formula coefficient = count / (degree * (degree - 1.0)), and output (vid, coefficient).
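The four rounds described above can be traced with a single-process simulation. This is a sketch under stated assumptions, not the reviewed code: it uses plain Java collections instead of GeaFlow messages and updateVertexValue, all names are hypothetical, the graph is assumed undirected without self-loops, the numerator is taken as the sum of intersection sizes, and vertices below the threshold still send their lists so that neighbors above the threshold see every N(u).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation of the four rounds; none of these names are GeaFlow APIs.
class FourRoundSimulation {

    // edges: undirected pairs {a, b}; returns vid -> coefficient.
    static Map<Long, Double> run(long[][] edges, int minDegree) {
        Map<Long, List<Long>> out = new HashMap<>();
        for (long[] e : edges) {
            out.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            out.computeIfAbsent(e[1], k -> new ArrayList<>()).add(e[0]);
        }

        // Round 1: every vertex sends its own ID to each neighbor.
        Map<Long, Set<Long>> idInbox = new HashMap<>();
        for (Map.Entry<Long, List<Long>> v : out.entrySet()) {
            for (long target : v.getValue()) {
                idInbox.computeIfAbsent(target, k -> new HashSet<>()).add(v.getKey());
            }
        }

        // Round 2: build N(v) from the received IDs and send it to each neighbor.
        // Low-degree vertices emit 0; the others persist N(v) as vertex state.
        Map<Long, Double> result = new HashMap<>();
        Map<Long, Set<Long>> state = new HashMap<>();
        Map<Long, List<Set<Long>>> listInbox = new HashMap<>();
        for (Map.Entry<Long, Set<Long>> v : idInbox.entrySet()) {
            Set<Long> nv = v.getValue();
            for (long target : nv) {
                listInbox.computeIfAbsent(target, k -> new ArrayList<>()).add(nv);
            }
            if (nv.size() < Math.max(2, minDegree)) {
                result.put(v.getKey(), 0.0);
            } else {
                state.put(v.getKey(), nv);
            }
        }

        // Round 3: intersect each received N(u) with the persisted N(v), sum the sizes.
        Map<Long, long[]> stats = new HashMap<>(); // vid -> {count, degree}
        for (Map.Entry<Long, Set<Long>> v : state.entrySet()) {
            long count = 0;
            for (Set<Long> nu : listInbox.getOrDefault(v.getKey(), new ArrayList<>())) {
                for (long w : nu) {
                    if (v.getValue().contains(w)) {
                        count++;
                    }
                }
            }
            stats.put(v.getKey(), new long[] {count, v.getValue().size()});
        }

        // Round 4: read (count, degree) and emit the coefficient.
        for (Map.Entry<Long, long[]> s : stats.entrySet()) {
            long count = s.getValue()[0];
            long degree = s.getValue()[1];
            result.put(s.getKey(), count / (degree * (degree - 1.0)));
        }
        return result;
    }
}
```

In the real framework each loop body would run inside the per-vertex process callback of its iteration. The simulation only makes the data dependencies between rounds explicit.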
My understanding of the real reason why four rounds of iteration are needed:
- The vertex-centric model propagates messages and state in phases: first, the ID is sent out, and neighbor information is collected in the next round; then, the neighbor list is written to the vertex state and read for comparison in the next round; finally, the calculation result is written back and read for output in the next round (or directly taken in the next round). Therefore, at least:
- One round is needed for neighbor discovery (iteration 1→2 receives the list);
- One round is needed for distributing and persisting the neighbor list (iteration 2→3 reads the persisted list and receives the neighbor list);
- One round is needed for writing the statistical value after comparison (iteration 3→4 reads the statistical value and outputs).
- Summary: The updateVertexValue semantics used in the implementation (state is visible in the next iteration) and the message-triggered vertex activation mean that four iterations are needed to complete the pipeline of "discovery → exchanging neighbor lists → statistical intersection → output".
Implementation Details and Design Trade-offs
- Early Threshold Optimization: Returning 0 directly for vertices whose degree is below the threshold avoids transmitting and processing neighbor lists for low-degree vertices, reducing overhead.
- Vertex Type Filtering (vertexType): Allows coefficients to be calculated only for vertices with a specific label (categorized processing of the graph). The implementation checks the vertex label and also uses the label when processing neighbor messages to decide whether to include a neighbor in the set (used for "considering only a certain type of neighbor" scenarios).
- Data Structures and Messages: Sending the neighbor list as an ObjectRow(size, List<Long>) transmits the complete neighbor set explicitly; it is easy to implement but generates a large message volume.
- Complexity and Communication Cost: Because each vertex sends its complete neighbor list to every neighbor, the communication and memory cost is O(∑_v deg(v)^2). High-degree vertices therefore produce quadratic message volumes, which scales poorly on graphs with hub vertices.
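To make the O(∑_v deg(v)^2) estimate concrete, the following hypothetical helper counts how many vertex IDs are transmitted when every vertex sends its full neighbor list to each neighbor. It is a back-of-the-envelope sketch that ignores message headers and serialization overhead.

```java
// Hypothetical helper: estimates the number of vertex IDs transmitted when every
// vertex sends its full neighbor list to each of its neighbors.
class MessageVolume {

    static long totalIdsSent(int[] degrees) {
        long total = 0;
        for (int d : degrees) {
            total += (long) d * d; // deg(v) messages, each carrying deg(v) IDs
        }
        return total;
    }
}
```

For a star with one hub of degree 3 and three leaves of degree 1, the total is 9 + 3 = 12 IDs, and the hub alone accounts for 9 of them. The same imbalance grows quadratically with the hub's degree.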
But this also brings some problems.
The way the intersection is counted is questionable:
In iteration 3, the code only performs a boolean check for "whether there is an intersection (non-empty)" for each received list of neighbors. If the intersection is non-empty, count is incremented by one (instead of adding the size of the intersection to count). This means that count records "how many neighbors u have a neighbor set sharing at least one element with N(v)", not "the number of edges between neighbors" or "the number of triangles with v as a vertex". The numerator of the local clustering coefficient should be the number of actual edges between neighbor vertices (for undirected graphs), i.e., the number of connected neighbor pairs or triangles. This requires summing |N(v) ∩ N(u)| over the neighbors u, adjusted appropriately for undirected/directed graphs and for repeated counting, rather than a boolean count. As a result, the current implementation can overestimate or underestimate the coefficient and does not conform to the standard definition.
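The difference between the two counting strategies can be shown on a small example. The sketch below uses hypothetical names and plain Java sets. booleanCount mirrors the behavior described in this review, and sizeCount is the suggested alternative, not the reviewed code itself.

```java
import java.util.List;
import java.util.Set;

// Hypothetical comparison of two ways to accumulate the numerator in iteration 3.
class IntersectionCounting {

    // As described in the review: +1 per received list that shares any element with N(v).
    static long booleanCount(Set<Long> nv, List<Set<Long>> received) {
        long count = 0;
        for (Set<Long> nu : received) {
            for (long w : nu) {
                if (nv.contains(w)) {
                    count++;
                    break; // stop at the first common element
                }
            }
        }
        return count;
    }

    // Suggested alternative: add the size of the intersection of N(v) and N(u)
    // for every received list.
    static long sizeCount(Set<Long> nv, List<Set<Long>> received) {
        long count = 0;
        for (Set<Long> nu : received) {
            for (long w : nu) {
                if (nv.contains(w)) {
                    count++;
                }
            }
        }
        return count;
    }
}
```

In a complete graph on vertices 1 to 4, vertex 1 has N(1) = {2, 3, 4} and receives {1, 3, 4}, {1, 2, 4}, and {1, 2, 3}. The boolean variant yields 3, giving 3 / (3 * 2) = 0.5, while the size variant yields 6, giving 6 / (3 * 2) = 1.0, which is the expected coefficient for a clique.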
Type Filtering Logic Issues:
In Iteration 2, the conditional statement during message collection, if (vertexType == null) neighborIds.add(...) else if (!vertexType.equals(vertexLabel)) neighborIds.add(...), seems semantically inconsistent with "only retaining neighbors of the specified type": the second branch adds neighbors whose label does not match, so the logic appears inverted. It should be reviewed to see whether it is written backwards (perhaps intending to exclude/include different sets). Furthermore, the preceding line if (vertexType != null && !vertexType.equals(vertex.getLabel())) return; makes vertices of non-target types return immediately without participating in the calculation. Combined, the behavior is even harder to understand intuitively. It is recommended to confirm the requirements and revise the conditions.
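If the intent is to keep only neighbors of the requested type (an assumption that should be confirmed against the requirements), the condition could be expressed as a single predicate. The class and method names below are illustrative and not taken from the reviewed code.

```java
// Hypothetical predicate; names are illustrative and not taken from the reviewed code.
class TypeFilter {

    // Keeps a neighbor when no filter is configured or when its label equals the filter.
    static boolean keepNeighbor(String vertexType, String neighborLabel) {
        return vertexType == null || vertexType.equals(neighborLabel);
    }
}
```

With this form, keepNeighbor("person", "software") is false and keepNeighbor(null, "software") is true, so the unfiltered case and the filtered case follow the same code path.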
Consistency between denominator and numerator:
The final formula coefficient = count / (degree * (degree - 1.0)) assumes that count is measured in the same unit as degree * (degree - 1). The standard definition of the local clustering coefficient (undirected and without self-loops) is 2 * E_N / (deg * (deg - 1)), where E_N is the actual number of edges between neighbors (undirected edges). If count represents E_N, it should be multiplied by 2; if count already counts each edge twice, the formula must be stated to match. The current implementation neither multiplies by 2 nor explicitly defines count as E_N, so its consistency with the standard definition needs to be verified.
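The relationship between the two candidate numerators can be written out explicitly. Assuming an undirected simple graph, and writing C(v) for the coefficient of v (notation introduced here for illustration), every edge {u, w} between two neighbors of v is found once in N(u) ∩ N(v) and once in N(w) ∩ N(v), so:

```latex
\[
\sum_{u \in N(v)} \lvert N(u) \cap N(v) \rvert = 2\,E_N
\quad\Longrightarrow\quad
C(v) = \frac{2\,E_N}{\deg(v)\,(\deg(v)-1)}
     = \frac{\sum_{u \in N(v)} \lvert N(u) \cap N(v) \rvert}{\deg(v)\,(\deg(v)-1)}
\]
```

Under that assumption, a count defined as the sum of intersection sizes can be divided by deg * (deg - 1) without the factor 2, while a count defined as E_N needs the factor 2.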