
[Umbrella][Feature][Core] Decoupling connectors from compute engines

ashulin opened this issue 2 years ago · 20 comments

Search before asking

  • [X] I have searched the existing feature requests and found no similar feature requirement.

Description

In the current implementation of SeaTunnel, connectors are coupled to the compute engines, so implementing a connector requires implementing the interface of each compute engine.

The detailed design doc: https://docs.google.com/document/d/1h-BeGKK98VarSpYUfYkNtpYLVbYCh8PM_09N1pdq_aw/edit?usp=sharing

Motivation

  1. A connector only needs to be implemented once and can be used on all engines;
  2. Support for multiple versions of the Spark/Flink engines;
  3. A Source interface that makes partition/shard/split/parallelism logic explicit;
  4. Multiplexing of JDBC/log connections.

Why not use Apache Beam? Apache Beam's sources are divided into two categories, Unbounded and Bounded, so a connector still cannot be written just once.

Overall Design

SeaTunnel Framework

  • Catalog: Metadata management; can automatically discover schemas and other information from structured databases;

  • Catalog Storage: Stores metadata for unstructured storage engines (e.g. Kafka);

  • SQL

  • DataType: Table column data types;

  • Table API: Used for context passing and SeaTunnel Source/Sink instantiation;

  • Source API (a sketch follows this list)

    • Explicit partition/shard/split/parallelism logic;
    • Batch & streaming unification;
    • Multiplexing of source connections;
  • Sink API

    • Distributed transactions;
    • Aggregated commits;
  • Translation

    • Makes each engine support the SeaTunnel connector.
    • Converts data to Row inside the engine.
    • Distributes data after multiplexing.
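
For illustration, a minimal sketch of what such an engine-agnostic source contract could look like; all interface and method names here are illustrative assumptions, not the final API:

```java
// Illustrative sketch of an engine-agnostic source contract; the names
// (UnifiedSource, SplitEnumerator, SourceReader) are assumptions for
// illustration, not the final SeaTunnel API.
public interface UnifiedSource<T, SplitT, StateT> {

    // Batch & streaming unification: one source declares whether it is
    // bounded instead of implementing two separate engine interfaces.
    boolean isBounded();

    // Explicit split logic: the enumerator discovers and assigns
    // partitions/shards/splits, which determines read parallelism.
    SplitEnumerator<SplitT, StateT> createEnumerator();

    // A reader consumes whatever splits it is assigned; one reader may
    // hold splits from several tables (source connection multiplexing).
    SourceReader<T, SplitT> createReader();
}

interface SplitEnumerator<SplitT, StateT> {
    java.util.List<SplitT> discoverSplits();
    StateT snapshotState(long checkpointId); // enumerator-side fault tolerance
}

interface SourceReader<T, SplitT> {
    void addSplits(java.util.List<SplitT> splits);
    void pollNext(java.util.function.Consumer<T> collector) throws Exception;
}
```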

Simple Flow

SeaTunnel Flow

Why do we need multiplexed connections? In streaming scenarios:

  • An RDB (e.g. MySQL) may hit "too many connections" errors or come under database pressure;
  • Logs are parsed redundantly in change data capture (CDC) scenarios (e.g. MySQL binlog, Oracle redo log);

Simple Source & Sink Flow

SeaTunnel Engine Flow

The subtasks:

  • [x] #1701
  • [x] #1704
  • [x] #1734
  • [x] #2490

Are you willing to submit a PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

ashulin avatar Mar 29 '22 07:03 ashulin

What does your multiplexed connection mean? Does it mean that one job can have multiple sources of different types, or that one job can have multiple connections to the same source instance to work on the same or different splits of a table?

yx91490 avatar Apr 17 '22 00:04 yx91490

What does your multiplexed connection mean? Does it mean that one job can have multiple sources of different types, or that one job can have multiple connections to the same source instance to work on the same or different splits of a table?

@yx91490 Multiplexing the source connection means a single source instance can read data from multiple tables. At present, the Spark and Flink source connectors create a connection for each table; that is, a source instance reads only one table. This is fine for offline jobs, but not acceptable for real-time sync jobs with hundreds of tables (or more).
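
As a rough illustration of the difference (the class, method names, and connection handling below are hypothetical, not SeaTunnel code):

```java
// Hypothetical sketch: one reader serving splits from many tables over a
// single shared JDBC connection, instead of one connection (and one
// source instance) per table.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;

public class MultiplexedJdbcReader {
    private final Connection sharedConnection; // one connection for all tables

    public MultiplexedJdbcReader(String url, String user, String password) throws Exception {
        this.sharedConnection = DriverManager.getConnection(url, user, password);
    }

    // Each split names a table (or a chunk of one); hundreds of tables
    // can be synced without opening hundreds of connections.
    public void read(List<String> tableSplits) throws Exception {
        try (Statement stmt = sharedConnection.createStatement()) {
            for (String table : tableSplits) {
                try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + table)) {
                    while (rs.next()) {
                        // emit the row downstream ...
                    }
                }
            }
        }
    }
}
```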

ashulin avatar Apr 18 '22 03:04 ashulin

Is one Coordinator better than many? If an exception occurs at the Sink and the Source does not know about it and continues to send data downstream, is that data discarded? If the Sink side has to notify the Coordinator on the Source side anyway, why not use the same one?

dijiekstra avatar May 10 '22 13:05 dijiekstra

We do translation work for different engines, so how do we handle inconsistent semantics or features? For example, Spark and Flink both have checkpoints, but their mechanisms and capabilities are completely different. If we expose a Checkpoint capability externally, how can we unify checkpointing across the two engines?

dijiekstra avatar May 10 '22 13:05 dijiekstra

Anyway, looking forward to the new API. 🎉

dijiekstra avatar May 10 '22 13:05 dijiekstra

Is one Coordinator better than many? If an exception occurs at the Sink and the Source does not know about it and continues to send data downstream, is that data discarded? If the Sink side has to notify the Coordinator on the Source side anyway, why not use the same one?

@lonelyGhostisdog Fault tolerance between source and sink is provided by the Chandy-Lamport algorithm (i.e. checkpointing).
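
Roughly, from a connector's point of view (all names below are illustrative, not the actual interface): the coordinator injects a checkpoint barrier, and each reader snapshots its position when the barrier arrives, so source and sink state stay aligned and no data is silently lost after a sink failure.

```java
// Illustrative sketch of checkpoint-based fault tolerance between source
// and sink (names are assumptions, not the actual SeaTunnel API).
public class OffsetTrackingReader {
    private long currentOffset; // e.g. a binlog position or row offset

    public void pollNext() {
        // read one record, emit it downstream, then advance the offset
        currentOffset++;
    }

    // Called when a checkpoint barrier arrives (Chandy-Lamport snapshot).
    public Long snapshotState(long checkpointId) {
        return currentOffset;
    }

    // Called on recovery: resume from the last completed checkpoint,
    // so records after a sink failure are replayed rather than discarded.
    public void restoreState(Long restoredOffset) {
        this.currentOffset = restoredOffset;
    }
}
```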

ashulin avatar May 10 '22 14:05 ashulin

We do translation work for different engines, so how do we handle inconsistent semantics or features? For example, Spark and Flink both have checkpoints, but their mechanisms and capabilities are completely different. If we expose a Checkpoint capability externally, how can we unify checkpointing across the two engines?

@lonelyGhostisdog Because the checkpoint mechanisms are not completely consistent, we are still discussing how to adapt Spark's micro-batch and continuous readers.

ashulin avatar May 11 '22 03:05 ashulin

We do translation work for different engines, so how do we handle inconsistent semantics or features? For example, Spark and Flink both have checkpoints, but their mechanisms and capabilities are completely different. If we expose a Checkpoint capability externally, how can we unify checkpointing across the two engines?

@lonelyGhostisdog Because the checkpoint mechanisms are not completely consistent, we are still discussing how to adapt Spark's micro-batch and continuous readers.

Why don't we design a new engine that doesn't depend on Spark or Flink?

dijiekstra avatar May 11 '22 03:05 dijiekstra

We do translation work for different engines, so how do we handle inconsistent semantics or features? For example, Spark and Flink both have checkpoints, but their mechanisms and capabilities are completely different. If we expose a Checkpoint capability externally, how can we unify checkpointing across the two engines?

@lonelyGhostisdog Because the checkpoint mechanisms are not completely consistent, we are still discussing how to adapt Spark's micro-batch and continuous readers.

Why don't we design a new engine that doesn't depend on Spark or Flink?

Should we first complete the design and construction of Source, Sink, and Transform, even if only in a stand-alone mode like DataX, and then refine fault tolerance, tables, and SQL through gradual iteration?

dijiekstra avatar May 11 '22 03:05 dijiekstra

We do translation work for different engines, so how do we handle inconsistent semantics or features? For example, Spark and Flink both have checkpoints, but their mechanisms and capabilities are completely different. If we expose a Checkpoint capability externally, how can we unify checkpointing across the two engines?

@lonelyGhostisdog Because the checkpoint mechanisms are not completely consistent, we are still discussing how to adapt Spark's micro-batch and continuous readers.

Why don't we design a new engine that doesn't depend on Spark or Flink?

The new engine is being designed. However, because a large number of SeaTunnel users are on Flink and Spark, we hope to stay compatible with the Flink and Spark engines as much as possible. In the future, if our own engine is good enough, we will discuss again whether to keep relying on Flink and Spark.

EricJoy2048 avatar May 11 '22 03:05 EricJoy2048

I have another question. If a Connector is not implemented in Spark or Flink, how will we implement this new Connector if we are based on a new engine? Do we still need to implement the corresponding Connector in Spark or Flink first?

dijiekstra avatar May 11 '22 03:05 dijiekstra

I have another question. If a Connector is not implemented in Spark or Flink, how will we implement this new Connector if we are based on a new engine? Do we still need to implement the corresponding Connector in Spark or Flink first?

I suggest the new engine adopt the unified API too. Then, if a user develops a connector based on the unified API, the connector will be able to run on Spark, Flink, and SeaTunnel's own engine.
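
As a hedged sketch of how such a translation layer might wrap one unified connector per engine (UnifiedSource/SourceReader refer to the illustrative interfaces sketched earlier in this issue; only Flink's legacy SourceFunction is a real API):

```java
// Hypothetical translation-layer sketch: the same unified source is
// wrapped once per engine, so connector code is written only once.
// Split assignment and checkpointing are omitted for brevity.
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class FlinkSourceTranslation<T> implements SourceFunction<T> {
    private final UnifiedSource<T, ?, ?> unifiedSource; // engine-agnostic connector
    private volatile boolean running = true;

    public FlinkSourceTranslation(UnifiedSource<T, ?, ?> unifiedSource) {
        this.unifiedSource = unifiedSource;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        SourceReader<T, ?> reader = unifiedSource.createReader();
        while (running) {
            // Forward unified records into the engine as they are emitted.
            reader.pollNext(ctx::collect);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

An analogous wrapper for Spark (and for the new engine) would let one connector run unchanged on all three.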

EricJoy2048 avatar May 11 '22 03:05 EricJoy2048

Thanks everyone for answering my questions; maybe I need to read the code to understand the design better.

dijiekstra avatar May 11 '22 05:05 dijiekstra

From the code, I only see the abstract upper-level design of catalog, table, source, sink... and some logic for converting SeaTunnel types to Flink types. There is no distributed execution, no network transmission, no conversion of SeaTunnel connectors to Flink/Spark connectors, etc. Has it not been committed? Where can I find the corresponding designs?

dijiekstra avatar May 12 '22 14:05 dijiekstra

Or are we going to use a DataX-like model for now?

dijiekstra avatar May 12 '22 14:05 dijiekstra

From the code, I only see the abstract upper-level design of catalog, table, source, sink... and some logic for converting SeaTunnel types to Flink types. There is no distributed execution, no network transmission, no conversion of SeaTunnel connectors to Flink/Spark connectors, etc. Has it not been committed? Where can I find the corresponding designs?

The api-draft branch only contains the connector API; it does not contain the st-engine.

EricJoy2048 avatar May 16 '22 03:05 EricJoy2048

Or are we going to use a DataX-like model for now?

A seatunnel-engine is being designed. It aims to handle data synchronization scenarios that Flink and Spark cannot support, such as resource sharing, JDBC connection pooling, and DDL support.

For offline synchronization it is more like DataX, but with distributed deployment and execution. It also needs to support real-time synchronization.

We plan to discuss these in the mailing list after the preliminary design is completed.

EricJoy2048 avatar May 16 '22 03:05 EricJoy2048

Completed!

ashulin avatar Jul 28 '22 04:07 ashulin

Enable the SPI factory classes to improve the entire process.
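
For context, a minimal sketch of how Java SPI-based factory discovery typically works (the factory interface shown is illustrative, not the exact SeaTunnel class):

```java
// Minimal sketch of Java SPI-based factory discovery. Each connector jar
// declares its factory implementation in
// META-INF/services/<factory-interface-FQN>, and the framework discovers
// it at runtime without hard-coded references to connector classes.
import java.util.ServiceLoader;

public final class FactoryDiscovery {

    // Illustrative factory contract a connector would implement.
    public interface TableSourceFactory {
        String factoryIdentifier(); // e.g. "jdbc", "kafka"
    }

    public static TableSourceFactory find(String identifier) {
        for (TableSourceFactory factory : ServiceLoader.load(TableSourceFactory.class)) {
            if (factory.factoryIdentifier().equals(identifier)) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No factory found for: " + identifier);
    }
}
```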

ashulin avatar Aug 12 '22 02:08 ashulin

LGTM

dinggege1024 avatar Aug 19 '22 02:08 dinggege1024