[Feature] [Connector] Support Python-based Custom Data Sources in SeaTunnel
Background
Currently, SeaTunnel supports many connectors written in Java, but it lacks native support for Python-based data sources. Many data engineers and data scientists prefer Python for its ease of use and its rich ecosystem of data processing libraries (pandas, requests, boto3, etc.).
This feature request proposes letting users write custom data sources in Python while still leveraging SeaTunnel's distributed execution engine.
Goals
Phase 1 (MVP)
- Enable Python scripts to act as SeaTunnel data sources
- Support basic data exchange between Java and Python processes via Java ProcessBuilder
- Provide a simple configuration for specifying the Python interpreter, the script path, and script parameters
- Support JSON-based data serialization
- Handle common data types (string, int, float, boolean, timestamp, etc.)
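JSON has no timestamp type (and no exact decimal type), so the MVP needs a wire convention for such values. A minimal sketch of the Python side, assuming dates and timestamps travel as ISO 8601 strings and decimals as strings; neither convention is fixed by this proposal, and `RowEncoder` and `encode_message` are hypothetical names:

```python
import json
from datetime import date, datetime
from decimal import Decimal


class RowEncoder(json.JSONEncoder):
    """Encodes values that the json module cannot serialize natively.

    Assumed wire convention (not fixed by the proposal):
    - datetime/date -> ISO 8601 string
    - Decimal       -> string, to avoid float rounding
    """

    def default(self, o):
        # datetime is a subclass of date, so one check covers both.
        if isinstance(o, date):
            return o.isoformat()
        if isinstance(o, Decimal):
            return str(o)
        # Defer to the base class, which raises TypeError.
        return super().default(o)


def encode_message(message):
    """Serialize one protocol message as a single JSON Lines record."""
    return json.dumps(message, cls=RowEncoder)


if __name__ == "__main__":
    row = {"id": 1, "name": "alice", "active": True, "score": 9.5,
           "created_at": datetime(2024, 1, 2, 3, 4, 5)}
    print(encode_message({"type": "DATA", "rows": [row]}))
```

By default json.dumps escapes embedded newlines and adds no line breaks of its own, so each message stays on exactly one line, which the JSON Lines framing depends on.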
Future Phases
- Multi-process parallel execution
- Enhanced error handling and retry mechanisms
- Checkpoint-based fault tolerance
Architecture Overview
```
┌────────────────────────────────────┐
│      SeaTunnel Engine (Java)       │
│  ┌──────────────────────────────┐  │
│  │ PythonSource                 │  │
│  │  - createReader()            │  │
│  │  - createEnumerator()        │  │
│  └──────────┬───────────────────┘  │
│             │                      │
│  ┌──────────▼───────────────────┐  │
│  │ PythonSourceReader           │  │
│  │  - pollNext()                │  │
│  │  - snapshotState()           │  │
│  └──────────┬───────────────────┘  │
│             │ IPC                  │
│             │ (ProcessBuilder)     │
└─────────────┼──────────────────────┘
              │
              │ stdout/stdin
              │ (JSON Lines)
              │
┌─────────────▼──────────────────────┐
│           Python Process           │
│  ┌──────────────────────────────┐  │
│  │ User Python Script           │  │
│  │  - read_data()               │  │
│  │  - get_schema()              │  │
│  └──────────────────────────────┘  │
└────────────────────────────────────┘
```
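On the PythonSourceReader side, this exchange amounts to: start the interpreter, write one JSON request per line to its stdin, and read JSON lines from its stdout until END_OF_STREAM. The connector would do this in Java with ProcessBuilder. The sketch below mimics the same loop in Python with `subprocess`, purely to illustrate the protocol. It uses a tiny inline stand-in for the user script; `read_all` and `CHILD_SCRIPT` are hypothetical names.

```python
import json
import subprocess
import sys

# A tiny stand-in for the user script, using the message types from the
# example script (GET_SCHEMA, READ_NEXT -> DATA..., END_OF_STREAM).
CHILD_SCRIPT = r'''
import json, sys
for line in sys.stdin:
    req = json.loads(line)
    if req["type"] == "GET_SCHEMA":
        print(json.dumps({"type": "SCHEMA",
                          "fields": [{"name": "id", "type": "BIGINT"}]}), flush=True)
    elif req["type"] == "READ_NEXT":
        for i in range(3):
            print(json.dumps({"type": "DATA", "rows": [{"id": i}]}), flush=True)
        print(json.dumps({"type": "END_OF_STREAM"}), flush=True)
'''


def read_all(argv, config):
    """Drive a source process: fetch its schema, then collect every row."""
    proc = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                            text=True, bufsize=1)

    def request(msg):
        # One JSON object per line, flushed so the child sees it immediately.
        proc.stdin.write(json.dumps(msg) + "\n")
        proc.stdin.flush()

    def response():
        line = proc.stdout.readline()
        if not line:
            raise RuntimeError("source process exited unexpectedly")
        return json.loads(line)

    try:
        request({"type": "GET_SCHEMA"})
        schema = response()
        request({"type": "READ_NEXT", "config": config})
        rows = []
        while True:
            msg = response()
            if msg["type"] == "END_OF_STREAM":
                break
            rows.extend(msg["rows"])
        return schema, rows
    finally:
        # Closing stdin ends the child's `for line in sys.stdin` loop.
        proc.stdin.close()
        proc.wait()
        proc.stdout.close()


if __name__ == "__main__":
    schema, rows = read_all([sys.executable, "-c", CHILD_SCRIPT], {})
    print(schema["fields"], rows)
```

Closing stdin is what lets the child's read loop end. The Java reader's close path would presumably need to do the same with Process.getOutputStream(), which is connected to the child's stdin.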
Example Configuration
```hocon
source {
  PythonSource {
    python.executable = "python3"
    python.script.path = "/path/to/user_script.py"
    python.script.config = {
      api_url = "https://api.example.com"
      api_key = "xxx"
    }
    result_table_name = "python_table"
  }
}
```
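The proposal does not yet say how these options reach the script. This sketch assumes one plausible mapping: `python.executable` and `python.script.path` form the command line, and the `python.script.config` block is forwarded unchanged as the `config` field of the `READ_NEXT` request. That field is where the example script's `request.get("config", {})` picks it up. `plan_launch` is a hypothetical helper; in the connector, this logic would presumably live in PythonSourceConfig.java. The `python3` default is also an assumption.

```python
def plan_launch(options):
    """Turn connector options into (argv, requests) for the Python process.

    `options` is a flat dict keyed by the option names used in the
    PythonSource block. Only python.script.path is treated as required.
    """
    if "python.script.path" not in options:
        raise ValueError("python.script.path is required")
    # Assumed default interpreter when python.executable is omitted.
    argv = [options.get("python.executable", "python3"),
            options["python.script.path"]]
    # Forward the user's script config untouched; the script interprets it.
    script_config = dict(options.get("python.script.config", {}))
    requests = [
        {"type": "GET_SCHEMA"},
        {"type": "READ_NEXT", "config": script_config},
    ]
    return argv, requests


if __name__ == "__main__":
    argv, requests = plan_launch({
        "python.executable": "python3",
        "python.script.path": "/path/to/user_script.py",
        "python.script.config": {"api_url": "https://api.example.com",
                                 "api_key": "xxx"},
    })
    print(argv)
    print(requests)
```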
Example Python Script
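```python
#!/usr/bin/env python3
import sys
import json


def get_schema():
    # Describe the output columns; sent once in reply to GET_SCHEMA.
    return {
        "type": "SCHEMA",
        "fields": [
            {"name": "id", "type": "BIGINT"},
            {"name": "name", "type": "STRING"}
        ]
    }


def read_data(config):
    # User-defined data reading logic; `config` comes from the READ_NEXT request.
    for i in range(100):
        yield {
            "type": "DATA",
            "rows": [{"id": i, "name": f"user_{i}"}]
        }
    # Tell the reader that no more data will follow.
    yield {"type": "END_OF_STREAM"}


def main():
    # One JSON request per line on stdin, one JSON message per line on stdout.
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue  # skip blank lines instead of failing in json.loads
        request = json.loads(line)
        if request["type"] == "GET_SCHEMA":
            print(json.dumps(get_schema()), flush=True)
        elif request["type"] == "READ_NEXT":
            for message in read_data(request.get("config", {})):
                print(json.dumps(message), flush=True)


if __name__ == "__main__":
    main()
```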
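The example script sends one row per DATA message, which costs one JSON line and one parse per row on the Java side. Because `rows` is already a list, a script could batch rows instead. This is a minimal sketch that assumes the reader accepts DATA messages of any size. `batched_messages` and `batch_size` are hypothetical names, and the default of 500 is arbitrary.

```python
from itertools import islice


def batched_messages(rows, batch_size=500):
    """Group an iterable of row dicts into DATA messages of up to
    batch_size rows each, then emit END_OF_STREAM."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            break
        yield {"type": "DATA", "rows": batch}
    yield {"type": "END_OF_STREAM"}


def read_data(config):
    # Same 100 rows as the example script, sent in batches of 30.
    rows = ({"id": i, "name": f"user_{i}"} for i in range(100))
    yield from batched_messages(rows, batch_size=30)
```

With this `read_data`, the script emits four DATA messages holding 30, 30, 30, and 10 rows, followed by END_OF_STREAM. The `main()` loop stays unchanged.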
Use Cases
- Custom API Integration: Fetch data from APIs that don't have existing connectors
- Python Libraries: Leverage Python's rich ecosystem (boto3, pymongo, etc.)
Implementation References
- Buffered reading: GraphQLSourceSocketReader.java in connector-graphql
- Source API: seatunnel-api/src/main/java/org/apache/seatunnel/api/source/
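Buffered reading matters here because `pollNext()` should not block indefinitely on the child's stdout. One common pattern is to have a background thread read lines into a bounded queue, then have each poll drain whatever is available, up to a limit. It is sketched here in Python rather than Java and is not taken from GraphQLSourceSocketReader itself. `BufferedLineReader`, `max_buffered`, and `max_per_poll` are hypothetical names.

```python
import json
import queue
import threading

_EOF = object()  # sentinel: the underlying stream is exhausted


class BufferedLineReader:
    """Parses JSON lines from `stream` on a background thread."""

    def __init__(self, stream, max_buffered=1000):
        # A bounded queue gives back-pressure: the pump thread blocks on put()
        # when the consumer falls behind, so memory use stays bounded.
        self._queue = queue.Queue(maxsize=max_buffered)
        self.finished = False
        self._thread = threading.Thread(target=self._pump, args=(stream,),
                                        daemon=True)
        self._thread.start()

    def _pump(self, stream):
        try:
            for line in stream:
                line = line.strip()
                if line:
                    self._queue.put(json.loads(line))
        finally:
            # Always signal the end, even if a line failed to parse.
            self._queue.put(_EOF)

    def poll(self, max_per_poll=100, timeout=0.1):
        """Return up to max_per_poll messages, waiting at most `timeout`
        seconds for the first one; returns [] if nothing is available."""
        messages = []
        if self.finished:
            return messages
        try:
            item = self._queue.get(timeout=timeout)
            while True:
                if item is _EOF:
                    self.finished = True
                    break
                messages.append(item)
                if len(messages) >= max_per_poll:
                    break
                item = self._queue.get_nowait()
        except queue.Empty:
            pass
        return messages
```

In the connector, `stream` would be the child's stdout (Process.getInputStream() in Java), and `poll` would back `pollNext()`.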
Module Structure
```
seatunnel-connectors-v2/
└── connector-python/
    ├── pom.xml
    └── src/
        ├── main/java/.../python/
        │   ├── source/
        │   │   ├── PythonSource.java
        │   │   ├── PythonSourceReader.java
        │   │   └── PythonSourceSplitEnumerator.java
        │   ├── config/PythonSourceConfig.java
        │   └── serialize/JsonToRowDeserializer.java
        └── test/
            └── resources/python/
                └── test_source.py
```
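JsonToRowDeserializer has to turn each JSON row into typed values in schema order. The sketch below shows that conversion in Python; the real class would be Java and build SeaTunnelRow objects. It assumes the following, none of which is specified in this issue:
- BIGINT and STRING come from the example script; INT, DOUBLE, BOOLEAN, and TIMESTAMP are assumed additional type names.
- Timestamps arrive as ISO 8601 strings.
- Missing fields become null.

`TYPE_CONVERTERS`, `_strict`, and `deserialize_rows` are hypothetical names.

```python
from datetime import datetime


def _strict(expected, convert=None):
    """Build a converter that checks the JSON value's Python type first."""
    def check(value):
        # bool is a subclass of int, so reject it explicitly for numeric types.
        if isinstance(value, bool) and bool not in expected:
            raise TypeError(f"unexpected boolean {value!r}")
        if not isinstance(value, expected):
            raise TypeError(f"expected {expected}, got {value!r}")
        return convert(value) if convert else value
    return check


# Assumed mapping from schema type names to converters.
TYPE_CONVERTERS = {
    "STRING": _strict((str,)),
    "INT": _strict((int,)),
    "BIGINT": _strict((int,)),
    "DOUBLE": _strict((int, float), float),
    "BOOLEAN": _strict((bool,)),
    "TIMESTAMP": _strict((str,), datetime.fromisoformat),
}


def deserialize_rows(schema, message):
    """Convert a DATA message into tuples ordered like schema["fields"].

    An unknown type name raises KeyError; a mistyped value raises TypeError.
    """
    fields = schema["fields"]
    converters = [TYPE_CONVERTERS[f["type"].upper()] for f in fields]
    rows = []
    for row in message["rows"]:
        values = []
        for field, convert in zip(fields, converters):
            value = row.get(field["name"])
            # Missing or null fields become None rather than failing.
            values.append(None if value is None else convert(value))
        rows.append(tuple(values))
    return rows
```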
If anyone would like to implement this, please leave a message; you would then have four weeks to complete it.
Could you clarify how this Python utility is expected to be used and what the success criteria are? Does it require familiarity with Java and SeaTunnel?
@davidzollo this seems like a nice issue to work on. If it's open to external contributors, shall I contribute and create a PR?
Yes, it requires familiarity with Java and SeaTunnel. I have updated the issue with success criteria, but they are only a reference for contributors; anyone can submit a better suggestion.
Yes, I'm glad you can contribute. Please go ahead and submit a PR.