
GeaFlow Implementation Plan for Gremlin Syntax Graph Traversal

Open kitalkuyo-gita opened this issue 2 months ago • 3 comments

1. Current Situation Analysis

1.1 Current Architecture

GeaFlow currently uses SQL+GQL fusion syntax. The overall architecture includes:

  • Parsing layer: Parser based on Apache Calcite extension

  • Planning layer: Logical plan and physical plan conversion

  • Runtime layer: Execution engine based on two-level DAG

  • Underlying API: VertexCentricTraversal graph traversal interface

1.2 Existing Graph Traversal Capabilities

GeaFlow already exposes an underlying graph traversal API (VertexCentricTraversal).

Although the documentation mentions Gremlin support, no actual Gremlin implementation has been found in the code base.

2. Technical Solution Design

Solution Selection: Adapter Pattern + Unified IR

We recommend using a Gremlin-to-GeaFlow Adapter solution instead of a complete rewrite:

graph TB
A[Gremlin Query] --> B[Gremlin Parser]
B --> C[Gremlin Bytecode/AST]
C --> D[Gremlin-to-IR Converter]
D --> E[GeaFlow Unified IR]
E --> F[Physical Plan]
F --> G[VertexCentricTraversal API]
G --> H[Execution Engine]

I[GQL Query] --> J[GQL Parser]
J --> K[GQL AST]
K --> E

3. Detailed Implementation

3.1 Module Structure Design

The following modules are recommended:

geaflow/
├── geaflow-dsl/
│   ├── geaflow-dsl-gremlin/            # New
│   │   ├── geaflow-gremlin-parser/     # Gremlin Parser
│   │   ├── geaflow-gremlin-plan/       # Conversion Layer
│   │   └── geaflow-gremlin-runtime/    # Gremlin Runtime Adapter
│   ├── geaflow-dsl-parser/             # Existing
│   ├── geaflow-dsl-plan/               # Existing
│   └── geaflow-dsl-runtime/            # Existing

3.2 Core Component Implementation

Component 1: Gremlin Parser Layer

Dependency import:

<dependency>
    <groupId>org.apache.tinkerpop</groupId>
    <artifactId>gremlin-core</artifactId>
    <version>3.7.0</version>
</dependency>

Implementation classes:

  • GeaFlowGremlinParser: Parses Gremlin query strings
  • GremlinBytecodeTranslator: Converts Gremlin Bytecode to internal representation

Key interfaces:

public interface IGremlinParser {
    GremlinQuery parse(String gremlinScript);
    GremlinQuery parse(Bytecode bytecode);
}
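
A minimal sketch of what the parser could look like, assuming the gremlin-groovy artifact is also on the classpath for evaluating query strings and that GremlinQuery exposes a (queryString, bytecode, traversal) constructor; everything outside the TinkerPop classes is illustrative:

import javax.script.Bindings;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
import org.apache.tinkerpop.gremlin.jsr223.JavaTranslator;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;

public class GeaFlowGremlinParser implements IGremlinParser {

    private final GraphTraversalSource g;   // bound into scripts, never executed here

    public GeaFlowGremlinParser(GraphTraversalSource g) {
        this.g = g;
    }

    @Override
    public GremlinQuery parse(String gremlinScript) {
        try {
            // Evaluate the script text into an (unexecuted) traversal; "g" is the only binding.
            GremlinGroovyScriptEngine engine = new GremlinGroovyScriptEngine();
            Bindings bindings = new SimpleBindings();
            bindings.put("g", g);
            Traversal.Admin<?, ?> traversal =
                ((Traversal<?, ?>) engine.eval(gremlinScript, bindings)).asAdmin();
            return new GremlinQuery(gremlinScript, traversal.getBytecode(), traversal);
        } catch (ScriptException e) {
            throw new IllegalArgumentException("Invalid Gremlin script: " + gremlinScript, e);
        }
    }

    @Override
    public GremlinQuery parse(Bytecode bytecode) {
        // Translate bytecode into a traversal object without executing it.
        Traversal.Admin<?, ?> traversal = JavaTranslator.of(g).translate(bytecode);
        return new GremlinQuery(null, bytecode, traversal);
    }
}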

Component 2: Gremlin-to-IR Conversion Layer

This layer follows the existing DSL plan architecture.

Core converters:

  • GremlinToRelConverter: converts Gremlin steps into RelNode
  • GremlinStepTranslator: handles translation of individual Gremlin steps

Key mapping relationships:

Gremlin Step            GeaFlow corresponding implementation
g.V()                   VertexQuery full query
g.V(id)                 VertexQuery.withId(id)
.out() / .outE()        EdgeQuery.getOutEdges()
.in() / .inE()          EdgeQuery.getInEdges()
.has() / .filter()      IFilter interface
.values()               Property projection
.path()                 TraversalResponse path collection
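
A minimal sketch of how a single mapping row could be realized with Apache Calcite's RelBuilder, here for has('age', gt(30)) on top of a vertex scan; the table name passed in (e.g. "v_person") and the converter wiring around this method are assumptions, not actual GeaFlow schema objects:

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;

public class GremlinHasStepTranslator {

    // Builds LogicalTableScan -> LogicalFilter for g.V().has("age", gt(30)),
    // mirroring the ".has() / .filter() -> IFilter" row in the mapping table above.
    public RelNode translateHasGt(FrameworkConfig config, String vertexTable,
                                  String propertyField, int threshold) {
        RelBuilder builder = RelBuilder.create(config);
        return builder
            .scan(vertexTable)                                    // g.V() on one vertex label
            .filter(builder.call(SqlStdOperatorTable.GREATER_THAN,
                                 builder.field(propertyField),    // e.g. "age"
                                 builder.literal(threshold)))     // e.g. 30
            .build();
    }
}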

Component 3: Gremlin Runtime Adaptation Layer

Core classes:

  • GremlinTraversalExecutor: executes Gremlin traversal logic
  • GremlinVertexProgram: implements VertexCentricTraversal interface
  • GremlinMessageCombiner: message aggregation

Adapt to existing Traversal API:
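
A minimal, self-contained sketch of the adapter glue: each step translator contributes a per-vertex compute function, and the executor collects them in traversal order. The StepTranslator and TraversalContext types here are illustrative placeholders consistent with the sketch in Section 4.1; the actual wiring into GeaFlow's VertexCentricTraversal interface is deliberately left out because its exact signature is not reproduced here.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;

public class GremlinTraversalExecutor {

    /** A translator turns one Gremlin step into per-vertex compute logic. */
    public interface StepTranslator {
        void translate(Step<?, ?> step, TraversalContext ctx);
    }

    /** Hypothetical context that collects the compute functions produced by translators. */
    public static class TraversalContext {
        private final List<BiConsumer<Object, Object>> computeFunctions = new ArrayList<>();
        public void addCompute(BiConsumer<Object, Object> fn) { computeFunctions.add(fn); }
        public List<BiConsumer<Object, Object>> getComputeFunctions() { return computeFunctions; }
    }

    private final Map<Class<? extends Step>, StepTranslator> translators = new HashMap<>();

    public void register(Class<? extends Step> stepType, StepTranslator translator) {
        translators.put(stepType, translator);
    }

    /** Collects one compute function per Gremlin step, in traversal order. */
    public List<BiConsumer<Object, Object>> buildComputeChain(Traversal.Admin<?, ?> traversal) {
        TraversalContext ctx = new TraversalContext();
        for (Step<?, ?> step : traversal.getSteps()) {
            StepTranslator translator = translators.get(step.getClass());
            if (translator == null) {
                throw new UnsupportedOperationException(
                    "Gremlin step not supported yet: " + step.getClass().getSimpleName());
            }
            translator.translate(step, ctx);
        }
        return ctx.getComputeFunctions();
    }
}

GremlinVertexProgram would then run the collected functions for the active vertices in each superstep, with GremlinMessageCombiner merging outgoing messages.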

3.3 Execution Process Design

sequenceDiagram
participant User
participant GremlinParser
participant Converter
participant Planner
participant Runtime
participant TraversalAPI

User->>GremlinParser: g.V().out().has("age", gt(30))
GremlinParser->>Converter: Gremlin Bytecode
Converter->>Planner: Logical Plan (RelNode)
Planner->>Planner: optimization (RBO/CBO)
Planner->>Runtime: Physical Plan
Runtime->>TraversalAPI: VertexCentricTraversal
TraversalAPI->>Runtime: Execution Result
Runtime->>User: Return Result

4. Key Technology Implementation

4.1 Gremlin Step Mapping Implementation

Example: out() Step Mapping

// Gremlin: g.V(1).out("knows")
// Mapping to GeaFlow VertexCentricTraversal

public class GremlinOutStepTranslator {
    public void translate(OutStep step, TraversalContext ctx) {
        String edgeLabel = step.getEdgeLabel();

        // Map the step to an EdgeQuery
        ctx.addCompute((vertex, context) -> {
            List<IEdge> edges = context.edges()
                .getOutEdges()  // corresponds to out()
                .filter(e -> edgeLabel == null ||
                        e.getLabel().equals(edgeLabel))
                .collect();

            // Send a message to each neighbor vertex
            for (IEdge edge : edges) {
                context.sendMessage(edge.getTargetId(),
                    new TraversalMessage(context.getPath()));
            }
        });
    }
}

4.2 Key Points in Gremlin Semantics Support

1) Path Tracing

Gremlin's path() requires complete path information:

public class PathTrackingMessage implements Serializable {  
    private List<Object> vertexIds;  
    private List<Object> edgeIds;  
    private Map<String, Object> labels;  
}
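
A minimal, self-contained sketch of the copy-on-extend pattern such a message could carry, together with the path-length guard mentioned later under risk mitigation; TrackedPath and its limit are illustrative, not part of any existing API:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class TrackedPath implements Serializable {
    private static final int MAX_PATH_LENGTH = 64;   // guards against unbounded memory use

    private final List<Object> vertexIds;
    private final List<Object> edgeIds;

    public TrackedPath(List<Object> vertexIds, List<Object> edgeIds) {
        this.vertexIds = Collections.unmodifiableList(new ArrayList<>(vertexIds));
        this.edgeIds = Collections.unmodifiableList(new ArrayList<>(edgeIds));
    }

    /** Copy-on-extend: each traverser keeps an independent path when the traversal fans out. */
    public TrackedPath extend(Object edgeId, Object targetVertexId) {
        if (vertexIds.size() >= MAX_PATH_LENGTH) {
            throw new IllegalStateException("Path length limit exceeded: " + MAX_PATH_LENGTH);
        }
        List<Object> newVertices = new ArrayList<>(vertexIds);
        List<Object> newEdges = new ArrayList<>(edgeIds);
        newVertices.add(targetVertexId);
        newEdges.add(edgeId);
        return new TrackedPath(newVertices, newEdges);
    }

    public List<Object> getVertexIds() { return vertexIds; }
    public List<Object> getEdgeIds() { return edgeIds; }
}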

2) Subgraph Traversal

Support loop traversal via repeat(), until(), times(), etc.:

public class RepeatStepTranslator {
    // Map to GeaFlow's iterative computation
    public void translate(RepeatStep step, TraversalContext ctx) {
        int maxIterations = step.getMaxIterations();
        ctx.withIterations(maxIterations);
        // ... implement the repeated-traversal logic
    }
}

3) Aggregation Operation

Support count(), sum(), groupCount(), etc.
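
In a distributed run these aggregations must be mergeable across partitions. Below is a minimal sketch of a groupCount()-style accumulator that can be combined pairwise (plain Java, independent of any GeaFlow API; count() and sum() reduce to the same pattern with a single numeric cell):

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Mergeable accumulator for groupCount(): per-partition counts that combine pairwise. */
public class GroupCountAccumulator implements Serializable {
    private final Map<Object, Long> counts = new HashMap<>();

    /** Record one occurrence of a key (e.g. a vertex label or property value). */
    public void add(Object key) {
        counts.merge(key, 1L, Long::sum);
    }

    /** Combine with another partition's accumulator, e.g. inside a message combiner. */
    public GroupCountAccumulator merge(GroupCountAccumulator other) {
        other.counts.forEach((k, v) -> counts.merge(k, v, Long::sum));
        return this;
    }

    public Map<Object, Long> getCounts() {
        return counts;
    }
}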

4.3 Performance Optimization Strategy

1) Predicate Pushdown: push has() and filter() conditions down to the storage layer.

2) Batch Message Passing: convert Gremlin's multi-step traversal into batched message operations.

3) Query Plan Optimization

  • Early termination of paths that do not meet the conditions
  • Merge consecutive edge traversal operations

5. Development and Implementation Roadmap

Phase 1: Basic Framework Construction (2-3 weeks)

    1. Create the geaflow-dsl-gremlin module structure
    2. Introduce the TinkerPop dependency
    3. Implement the basic parser and AST structure
    4. Build a unit testing framework

Phase 2: Core Step Support (4-6 weeks)

Prioritize support for frequently used Gremlin Steps:

    1. Vertex Operations: V(), E()
    2. Edge Traversals: out(), in(), both(), outE(), inE(), bothE()
    3. Filtering: has(), hasLabel(), filter(), where()
    4. Projection: values(), valueMap(), select()
    5. Transformations: map(), flatMap()

Phase 3: Advanced Features (4-6 weeks)

    1. Path Operations: path(), simplePath(), cyclicPath()
    2. Looping: repeat(), until(), times(), emit()
    3. Aggregations: count(), sum(), mean(), groupCount()
    4. Sorting Constraints: order(), limit(), range()

Phase 4: Optimization and Integration (3-4 weeks)

    1. Query Optimizer Integration
    2. Integration with the Existing DSL Unified IR Layer
    3. Performance Testing and Tuning
    4. Documentation

Phase 5: Production Readiness (2-3 weeks)

    1. Complete Integration Testing
    2. Compatibility Testing (TinkerPop TCK)
    3. Monitoring and Tracking
    4. Production Environment Trial Run

6. Compatibility Guarantee

6.1 TinkerPop Standard Compatibility

It is recommended to verify through TinkerPop TCK (Technology Compatibility Kit):

// TCK wiring following the standard TinkerPop pattern: the process suite runs against a
// custom GraphProvider. GeaFlowGraph is a placeholder for the Graph implementation class.
@RunWith(ProcessStandardSuite.class)
@GraphProviderClass(provider = GeaFlowGraphProvider.class, graph = GeaFlowGraph.class)
public class GeaFlowGremlinProcessTest {
}

6.2 Coexistence with Existing GQL

The two DSLs share the same underlying execution engine, so Gremlin and GQL queries can coexist within a single deployment.

7. Production Deployment Recommendations

7.1 Configuration Management

# Gremlin Configuration
geaflow.gremlin.max.traversers=1000000
geaflow.gremlin.timeout.ms=30000
geaflow.gremlin.optimize.enabled=true
geaflow.gremlin.path.tracking=true
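
A minimal sketch of how these keys could be read into a typed options object with the defaults listed above; the class name and the use of java.util.Properties as the backing store are assumptions:

import java.util.Properties;

/** Typed view over the proposed geaflow.gremlin.* configuration keys. */
public final class GremlinConfig {
    private final long maxTraversers;
    private final long timeoutMs;
    private final boolean optimizeEnabled;
    private final boolean pathTracking;

    public GremlinConfig(Properties props) {
        this.maxTraversers = Long.parseLong(
            props.getProperty("geaflow.gremlin.max.traversers", "1000000"));
        this.timeoutMs = Long.parseLong(
            props.getProperty("geaflow.gremlin.timeout.ms", "30000"));
        this.optimizeEnabled = Boolean.parseBoolean(
            props.getProperty("geaflow.gremlin.optimize.enabled", "true"));
        this.pathTracking = Boolean.parseBoolean(
            props.getProperty("geaflow.gremlin.path.tracking", "true"));
    }

    public long getMaxTraversers() { return maxTraversers; }
    public long getTimeoutMs() { return timeoutMs; }
    public boolean isOptimizeEnabled() { return optimizeEnabled; }
    public boolean isPathTracking() { return pathTracking; }
}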

7.2 Monitoring Metrics

  • Gremlin Query Parsing Time
  • Step Execution Time Distribution
  • Number of Traversers
  • Memory Usage

7.3 Error Handling

  • Syntax errors: provide the exact error location and fix suggestions
  • Runtime errors: timeout protection and resource limits
  • Degradation strategy: automatically reject queries that exceed complexity limits

8. Risks and Challenges

8.1 Technical Risks

  1. Semantic Difference: some Gremlin semantics may not fully match the GeaFlow computation model.
     • Mitigation: clearly document unsupported features.
  2. Performance Overhead: the step-by-step execution mode may introduce additional overhead.
     • Mitigation: batch optimization and query rewriting.
  3. State Management: path tracing may consume a large amount of memory.
     • Mitigation: limit path length and use compressed storage.

8.2 Compatibility Risks

  • TinkerPop version upgrades may introduce API changes.
  • It is recommended to pin to a stable TinkerPop 3.6/3.7 release.

kitalkuyo-gita · Oct 16 '25 08:10

Hi community, I've noticed that some users are looking for Gremlin DSL support. Looking back at the history of graph computing, many early users adopted Gremlin as the de facto standard. Based on GeaFlow's existing capabilities, I've made some changes, and everyone in the community is welcome to join the discussion. PR: https://github.com/apache/geaflow/pull/636

Gremlin Query Execution Flow Explained

GremlinQuery is only a parsed query representation, similar to SQL's AST (Abstract Syntax Tree), and contains no query results.

The True Meaning of GremlinQuery

public class GremlinQuery {
    private final String queryString;      // Original query string
    private final Bytecode bytecode;       // Gremlin bytecode (serialized format)
    private final Traversal traversal;     // Unexecuted traversal plan
}

Analogy:

  • GremlinQuery = SQL's PreparedStatement (prepared statement, but not executed)
  • Query results = ResultSet (results after execution)
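
To make the analogy concrete, the JDBC equivalent (standard java.sql API only; the table and connection are illustrative) looks like this:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class JdbcAnalogy {
    static void run(Connection connection) throws SQLException {
        // PreparedStatement = parsed but not executed, like GremlinQuery
        try (PreparedStatement stmt =
                 connection.prepareStatement("SELECT name FROM person WHERE age > ?")) {
            stmt.setInt(1, 30);
            // ResultSet = results that only exist after execution, like the traversal's output
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getString("name"));
                }
            }
        }
    }
}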

Complete Execution Flow

Flow Diagram

User Input
    ↓
"g.V().has('age', gt(30))"  ← Query string
    ↓
[GeaFlowGremlinParser.parse()]
    ↓
GremlinQuery {              ← Parsed query (not executed)
    queryString: "g.V()..."
    bytecode: [V(), has(...)]
    traversal: GraphTraversal (not executed)
}
    ↓
[Solution Selection]
    ↓
┌──────────────────────────┬──────────────────────────┐
│  Solution 1: TinkerPop   │  Solution 2: GeaFlow IR  │
│  (Simple scenarios)      │  (Production scenarios)  │
└──────────────────────────┴──────────────────────────┘

Solution 1: TinkerPop Standard Execution (Simple Scenarios)

Applicable Scenarios:

  • Single-machine testing
  • Small-scale data
  • Rapid prototyping validation

Execution Steps:

// Step 1: Parse query
GeaFlowGremlinParser parser = new GeaFlowGremlinParser(graph);
GremlinQuery query = parser.parse("g.V().has('age', gt(30))");

// At this point: query is just a query representation with no results!

// Step 2: Get Traversal object
GraphTraversal<Vertex, Vertex> traversal = 
    (GraphTraversal<Vertex, Vertex>) query.getTraversal();

// At this point: traversal is an unexecuted traversal plan

// Step 3: Execute query and get results
List<Vertex> results = traversal.toList();  // ← This is where execution actually happens!

// Now: results are the actual query results
for (Vertex v : results) {
    System.out.println(v.value("name"));
}

Key Points:

  • query.getTraversal() returns an unexecuted plan
  • Only terminal operations like .toList(), .next(), .hasNext() actually execute
  • Execution happens in TinkerGraph local memory

Solution 2: GeaFlow IR Execution (Production Scenarios)

Applicable Scenarios:

  • Production environment
  • Large-scale graph data
  • Distributed computing
  • Query optimization required

Execution Steps:

// Step 1: Parse Gremlin query
GeaFlowGremlinParser parser = new GeaFlowGremlinParser(graph);
GremlinQuery query = parser.parse("g.V().has('age', gt(30)).out('knows')");

// At this point: query is just a query representation (contains Bytecode)

// Step 2: Convert to GeaFlow unified IR (RelNode)
GeaFlowGremlinToRelConverter converter = new GeaFlowGremlinToRelConverter();
RelNode logicalPlan = converter.convert(query, gqlToRelConverter);

// At this point: logicalPlan is a logical execution plan (not yet optimized)
// Example: LogicalGraphScan → LogicalFilter → LogicalProject

// Step 3: Query optimization
HepProgram program = HepProgram.builder()
    .addRuleInstance(new GremlinPredicatePushdownRule())
    .addRuleInstance(new GremlinBatchMessageRule())
    .addRuleInstance(new GremlinPathOptimizationRule())
    .build();
    
HepPlanner planner = new HepPlanner(program);
planner.setRoot(logicalPlan);
RelNode optimizedPlan = planner.findBestExp();

// At this point: optimizedPlan is an optimized logical plan
// Example: LogicalGraphScan(filter: age > 30) → LogicalProject (predicate pushdown)

// Step 4: Convert to physical plan
PhysicalPlanConverter physicalConverter = new PhysicalPlanConverter();
RelNode physicalPlan = physicalConverter.convert(optimizedPlan);

// At this point: physicalPlan is a physical execution plan

// Step 5: Convert to VertexCentricTraversal
GeaFlowGremlinTraversalExecutor executor = new GeaFlowGremlinTraversalExecutor();
VertexCentricTraversal<K, VV, EV, M, R> traversal = 
    executor.execute(physicalPlan, traversalContext);

// At this point: traversal is a GeaFlow graph traversal program (not yet executed)

// Step 6: Submit to GeaFlow execution engine
PipelineResult result = graph
    .incrementalTraversal(traversal)
    .execute();

// Step 7: Get results
result.get();  // ← This is where distributed computing actually happens!

// Or get results through Sink
traversal.sink(new CollectSink<>(resultList));

Key Points:

  • GremlinQuery → RelNode → PhysicalPlan → VertexCentricTraversal are all plan representations, not results
  • Only after submitting to GeaFlow execution engine does actual execution happen
  • Execution happens in distributed clusters
  • Supports incremental computation and dynamic graphs

Comparison of Two Solutions

Feature                   Solution 1: TinkerPop         Solution 2: GeaFlow IR
Execution Method          Local memory execution        Distributed execution
Data Scale                Small scale (MB-GB)           Large scale (TB-PB)
Query Optimization        None                          Complete optimizer (RBO/CBO)
Incremental Computation   Not supported                 Supported
Dynamic Graph             Not supported                 Supported
Use Cases                 Testing, prototyping          Production environment
Performance               Single-machine performance    Distributed high performance

Actual Code Examples

Example 1: TinkerPop Approach (Can be run directly)

@Test
public void testTinkerPopExecution() {
    // 1. Create graph
    Graph graph = TinkerGraph.open();
    graph.addVertex("name", "Alice", "age", 30);
    graph.addVertex("name", "Bob", "age", 35);
    
    // 2. Create GraphTraversalSource
    GraphTraversalSource g = graph.traversal();
    
    // 3. Execute query (directly using TinkerPop API)
    List<Object> names = g.V()
        .has("age", P.gt(30))
        .values("name")
        .toList();  // ← Execute
    
    // 4. Print results
    System.out.println("Results: " + names);  // Output: [Bob]
}

Example 2: GeaFlow Approach (Requires complete environment)

@Test
public void testGeaFlowExecution() {
    // 1. Parse Gremlin query
    GeaFlowGremlinParser parser = new GeaFlowGremlinParser(graph);
    GremlinQuery query = parser.parse("g.V().has('age', gt(30)).values('name')");
    
    // 2. Convert to RelNode
    GeaFlowGremlinToRelConverter converter = new GeaFlowGremlinToRelConverter();
    RelNode relNode = converter.convert(query, gqlToRelConverter);
    
    // 3. Optimize (This is GeaFlow's advantage)
    RelNode optimized = optimizer.optimize(relNode);
    
    // 4. Convert to VertexCentricTraversal
    VertexCentricTraversal traversal = executor.execute(optimized, context);
    
    // 5. Submit for execution
    PipelineResult result = graph.incrementalTraversal(traversal).execute();
    
    // 6. Get results
    List<String> names = result.get();
    System.out.println("Results: " + names);
}

Why Do We Need GeaFlow IR Layer?

1. Query Optimization

// Original query
g.V().has('age', gt(30)).out('knows').has('city', 'Beijing')

// Unoptimized execution plan
V() → Filter(age > 30) → out() → Filter(city = 'Beijing')

// Optimized execution plan (predicate pushdown)
V(filter: age > 30) → out() → V(filter: city = 'Beijing')
// Reduces data volume transmitted over network

2. Distributed Execution

// TinkerPop: Single-machine execution
List<Vertex> results = g.V().out().out().toList();  // May cause OOM

// GeaFlow: Distributed execution
VertexCentricTraversal traversal = converter.convert(query);
graph.incrementalTraversal(traversal).execute();  // Automatically distributed

3. Incremental Computation

// Scenario: Graph data continuously updates, incremental computation needed
// TinkerPop: Full computation every time
// GeaFlow: Only compute changed parts

graph.incrementalTraversal(traversal)
    .withWindowSize(Duration.ofMinutes(5))  // 5-minute window
    .execute();  // Incrementally update results

Current Implementation Status

✅ Implemented

  1. GeaFlowGremlinParser: Parse Gremlin queries into GremlinQuery
  2. GeaFlowGremlinToRelConverter: Convert GremlinQuery to RelNode
  3. Optimization Rules: 5 optimization rules (predicate pushdown, batch messages, etc.)
  4. GeaFlowGremlinTraversalExecutor: Executor interface
  5. GremlinVertexProgram: VertexCentricTraversal implementation

🚧 Needs Completion (End-to-end execution)

  1. Complete Physical Plan conversion
  2. Integration with GeaFlow execution engine
  3. Result collection and return mechanism

How to Use

Current Stage (Testing and Validation)

// 1. Use TinkerPop approach to validate query correctness
@Test
public void testQueryCorrectness() {
    Graph graph = TinkerGraph.open();
    // ... add data
    
    GraphTraversalSource g = graph.traversal();
    List<Vertex> results = g.V().has("age", P.gt(30)).toList();
    
    // Validate results
    assertEquals(2, results.size());
}

// 2. Use GeaFlow IR to validate conversion correctness
@Test
public void testConversion() {
    GeaFlowGremlinParser parser = new GeaFlowGremlinParser(graph);
    GremlinQuery query = parser.parse("g.V().has('age', gt(30))");
    
    // Validate parsing
    assertTrue(query.isValid());
    assertEquals(2, query.getStepCount());
    
    // Validate conversion (requires mock GQLToRelConverter)
    // RelNode relNode = converter.convert(query, mockConverter);
    // assertNotNull(relNode);
}

Future Stage (Production Environment)

// Complete end-to-end execution
@Test
public void testEndToEndExecution() {
    // 1. Create GeaFlow environment
    GeaFlowEnvironment env = GeaFlowEnvironment.create();
    
    // 2. Load graph data
    GraphView graph = env.createGraphView("social_network");
    
    // 3. Execute Gremlin query
    String gremlinQuery = "g.V().has('age', gt(30)).out('knows').values('name')";
    List<String> results = env.executeGremlin(gremlinQuery);
    
    // 4. Validate results
    assertFalse(results.isEmpty());
}

Does anyone have any better ideas? @Leomrlin @Loognqiang

kitalkuyo-gita · Oct 20 '25 06:10

Hi @kitalkuyo-gita,

Thank you for sharing such a detailed and high-quality technical proposal for adding Gremlin support in GeaFlow! The structure of the document is clear and well-organized, and it provides great insight into the implementation path forward.

We really appreciate such thoughtful contributions from the community and encourage further discussion and collaboration.

🔍 Initial Feedback

  1. Good architectural choice: Using the Adapter pattern with a unified IR is a smart approach that maintains compatibility while enabling extensibility.
  2. Clear module design: The proposed geaflow-dsl-gremlin module structure makes sense and sets a good foundation for future enhancements.
  3. Dual execution paths: Supporting both TinkerPop-based local execution and GeaFlow IR-based distributed execution is a solid strategy to cover different use cases.
  4. Compatibility assurance: Leveraging TinkerPop TCK for validation is an important step toward ensuring standard compliance.

⚠️ Key Technical Challenges for Discussion

The additional points you raised are very important, and we'd like to discuss them further:

  1. Compatibility Challenge between GQL IR and Gremlin:

    • GeaFlow currently uses a GQL-based IR system, where GQL is declarative while Gremlin is imperative traversal-oriented. The semantic models differ significantly.
    • Designing a unified IR that can accommodate both paradigms is indeed challenging and may require substantial modifications to the existing IR structure.
  2. Missing IR Extension Interfaces:

    • The current codebase doesn't provide extension interfaces for such a hybrid IR, which means we might need to refactor the IR layer first to add extensibility points.
  3. Operator Level Positioning:

    • GeaFlow's current logical plan has a two-layer structure: outer layer for table operators and inner layer for graph operators.
    • Where should our new Gremlin IR fit:
      • As an OP-layer operator (similar to SQL operators) - better integration with existing SQL/GQL flow
      • Or as part of the inner graph operator layer - more aligned with Gremlin's traversal semantics
    • This positioning requires careful consideration.

💬 Suggestions & Discussion Points

  • Could we design a Gremlin-to-GQL IR mapping layer first, translating Gremlin queries to equivalent GQL expressions before leveraging the existing IR execution?
  • Or should we extend the current IR structure to support imperative traversal patterns?

🚀 Next Steps

  • We suggest organizing a technical discussion session specifically focused on IR compatibility design issues.
  • The community welcomes everyone to join the discussion and contribute to this feature.

Thanks again for your deep thinking and great contribution!

Leomrlin · Oct 21 '25 12:10

Hello @Leomrlin , thank you for your reply. I've implemented the conversion of Gremlin's graph traversal operator into LogicalPlan. As you mentioned, designing a unified IR that accommodates both paradigms is indeed challenging. My approach did not involve significantly modifying the existing IR code.

In general, I chose the first implementation path: the Gremlin IR sits as an OP-layer operator (similar to a SQL operator), which integrates better with the existing SQL/GQL flow.

I directly converted Gremlin into the IR. You can see this class: #636

geaflow/geaflow-dsl/geaflow-dsl-gremlin/geaflow-gremlin-plan/src/main/java/org/apache/geaflow/dsl/gremlin/plan/converter/GeaFlowGremlinToRelConverter.java

You are also welcome to provide valuable suggestions for this POC implementation.

kitalkuyo-gita · Oct 23 '25 06:10