
[Feature] [Connector] Support Python-based custom data sources in SeaTunnel

Status: Open. davidzollo opened this issue 1 month ago • 3 comments

Background

Currently, SeaTunnel supports many connectors written in Java, but it lacks native support for Python-based data sources. Many data engineers and data scientists prefer Python for its rich ecosystem of data processing libraries (pandas, requests, boto3, etc.) and its ease of use.

This feature request proposes to enable users to write custom data sources in Python while leveraging SeaTunnel's powerful distributed execution engine.

Goals

Phase 1 (MVP)

  • Enable Python scripts to act as SeaTunnel data sources
  • Support basic data exchange between Java and Python processes via Java ProcessBuilder
  • Provide simple configuration and Python script specification
  • Support JSON-based data serialization
  • Handle common data types (string, int, float, boolean, timestamp, etc.)
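Python's standard `json` module serializes strings, ints, floats, booleans and `None` natively, but raises `TypeError` for `datetime` values, so the timestamp goal needs an explicit convention. A minimal sketch of the Python side, assuming a hypothetical convention that timestamps travel as ISO 8601 strings and decimals as strings (the issue does not fix a wire format for these types), uses the `default` hook of `json.dumps`; `encode_value` and `data_message` are illustrative names:

```python
import json
from datetime import date, datetime
from decimal import Decimal


def encode_value(value):
    # json.dumps calls this only for objects it cannot serialize natively.
    # Hypothetical convention: dates/timestamps become ISO 8601 strings and
    # Decimal becomes a string so no precision is lost to float conversion.
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)
    raise TypeError(f"Unsupported type for JSON Lines: {type(value).__name__}")


def data_message(rows):
    # Serialize one DATA message as a single JSON Lines record (no trailing newline).
    return json.dumps({"type": "DATA", "rows": rows}, default=encode_value)


if __name__ == "__main__":
    print(data_message([{"id": 1, "active": True, "score": 0.5,
                         "created_at": datetime(2025, 12, 11, 13, 12)}]))
```

The Java deserializer would then parse these strings back into SeaTunnel's timestamp and decimal types, so whichever convention is chosen should be documented for script authors.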

Future Phases

  • Multi-process parallel execution
  • Enhanced error handling and retry mechanisms
  • Checkpoint-based fault tolerance

Architecture Overview

┌─────────────────────────────────────┐
│   SeaTunnel Engine (Java)           │
│  ┌──────────────────────────────┐   │
│  │  PythonSource                │   │
│  │  - createReader()            │   │
│  │  - createEnumerator()        │   │
│  └──────────┬───────────────────┘   │
│             │                       │
│  ┌──────────▼───────────────────┐   │
│  │  PythonSourceReader          │   │
│  │  - pollNext()                │   │
│  │  - snapshotState()           │   │
│  └──────────┬───────────────────┘   │
│             │ IPC                   │
│             │ (ProcessBuilder)      │
└─────────────┼───────────────────────┘
              │
              │ stdout/stdin
              │ (JSON Lines)
              │
┌─────────────▼────────────────────────┐
│   Python Process                     │
│  ┌──────────────────────────────┐    │
│  │  User Python Script          │    │
│  │  - read_data()               │    │
│  │  - get_schema()              │    │
│  └──────────────────────────────┘    │
└──────────────────────────────────────┘
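The stdin/stdout exchange in the diagram can be prototyped before any Java is written. The sketch below is a Python stand-in for what `PythonSourceReader` might do through `ProcessBuilder`: it sends `GET_SCHEMA`, then `READ_NEXT`, and collects rows until `END_OF_STREAM`. The message types come from the example script in this issue; `run_source` and the embedded child script are hypothetical.

```python
import json
import subprocess
import sys

# A tiny stand-in for the user script, embedded so this sketch is self-contained.
CHILD = """
import sys, json
for line in sys.stdin:
    req = json.loads(line)
    if req["type"] == "GET_SCHEMA":
        print(json.dumps({"type": "SCHEMA", "fields": [{"name": "id", "type": "BIGINT"}]}), flush=True)
    elif req["type"] == "READ_NEXT":
        for i in range(3):
            print(json.dumps({"type": "DATA", "rows": [{"id": i}]}), flush=True)
        print(json.dumps({"type": "END_OF_STREAM"}), flush=True)
"""


def run_source(executable, args, config=None):
    """Drive one Python source process over stdin/stdout (sketch of the Java side)."""
    with subprocess.Popen([executable, *args], stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, text=True) as proc:

        def send(message):
            # One JSON object per line; flush so the child sees it immediately.
            proc.stdin.write(json.dumps(message) + "\n")
            proc.stdin.flush()

        send({"type": "GET_SCHEMA"})
        schema = json.loads(proc.stdout.readline())
        send({"type": "READ_NEXT", "config": config or {}})
        rows = []
        while True:
            line = proc.stdout.readline()
            if not line:  # EOF: the child exited before END_OF_STREAM
                raise RuntimeError("Python source terminated unexpectedly")
            message = json.loads(line)
            if message["type"] == "END_OF_STREAM":
                break
            rows.extend(message["rows"])
        proc.stdin.close()  # EOF on stdin ends the child's read loop
    return schema, rows


if __name__ == "__main__":
    schema, rows = run_source(sys.executable, ["-c", CHILD])
    print(schema["fields"], rows)
```

Closing stdin is how this sketch signals shutdown; a real connector would also have to cope with a crashed or hung child, for example by enforcing a read timeout.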

Example Configuration

source {
  PythonSource {
    python.executable = "python3"
    python.script.path = "/path/to/user_script.py"
    python.script.config = {
      api_url = "https://api.example.com"
      api_key = "xxx"
    }
    result_table_name = "python_table"
  }
}

Example Python Script

#!/usr/bin/env python3
import sys
import json

def get_schema():
    return {
        "type": "SCHEMA",
        "fields": [
            {"name": "id", "type": "BIGINT"},
            {"name": "name", "type": "STRING"}
        ]
    }

def read_data(config):
    # User-defined data reading logic
    for i in range(100):
        yield {
            "type": "DATA",
            "rows": [{"id": i, "name": f"user_{i}"}]
        }
    yield {"type": "END_OF_STREAM"}

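def main():
    # stdin/stdout form the protocol channel: one JSON object per line (JSON Lines).
    # Log to stderr only; any other print() to stdout would corrupt the stream.
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        request = json.loads(line)
        if request["type"] == "GET_SCHEMA":
            print(json.dumps(get_schema()), flush=True)
        elif request["type"] == "READ_NEXT":
            # Stream every message from read_data(), ending with END_OF_STREAM;
            # flush each line so the Java reader sees it immediately.
            for message in read_data(request.get("config", {})):
                print(json.dumps(message), flush=True)

if __name__ == "__main__":
    main()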
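As written, the script answers a single `READ_NEXT` with the entire stream; a second `READ_NEXT` would restart `read_data()` and repeat the rows. If the Java reader is instead meant to send one `READ_NEXT` per `pollNext()` call, the script has to keep its generator between requests. A sketch of that pull-style variant, which is an assumption rather than what the issue specifies (`serve` and `emit` are hypothetical names):

```python
import json


def read_data(config):
    # Placeholder rows as in the example script; END_OF_STREAM is emitted
    # by serve() once this generator is exhausted.
    for i in range(100):
        yield {"type": "DATA", "rows": [{"id": i, "name": f"user_{i}"}]}


def serve(lines, emit):
    """Answer every READ_NEXT with exactly one JSON Lines message.

    `lines` is any iterable of request lines (e.g. sys.stdin) and `emit`
    receives each serialized response. GET_SCHEMA handling is unchanged
    from the example script and omitted here.
    """
    reader = None
    for line in lines:
        if not line.strip():
            continue
        request = json.loads(line)
        if request["type"] == "READ_NEXT":
            if reader is None:
                # Create the generator once; later requests resume it.
                reader = read_data(request.get("config", {}))
            # Keep answering END_OF_STREAM after the generator is exhausted.
            emit(json.dumps(next(reader, {"type": "END_OF_STREAM"})))
```

In a script this would be wired up as `serve(sys.stdin, lambda s: print(s, flush=True))`. Pull-style pacing lets the Java side control throughput, at the cost of one round trip per message, which makes putting several rows into each `DATA` message more important.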

Use Cases

  1. Custom API Integration: Fetch data from APIs that don't have existing connectors
  2. Python Libraries: Leverage Python's rich ecosystem (boto3, pymongo, etc.)
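For the custom API use case, `read_data()` would typically page through an endpoint using the values from `python.script.config`. The sketch below uses only the standard library; the bearer-token header, the `page` query parameter and the `{"items": [...], "next_page": ...}` response shape are hypothetical and must be adapted to the real API. `fetch` is injectable so the paging logic can be tested without network access.

```python
import json
import urllib.parse
import urllib.request


def http_get_json(url, api_key):
    # Hypothetical auth scheme (bearer token); adapt to the real API.
    request = urllib.request.Request(url, headers={"Authorization": f"Bearer {api_key}"})
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.load(response)


def read_data(config, fetch=http_get_json):
    """Yield one DATA message per API page, then END_OF_STREAM.

    Assumes each page looks like {"items": [...], "next_page": <int or None>}.
    """
    page = 1
    while page is not None:
        query = urllib.parse.urlencode({"page": page})
        body = fetch(f"{config['api_url']}?{query}", config["api_key"])
        if body["items"]:  # skip empty pages rather than sending empty batches
            yield {"type": "DATA", "rows": body["items"]}
        page = body.get("next_page")
    yield {"type": "END_OF_STREAM"}
```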

Implementation References

  • Buffered reading: GraphQLSourceSocketReader.java in connector-graphql
  • Source API: seatunnel-api/src/main/java/org/apache/seatunnel/api/source/
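`GraphQLSourceSocketReader` buffers data on the Java side; a complementary measure on the Python side is to group rows into `DATA` messages of several rows each, so that the per-line JSON and pipe overhead is amortized. A sketch using `itertools.islice` (the helper name and the default batch size of 500 are arbitrary assumptions):

```python
from itertools import islice


def batched_messages(rows, batch_size=500):
    """Group row dicts into DATA messages of at most batch_size rows, then END_OF_STREAM."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:  # source exhausted
            break
        yield {"type": "DATA", "rows": batch}
    yield {"type": "END_OF_STREAM"}
```

A script's `read_data()` could then simply `yield from batched_messages(...)` over any user-defined generator of row dicts.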

Module Structure

seatunnel-connectors-v2/
└── connector-python/
    ├── pom.xml
    └── src/
        ├── main/java/.../python/
        │   ├── source/
        │   │   ├── PythonSource.java
        │   │   ├── PythonSourceReader.java
        │   │   └── PythonSourceSplitEnumerator.java
        │   ├── config/PythonSourceConfig.java
        │   └── serialize/JsonToRowDeserializer.java
        └── test/
            └── resources/python/
                └── test_source.py
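`JsonToRowDeserializer` will have to reject rows that do not match the declared schema, and `test_source.py` could check the same thing from the Python side. A sketch of such a check follows; only `BIGINT` and `STRING` appear in the issue, so the other type names, the ISO 8601 timestamp format and the treatment of `None` as NULL are assumptions. Note that Python's `bool` is a subclass of `int`, so the numeric checks exclude it explicitly.

```python
from datetime import datetime


def _is_iso_timestamp(value):
    # Assumed wire format: ISO 8601 strings such as "2025-12-11T13:12:00".
    try:
        datetime.fromisoformat(value)
        return True
    except ValueError:
        return False


# Checks per schema type name; names other than BIGINT/STRING are hypothetical.
TYPE_CHECKS = {
    "BIGINT": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "INT": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "DOUBLE": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "BOOLEAN": lambda v: isinstance(v, bool),
    "STRING": lambda v: isinstance(v, str),
    "TIMESTAMP": lambda v: isinstance(v, str) and _is_iso_timestamp(v),
}


def validate_message(schema, message):
    """Return a list of problems in one DATA message; an empty list means valid."""
    fields = {f["name"]: f["type"] for f in schema["fields"]}
    problems = []
    for n, row in enumerate(message["rows"]):
        missing = sorted(fields.keys() - row.keys())
        extra = sorted(row.keys() - fields.keys())
        if missing or extra:
            problems.append(f"row {n}: missing={missing} extra={extra}")
        for name, type_name in fields.items():
            value = row.get(name)
            check = TYPE_CHECKS.get(type_name)
            if check is None:
                problems.append(f"row {n}: unknown type {type_name} for {name}")
            elif value is not None and not check(value):  # None is accepted as NULL
                problems.append(f"row {n}: {name}={value!r} is not {type_name}")
    return problems
```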

Posted by davidzollo, Dec 11 '25 13:12

If anybody would like to implement this, please feel free to leave a message; you can then complete it within four weeks.

Posted by davidzollo, Dec 11 '25 14:12

Could you clarify how this Python utility is expected to be used and what the success criteria are? Does it require familiarity with Java and SeaTunnel?

Posted by jgafnea, Dec 11 '25 18:12

@davidzollo This seems like a nice issue to work on. If it's open to external contributors, shall I contribute and create a PR?

Posted by AryanBagade, Dec 11 '25 19:12

> If there is anybody who'd like to implement it, please feel free to leave a message, then you can complete it within four weeks.
>
> Could you clarify how this Python utility is expected to be used and what the success criteria are? Does it require familiarity with Java and SeaTunnel?

Yes, it requires familiarity with Java and SeaTunnel. I have updated the issue with success criteria, but they are only a reference for contributors; anyone can submit a better suggestion.

Posted by davidzollo, Dec 15 '25 16:12

> @davidzollo it seems to be nice issue to work on, if its open for external contributors, shall i contribute and create a PR?

Yes, I'm so glad to see that you can contribute and submit a PR.

Posted by davidzollo, Dec 15 '25 16:12