[ST-Engine][Checkpoint] The design of Checkpoint
Search before asking
- [X] I had searched in the feature and found no similar feature requirement.
SeaTunnel Checkpoint Proposal
Background & Motivation
I think the [Chandy-Lamport algorithm](https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm) can be used to implement distributed state snapshots (hereinafter referred to as Checkpoint);
Based on the current Source & Sink V2 API of SeaTunnel, the required features are:
- Checkpoint supports both real-time and offline jobs;
- When a job is restored, only the Subtasks (vertices of the execution graph) that have not completed are restored;
- Jobs can be restored normally when the user changes the parallelism of a connector;
We found that the Checkpoint implementations of Flink and Spark have a large scope of impact: when a job contains multiple tables, the failure of one table affects the entire job. So we expect to improve on this problem:
- Minimize the unit of Checkpoint execution and reduce the impact of a restore;
Overall Design
- Convert the DAG into an execution graph, and use [Topological Sorting](https://en.wikipedia.org/wiki/Topological_sorting) to split the execution graph into n (n >= 1) pipelines;
- The CheckpointCoordinator creates a separate CheckpointTask for each pipeline;
- A CheckpointTask manages only its own pipeline, so the unit of restore is the pipeline;
- Jobs can be restored normally when the user changes the parallelism of a connector;
- For completed subtasks, Checkpoint is performed normally and their state is retained (Apache SeaTunnel will not have a very large state, such as a Join);
- During the Restore phase, completed subtasks are not deployed;
The Execution Unit (Pipeline)
Case: Two Kafka tables are written to their corresponding HDFS paths.
Job DAG:
Execution Graph:
By topological sorting, we get two queues, i.e. two pipelines:

- {Enumerator#1, Reader#1#1, Reader#1#2, Writer#1#1, Writer#1#2, AggregatedCommitter#1}
- {Enumerator#2, Reader#2#1, Writer#2#1, AggregatedCommitter#2}
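
As a rough sketch of this step, the pipelines can be computed as the connected components of the execution graph (the class and vertex names below are illustrative, not the actual engine code):

```java
import java.util.*;

// Illustrative sketch: vertices that are connected (ignoring edge direction)
// belong to the same pipeline, i.e. the same independent checkpoint unit.
public class PipelineSplitter {

    public static List<Set<String>> splitIntoPipelines(Map<String, List<String>> edges) {
        // Build an undirected adjacency view of the execution graph.
        Map<String, Set<String>> adjacency = new HashMap<>();
        edges.forEach((from, tos) -> tos.forEach(to -> {
            adjacency.computeIfAbsent(from, k -> new HashSet<>()).add(to);
            adjacency.computeIfAbsent(to, k -> new HashSet<>()).add(from);
        }));

        // Each connected component is one pipeline.
        List<Set<String>> pipelines = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        for (String start : adjacency.keySet()) {
            if (!visited.add(start)) {
                continue;
            }
            Set<String> pipeline = new LinkedHashSet<>();
            Deque<String> stack = new ArrayDeque<>(List.of(start));
            while (!stack.isEmpty()) {
                String vertex = stack.pop();
                pipeline.add(vertex);
                for (String neighbor : adjacency.get(vertex)) {
                    if (visited.add(neighbor)) {
                        stack.push(neighbor);
                    }
                }
            }
            pipelines.add(pipeline);
        }
        return pipelines;
    }
}
```

For the Kafka case above, the edges Enumerator#1 → Reader#1#1/#1#2 → Writer#1#1/#1#2 → AggregatedCommitter#1 and Enumerator#2 → Reader#2#1 → Writer#2#1 → AggregatedCommitter#2 yield exactly the two components listed.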
Support for Completed Subtask
Case: Two MySQL instances are sharded by database and table, and are synchronized to other storage after aggregation;
We assume that instance Source#2 has less data and enters the Completed status first;
If the pipeline then fails or is restarted manually, the subtask of Source#2 does not need to be deployed in the Restore phase, which reduces resource usage;
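
A minimal sketch of this restore rule, with hypothetical names (the real deployment code is not part of this proposal):

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: subtasks recorded as COMPLETED in the checkpoint keep
// their state, but are filtered out of the set of subtasks to redeploy.
public class RestoreDeployment {

    enum SubtaskStatus { RUNNING, COMPLETED }

    record SubtaskState(String subtaskId, SubtaskStatus status) {}

    /** Returns only the subtasks that still need to be deployed on restore. */
    static List<SubtaskState> subtasksToDeploy(List<SubtaskState> checkpointed) {
        return checkpointed.stream()
                .filter(s -> s.status() != SubtaskStatus.COMPLETED)
                .collect(Collectors.toList());
    }
}
```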
Support for parallelism changes
Plan A: Keyed State Redistribute [currently preferred]
In the checkpoint execution phase, the ID of the subtask is used as the state key and saved to the state backend;
In the checkpoint restore phase, `subtaskId = key % parallelism` is used to calculate which subtask each piece of state is restored to;
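
A minimal sketch of this rule, assuming states are stored as opaque bytes keyed by the old subtask ID (the names here are illustrative, not the CheckpointCoordinator API):

```java
import java.util.*;

// Illustrative sketch of Plan A: on restore, each saved state keyed by the old
// subtask ID is routed to the new subtask (key % newParallelism).
public class KeyedStateRedistribute {

    static Map<Integer, List<byte[]>> redistribute(
            Map<Integer, byte[]> savedStates, int newParallelism) {
        Map<Integer, List<byte[]>> assignment = new HashMap<>();
        savedStates.forEach((oldSubtaskId, state) -> {
            int newSubtaskId = oldSubtaskId % newParallelism; // subtaskId = key % parallelism
            assignment.computeIfAbsent(newSubtaskId, k -> new ArrayList<>()).add(state);
        });
        return assignment;
    }

    public static void main(String[] args) {
        // Parallelism changes from 4 to 3: old subtask 3's state lands on new subtask 0.
        Map<Integer, byte[]> saved = new HashMap<>();
        for (int oldId = 0; oldId < 4; oldId++) {
            saved.put(oldId, new byte[0]);
        }
        System.out.println(redistribute(saved, 3).keySet()); // subtasks 0, 1, 2
    }
}
```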
SinkState
For the state of the sink, this already satisfies the requirement;
SourceState
For the state of the source, the source subtasks still need to continue processing; the following work is done by the SourceTask and is not managed by Checkpoint;
We assume that the source is Kafka, the task has 1 topic with 6 partitions, and the parallelism is changed from 2 to 3. After the restore, the situation is as shown in the following figure: the splits cannot be assigned to the new reader;
In order for the new reader to be assigned splits, the ReaderTask needs to use SourceSplitEnumerator#addSplitsBack to return its splits to the enumerator; the enumerator then reassigns the splits, and the restore phase is complete;
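
A hedged sketch of this hand-back, assuming only the `SourceSplitEnumerator#addSplitsBack` method named above; the local interface and surrounding names are placeholders, not the full V2 API:

```java
import java.util.List;

public class ReaderRestore {

    // Placeholder mirroring only the method this proposal relies on.
    interface SourceSplitEnumerator<SplitT> {
        // Returns splits from a reader to the enumerator for re-assignment.
        void addSplitsBack(List<SplitT> splits, int subtaskId);
    }

    /**
     * During the Restore phase, a reader does not resume its recovered splits
     * directly; it returns them to the enumerator, which then re-assigns them
     * across the current (possibly re-sized) set of readers.
     */
    static <SplitT> void restoreReader(
            SourceSplitEnumerator<SplitT> enumerator, int subtaskId, List<SplitT> restoredSplits) {
        enumerator.addSplitsBack(restoredSplits, subtaskId);
    }
}
```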
Plan B: Uniform State Redistribute [deprecated]
Handle the enumerator and readers of the source specially:
In the checkpoint execution phase, they respond to the barrier in the same way as a normal task;
During the Restore phase, the state of the readers is restored into the enumerator: the SourceSplitEnumerator#addSplitsBack method restores the split state, and the enumerator can then assign the splits to the readers under the changed parallelism.
Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct

Can it be like this?

No, it's different. The above writer has multiple input subtasks, which is to reduce the resource usage on the sink side;
From this design https://github.com/apache/incubator-seatunnel/issues/2261 we can know that, for now, only the Source and PartitionTransform operators support setting parallelism; the parallelism of the other connectors is equal to the sum of the parallelism of their upstream operators. So in this case the parallelism of the sink operator is 3 (one source has parallelism 2 and the other has 1), not 2. Then we have 3 source subtasks and 3 sink subtasks, and I think it can be replaced with two pipelines.
Or, do we have any other, more suitable design for parallelism?
@EricJoy2048 Now how do we deal with the union of multiple upstream operators? That's the same thing as the following: users need `union (all)` post-processing, and we cannot separate them.
@Hisoka-X
We should create a special `union` transform. It can union all upstream data. Then `union` will have a parallelism parameter. Take this picture as an example: if parallelism = 8 (4 + 4), just send the data to the next action; if parallelism != 8, we should use a queue (like the Partition Transform) to change the parallelism.
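
A rough sketch of that idea (a hypothetical class, not an existing SeaTunnel transform):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the proposed union transform: when the configured
// parallelism equals the sum of the upstream parallelisms, rows are forwarded
// directly; otherwise a queue decouples the upstream and downstream subtask
// counts, as with the Partition Transform.
public class UnionTransform<T> {

    private final int upstreamParallelismSum;
    private final int parallelism;
    private final BlockingQueue<T> repartitionQueue = new LinkedBlockingQueue<>();

    public UnionTransform(int upstreamParallelismSum, int parallelism) {
        this.upstreamParallelismSum = upstreamParallelismSum;
        this.parallelism = parallelism;
    }

    /** Called by any upstream subtask for each row. */
    public void collect(T row) throws InterruptedException {
        if (parallelism == upstreamParallelismSum) {
            sendToNextAction(row);     // e.g. parallelism = 8 (4 + 4): forward directly
        } else {
            repartitionQueue.put(row); // downstream subtasks drain the queue
        }
    }

    private void sendToNextAction(T row) {
        // Placeholder for handing the row to the downstream operator.
    }
}
```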