[Umbrella][Feature][Core] Decoupling connectors from compute engines
Search before asking
- [X] I have searched the existing feature requests and found no similar requirement.
Description
In the current implementation of SeaTunnel, each connector is coupled to a specific computing engine, so implementing a connector requires implementing the interfaces of every computing engine.
The detailed design doc: https://docs.google.com/document/d/1h-BeGKK98VarSpYUfYkNtpYLVbYCh8PM_09N1pdq_aw/edit?usp=sharing
Motivation
- A connector only needs to be implemented once and can be used on all engines;
- Supports multiple versions of Spark/Flink engines;
- The Source interface makes partition/shard/split/parallelism logic explicit;
- Multiplexing of JDBC/log connections.
Why not use Apache Beam? Apache Beam divides sources into two categories, Unbounded and Bounded, so a connector still has to be implemented twice; this defeats the goal of writing the code only once.
Overall Design
- Catalog: metadata management, which can automatically discover the schema and other information of structured databases;
- Catalog Storage: used to store metadata for unstructured storage engines (e.g. Kafka);
- SQL:
- DataType: table column data types;
- Table API: used for context passing and SeaTunnel Source/Sink instantiation;
- Source API (see the sketch after this list):
  - Explicit partition/shard/split/parallelism logic;
  - Batch & streaming unification;
  - Multiplexing of source connections;
- Sink API:
  - Distributed transactions;
  - Aggregated commits;
- Translation:
  - Make the engine support the SeaTunnel connector.
  - Convert data to Row inside the engine.
  - Data distribution after multiplexing.
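To make the Source/Sink bullets above more concrete, here is a minimal, engine-agnostic sketch of what such interfaces could look like. All names (SourceSplit, SplitEnumerator, SinkWriter, and so on) are illustrative assumptions for this issue, not the final SeaTunnel API.

```java
// Illustrative sketch only: simplified, engine-agnostic connector interfaces in the
// spirit of the design above. The names are hypothetical, not the final SeaTunnel API.
import java.io.Serializable;
import java.util.List;

// A split is one unit of work (a partition, shard, or table chunk).
interface SourceSplit extends Serializable {
    String splitId();
}

// The enumerator makes partition/shard/split/parallelism logic explicit
// and assigns splits to readers.
interface SplitEnumerator<SplitT extends SourceSplit> {
    List<SplitT> discoverSplits();                // e.g. one split per table or partition
    void assignSplit(int readerId, SplitT split);
}

// A reader pulls records for its assigned splits; the same code path serves both
// bounded (batch) and unbounded (streaming) execution.
interface SourceReader<T, SplitT extends SourceSplit> {
    void addSplits(List<SplitT> splits);
    void pollNext(Collector<T> output) throws Exception;
}

// The sink writer produces commit information so the engine can run a distributed
// (two-phase) commit, optionally aggregated by a single committer.
interface SinkWriter<T, CommitInfoT> {
    void write(T record) throws Exception;
    CommitInfoT prepareCommit() throws Exception;     // phase one
}

interface SinkAggregatedCommitter<CommitInfoT> {
    void commit(List<CommitInfoT> commitInfos) throws Exception;  // phase two
}

interface Collector<T> {
    void collect(T record);
}
```

With this shape, a connector is written once against these interfaces, and each engine only needs a translation layer that drives them.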
Simple Flow
Why do we need multiplexed connections? Streaming scenarios:
- An RDB (e.g. MySQL) may hit too-many-connections errors or suffer database pressure;
- Duplicate log parsing in change data capture (CDC) scenarios (e.g. MySQL binlog, Oracle redo log);
Simple Source & Sink Flow
The subtasks:
- [x] #1701
- [x] #1704
- [x] #1734
- [x] #2490
Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
What does your multiplexed connection mean? Does it mean one job can have multiple sources of different types, or that one job can have multiple connections to the same source instance to work on the same or different splits of a table?
@yx91490 Multiplexing source connection: a source instance can read data from multiple tables. At present, the Spark and Flink source connectors create a connection for each table, i.e. a source instance only reads one table. This is fine for offline jobs, but it is not acceptable for real-time sync jobs with hundreds (or more) of tables.
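As a rough illustration of that multiplexing idea (the names and details below are assumptions for this example, not the actual connector code), a single reader could be assigned one split per table and reuse one JDBC connection for all of them:

```java
// Hypothetical sketch: one split per table, all served by a single reader that
// reuses one JDBC connection, instead of one connection per table.
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class TableSplit implements java.io.Serializable {
    final String table;
    TableSplit(String table) { this.table = table; }
}

class MultiTableReader {
    private final Connection connection;                 // single shared connection
    private final List<TableSplit> assigned = new ArrayList<>();

    MultiTableReader(Connection connection) { this.connection = connection; }

    void addSplit(TableSplit split) { assigned.add(split); }

    // Reads every assigned table over the same connection; with hundreds of tables
    // this avoids the too-many-connections problem described above.
    void pollAll(Consumer<String> collector) throws Exception {
        for (TableSplit split : assigned) {
            try (Statement stmt = connection.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT * FROM " + split.table)) {
                while (rs.next()) {
                    collector.accept(rs.getString(1));
                }
            }
        }
    }
}
```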
Is one Coordinator better than many? If an exception occurs at the Sink and the Source does not know about it and keeps sending data downstream, is that data discarded? And if the Sink end notifies the Coordinator on the Source end anyway, why not use the same one?
We do translation work for different engines, so how do we handle inconsistent semantics or capabilities? For example, Spark and Flink both have checkpoints, but their mechanisms and capabilities are completely different. If we expose a Checkpoint capability externally, how can we unify checkpointing across the two engines?
Anyway, looking forward to the new API. 🎉
@lonelyGhostisdog The fault tolerance between source and sink is provided by the Chandy-Lamport algorithm (i.e. checkpointing).
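For readers unfamiliar with how a checkpoint ties the two ends together, here is a highly simplified sketch (all names are hypothetical, not SeaTunnel code): the sink prepares a transaction per checkpoint, and it is only committed once the checkpoint completes; otherwise the job rolls back and the source replays from the last completed checkpoint's offset.

```java
// Minimal, hypothetical sketch of checkpoint-based fault tolerance between a source
// and a sink. Real engines drive this with checkpoint barriers (Chandy-Lamport).
import java.util.HashMap;
import java.util.Map;

class CheckpointSketch {
    private final Map<Long, Long> sourceOffsets = new HashMap<>();   // checkpointId -> source offset
    private final Map<Long, String> pendingTxns = new HashMap<>();   // checkpointId -> sink transaction

    void onSourceSnapshot(long checkpointId, long offset) {
        sourceOffsets.put(checkpointId, offset);        // source state captured at the barrier
    }

    void onSinkPrepared(long checkpointId, String transactionId) {
        pendingTxns.put(checkpointId, transactionId);   // phase one: data written but not yet visible
    }

    // Phase two: only after the whole checkpoint completes is the sink transaction
    // committed. If the sink failed earlier, the prepared transaction is aborted and
    // the source replays from the offset of the last completed checkpoint, so data
    // sent downstream after a failure is not silently lost.
    void onCheckpointComplete(long checkpointId) {
        String txn = pendingTxns.remove(checkpointId);
        if (txn != null) {
            System.out.println("commit " + txn + " from offset " + sourceOffsets.get(checkpointId));
        }
    }
}
```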
@lonelyGhostisdog Because the checkpoint mechanisms are not completely consistent, we are still discussing how to adapt Spark's micro-batch and continuous readers.
Why don't we design a new engine that doesn't depend on Spark or Flink?
Should we first complete the design and construction of Source, Sink, and Transform, even if only in a stand-alone mode like DataX, and then refine fault tolerance, tables, and SQL through gradual iteration?
The new engine is being designed. However, because a large number of SeaTunnel users are on Flink and Spark, we want to stay compatible with the Flink and Spark engines as much as possible. In the future, if our own engine is good enough, we will discuss again whether to keep relying on Flink and Spark.
I have another question. If a Connector is not implemented in Spark or Flink, how will we implement this new Connector if we are based on a new engine? Do we still need to implement the corresponding Connector in Spark or Flink first?
I suggest the new engine adapt to the unified API too. Then, if a user develops a connector against the unified API, that connector will be able to run on Spark, Flink, and SeaTunnel's own engine.
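To show how such a translation layer could work, here is a rough Flink-side adapter sketch. The UnifiedReader interface is an assumption made up for this example; only the Flink classes (RichSourceFunction, SourceContext) are real API. Similar thin adapters would exist for Spark and for SeaTunnel's own engine.

```java
// Hypothetical translation-layer sketch: a connector written once against a unified
// reader interface, wrapped so it runs on Flink.
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Engine-agnostic reader implemented once by the connector author (assumed interface).
interface UnifiedReader<T> extends java.io.Serializable {
    void open() throws Exception;
    T poll() throws Exception;      // returns null when a bounded source is exhausted
    void close() throws Exception;
}

// Per-engine adapter supplied by the translation module, not by connector authors.
class FlinkSourceAdapter<T> extends RichSourceFunction<T> {
    private final UnifiedReader<T> reader;
    private volatile boolean running = true;

    FlinkSourceAdapter(UnifiedReader<T> reader) { this.reader = reader; }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        reader.open();
        T record;
        while (running && (record = reader.poll()) != null) {
            // Emit under the checkpoint lock so records stay aligned with checkpoints.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(record);
            }
        }
        reader.close();
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```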
Thanks everyone for answering my questions; maybe I need to read the code to understand the design better.
From the code, I only see the abstract upper-level design for catalog, table, source, sink, etc., plus some logic for converting SeaTunnel types to Flink types. There is no distributed execution, network transmission, or conversion of SeaTunnel connectors to Flink/Spark connectors. Has that not been committed yet? Where can I find the corresponding designs?
Or are we going with a DataX-like model for now?
Branch api-draft only contains the connector API; it does not contain the st-engine.
A seatunnel-engine is being designed. It aims to handle scenarios that Flink and Spark cannot support in data synchronization: link/resource sharing, a JDBC connection pool, DDL support, and so on.
In offline synchronization scenarios it is more like DataX, but it supports distributed deployment and execution; it also needs to support real-time synchronization.
We plan to discuss these on the mailing list after the preliminary design is completed.
Completed!
Enable the SPI factory classes to improve the entire process.
LGTM