
[ST-Engine][Checkpoint] The design of Checkpoint

ashulin opened this issue · 6 comments

Search before asking

  • [X] I had searched in the issues and found no similar feature requirement.

SeaTunnel Checkpoint Proposal

Background & Motivation

I think the [Chandy-Lamport algorithm](https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm) can be used to implement distributed state snapshots (hereinafter referred to as Checkpoint);
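As background, a minimal sketch of the barrier-alignment variant of Chandy-Lamport used by engines such as Flink; every class and method name below is illustrative, not SeaTunnel code.

```java
import java.util.HashSet;
import java.util.Set;

/** Minimal sketch of Chandy-Lamport-style barrier alignment;
 *  all names are illustrative, not SeaTunnel APIs. */
public class BarrierAlignedTask {
    private final Set<Integer> channelsAwaitingBarrier = new HashSet<>();
    private final Set<Integer> blockedChannels = new HashSet<>();

    public BarrierAlignedTask(int inputChannelCount) {
        // (Per-checkpoint reset of this set is omitted for brevity.)
        for (int i = 0; i < inputChannelCount; i++) {
            channelsAwaitingBarrier.add(i);
        }
    }

    /** Invoked when the barrier of a checkpoint arrives on one input channel. */
    public void onBarrier(int channelId, long checkpointId) {
        // Block this channel so no post-barrier record leaks into the snapshot.
        blockedChannels.add(channelId);
        channelsAwaitingBarrier.remove(channelId);
        if (channelsAwaitingBarrier.isEmpty()) {
            // Barriers received on all inputs: local state is now consistent.
            byte[] snapshot = snapshotLocalState();
            acknowledgeCoordinator(checkpointId, snapshot);
            emitBarrierDownstream(checkpointId);
            blockedChannels.clear(); // resume reading all channels
        }
    }

    private byte[] snapshotLocalState() { return new byte[0]; }   // stub
    private void acknowledgeCoordinator(long id, byte[] s) { }    // RPC stub
    private void emitBarrierDownstream(long id) { }               // network stub
}
```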

Based on the current Source & Sink V2 API of SeaTunnel, the required features are:

  1. Checkpoint supports both real-time (streaming) and offline (batch) jobs;

    • When the job is restored, only the Subtasks (vertices of the execution graph) that have not been completed are restored;
  2. Jobs can be restored normally when the user changes the parallelism of a connector;

We found that the Checkpoint implementations in Flink and Spark have a large scope of impact: when a job contains multiple tables, a failure in one table affects the entire job. So we want to improve on this problem:

  1. Minimize the unit of Checkpoint execution and reduce the impact of restore;

Overall Design

SeaTunnel Snapshot

  1. Convert the DAG to an execution graph, and use [Topological Sorting](https://en.wikipedia.org/wiki/Topological_sorting) to split the execution graph into n (n >= 1) pipelines;

  2. CheckpointCoordinator creates a separate CheckpointTask for each Pipeline;

    • Each CheckpointTask manages only its own associated pipeline, so the unit of restore is the pipeline (see the sketch after this list);
  3. Jobs can be restored normally when the user changes the parallelism of the connector;

  4. For completed subtasks, Checkpoint is still performed normally and their State is retained (Apache SeaTunnel will not hold a super large state, such as a Join state);

    • During the Restore phase, Completed Subtasks will not be deployed;
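Below is a minimal sketch of the coordinator-per-pipeline idea referenced from item 2 above; the class shapes are assumptions for illustration, not the actual engine code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: one CheckpointTask per pipeline, so a failure only restores
 *  that pipeline. All types here are illustrative stand-ins. */
class CheckpointCoordinatorSketch {
    private final Map<Integer, CheckpointTask> tasksByPipeline = new HashMap<>();

    void start(List<Pipeline> pipelines) {
        // A separate CheckpointTask is created for each pipeline.
        for (Pipeline p : pipelines) {
            tasksByPipeline.put(p.id, new CheckpointTask(p));
        }
    }

    /** Only the failed pipeline is restored; the others keep running. */
    void onPipelineFailure(int pipelineId) {
        tasksByPipeline.get(pipelineId).restoreFromLatestSnapshot();
    }

    static class Pipeline {
        final int id;
        Pipeline(int id) { this.id = id; }
    }

    static class CheckpointTask {
        private final Pipeline pipeline;
        CheckpointTask(Pipeline pipeline) { this.pipeline = pipeline; }
        void restoreFromLatestSnapshot() { /* redeploy unfinished subtasks only */ }
    }
}
```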

The Execution Unit (Pipeline)

Case: Two Kafka tables are written to their corresponding HDFS directories.

Job DAG:

SeaTunnel Snapshot-DAG

Execution Graph:

SeaTunnel Snapshot-ExecutionPlan

By topological sorting, we get two queues, namely two Pipelines:

{Enumerator#1, Reader#1#1, Reader#1#2, Writer#1#1, Writer#1#2, AggregatedCommitter#1}

{Enumerator#2, Reader#2#1, Writer#2#1, AggregatedCommitter#2}
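For illustration, a small sketch of how an execution graph could be split into such pipelines: vertices that are connected (ignoring edge direction) form one pipeline, and a topological sort of each pipeline yields the queues above. The class and method names are hypothetical.

```java
import java.util.*;

/** Hypothetical pipeline splitter: weakly connected components become
 *  pipelines, each ordered by Kahn's topological sort. */
class PipelineSplitter {

    static List<List<String>> splitIntoPipelines(Map<String, List<String>> edges) {
        // Build an undirected view to find weakly connected components.
        Map<String, Set<String>> undirected = new HashMap<>();
        edges.forEach((from, tos) -> {
            undirected.computeIfAbsent(from, k -> new HashSet<>());
            for (String to : tos) {
                undirected.computeIfAbsent(to, k -> new HashSet<>());
                undirected.get(from).add(to);
                undirected.get(to).add(from);
            }
        });

        List<List<String>> pipelines = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        for (String start : undirected.keySet()) {
            if (!visited.add(start)) continue;
            // Collect one component by BFS over the undirected view.
            List<String> component = new ArrayList<>();
            Deque<String> queue = new ArrayDeque<>(List.of(start));
            while (!queue.isEmpty()) {
                String cur = queue.poll();
                component.add(cur);
                for (String next : undirected.get(cur)) {
                    if (visited.add(next)) queue.add(next);
                }
            }
            pipelines.add(topologicalOrder(component, edges));
        }
        return pipelines;
    }

    /** Kahn's algorithm restricted to one component. */
    static List<String> topologicalOrder(List<String> vertices,
                                         Map<String, List<String>> edges) {
        Map<String, Integer> inDegree = new HashMap<>();
        vertices.forEach(v -> inDegree.put(v, 0));
        for (String v : vertices) {
            for (String to : edges.getOrDefault(v, List.of())) {
                inDegree.merge(to, 1, Integer::sum);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((v, d) -> { if (d == 0) ready.add(v); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            order.add(v);
            for (String to : edges.getOrDefault(v, List.of())) {
                if (inDegree.merge(to, -1, Integer::sum) == 0) ready.add(to);
            }
        }
        return order; // e.g. [Enumerator#2, Reader#2#1, Writer#2#1, AggregatedCommitter#2]
    }
}
```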

SeaTunnel Snapshot-Pipeline

Support for Completed Subtask

Case: Two MySQL instances hold sharded databases and tables (sub-database, sub-table), which are synchronized to other storage after aggregation;

SeaTunnel Snapshot-Finished1

We assume that instance Source#2 has less data and enters the Completed status first;

SeaTunnel Snapshot-Finished2

At this point, if the pipeline fails or is restarted manually, the Subtasks of Source#2 need not be deployed in the Restore phase, which reduces resource usage;

SeaTunnel Snapshot-Finished3
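A minimal sketch of what skipping completed subtasks at restore time could look like, assuming a status flag is stored alongside each subtask snapshot (the types here are illustrative, not SeaTunnel's):

```java
import java.util.List;
import java.util.stream.Collectors;

/** Sketch of restore-time filtering: a subtask whose snapshot is marked
 *  COMPLETED keeps its state but is not redeployed. */
class RestorePlanner {
    enum SubtaskStatus { RUNNING, COMPLETED }

    static class SubtaskSnapshot {
        final String subtaskId;
        final SubtaskStatus status;
        SubtaskSnapshot(String subtaskId, SubtaskStatus status) {
            this.subtaskId = subtaskId;
            this.status = status;
        }
    }

    /** Only unfinished subtasks are redeployed; Source#2's readers stay down. */
    static List<SubtaskSnapshot> subtasksToDeploy(List<SubtaskSnapshot> snapshots) {
        return snapshots.stream()
                .filter(s -> s.status != SubtaskStatus.COMPLETED)
                .collect(Collectors.toList());
    }
}
```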

Support for parallelism changes

Plan A: Keyed State Redistribute [currently expected]

Checkpoint Source Plan-B 2

In the checkpoint execution phase, the ID of the subtask is used as the state key and saved to the state backend;

In the checkpoint restore phase, use subtaskId = key % parallelism to calculate the restored subtask ID;
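A minimal sketch of this keyed redistribution, assuming states are stored in a simple map keyed by subtask ID (an illustrative stand-in, not the actual state-backend API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of Plan A's keyed state redistribution. */
class KeyedStateRedistributor {

    /** Checkpoint phase: each subtask's state is saved under its own ID. */
    static Map<Integer, byte[]> snapshot(Map<Integer, byte[]> statePerSubtask) {
        return new HashMap<>(statePerSubtask); // state key = subtask ID
    }

    /** Restore phase: route each saved key to subtaskId = key % parallelism. */
    static Map<Integer, List<byte[]>> restore(Map<Integer, byte[]> saved, int parallelism) {
        Map<Integer, List<byte[]>> assigned = new HashMap<>();
        saved.forEach((key, state) ->
                assigned.computeIfAbsent(key % parallelism, k -> new ArrayList<>())
                        .add(state));
        return assigned;
    }
}
```

For example, with parallelism changed from 2 to 3, saved keys 0 and 1 map back to subtasks 0 and 1 while subtask 2 starts empty; the source-side consequence of that gap is discussed below.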

SinkState

For the sink's state, this satisfies the requirements;

SourceState

For the source's state, the source subtasks still need to continue processing;

What follows is the job of the SourceTask, which is not managed by Checkpoint;

We assume the source is Kafka with 1 topic and 6 partitions, and the parallelism is changed from 2 to 3; after the restore, the situation is as shown in the following figure.

SourceState-1

We found that splits cannot be assigned to the new readers; in order for a new reader to be assigned splits, the ReaderTask needs to use SourceSplitEnumerator#addSplitsBack to return its splits to the enumerator;

SourceState-2

The splits are reassigned by the enumerator, and the restore phase is complete;

SourceState-3
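A sketch of this restore handshake; SourceSplitEnumerator#addSplitsBack is the method named by the proposal, while the ReaderTaskRestoreSketch shape, KafkaSplit, and assignPendingSplits are illustrative assumptions.

```java
import java.util.List;

/** Sketch of the restore handshake described above. */
class ReaderTaskRestoreSketch {

    interface KafkaSplit { }

    interface SourceSplitEnumerator {
        // Mirrors the proposal's SourceSplitEnumerator#addSplitsBack.
        void addSplitsBack(List<KafkaSplit> splits, int subtaskId);
        void assignPendingSplits(); // hypothetical reassignment trigger
    }

    /** Called for each restored reader before it resumes processing. */
    static void onRestore(int subtaskId, List<KafkaSplit> restoredSplits,
                          SourceSplitEnumerator enumerator) {
        // Return every restored split, so none stays pinned to the old layout.
        enumerator.addSplitsBack(restoredSplits, subtaskId);
        // The enumerator then redistributes over the new parallelism (2 -> 3),
        // letting the new reader receive partitions as in the last figure.
        enumerator.assignPendingSplits();
    }
}
```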

Plan B: Uniform state redistribute

Checkpoint Source Plan-B 1

Deprecated Plan

Special processing of the source's enumerator and readers:

Checkpoint Source Plan-A 1

In the checkpoint execution phase, responding to the barrier is the same as for a normal task;

Checkpoint Source Plan-A 2

During the Restore phase, the state of the reader will be restored to the enumerator;

Use the SourceSplitEnumerator#addSplitsBack method to restore the split state, and then the enumerator can assign the splits to the readers under the changed parallelism.
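For comparison, a sketch of this deprecated flow, where the readers' saved splits are routed straight to the enumerator during restore (all types illustrative):

```java
import java.util.List;
import java.util.Map;

/** Sketch of the deprecated plan: reader state is restored into the
 *  enumerator rather than into the readers. */
class DeprecatedRestoreSketch {

    interface Split { }

    interface Enumerator {
        void addSplitsBack(List<Split> splits, int formerSubtaskId);
    }

    static void restore(Map<Integer, List<Split>> savedReaderStates, Enumerator enumerator) {
        // Every reader's splits go to the enumerator, not back to the readers;
        // the enumerator can then assign them under the changed parallelism.
        savedReaderStates.forEach((subtaskId, splits) ->
                enumerator.addSplitsBack(splits, subtaskId));
    }
}
```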

Are you willing to submit a PR?

  • [X] Yes I am willing to submit a PR!


ashulin · Jul 27 '22

[image]

Can it be like this?

[image]

EricJoy2048 · Jul 27 '22

> [image] Can it be like this? [image]

No, it's different. The writer above has multiple input subtasks, which reduces the resource usage on the sink side;

ashulin · Jul 27 '22

> [image] Can it be like this? [image]
>
> No, it's different. The writer above has multiple input subtasks, which reduces the resource usage on the sink side;

From this design https://github.com/apache/incubator-seatunnel/issues/2261 we can see that, currently, only the Source and PartitionTransform operators support setting parallelism; the parallelism of other connectors is equal to the sum of the parallelisms of their upstream operators. So, in this case, the parallelism of the sink operator is 3 (one source has parallelism 2 and the other has parallelism 1), not 2. Then we have 3 source subtasks and 3 sink subtasks, and I think it can be replaced by two pipelines.

Or do we have any other, more suitable design for parallelism?

EricJoy2048 · Jul 28 '22

@EricJoy2048 Now how do we deal with the union of multiple upstream operators? That is the same thing as in the following image: users need union (all) post-processing, and we cannot separate them.

ashulin · Jul 28 '22

> @EricJoy2048 Now how do we deal with the union of multiple upstream operators? That is the same thing as in the following image: users need union (all) post-processing, and we cannot separate them.

@Hisoka-X

EricJoy2048 · Jul 28 '22

> @EricJoy2048 Now how do we deal with the union of multiple upstream operators? That is the same thing as in the following image: users need union (all) post-processing, and we cannot separate them.

We should create a special union transform that unions all upstream data. The union will have a parallelism parameter; taking this picture as an example, if parallelism = 8 (4+4), it just sends data to the next action; if parallelism != 8, we should use a queue (like the Partition Transform) to change the parallelism.
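A minimal sketch of that idea, assuming a configured parallelism and a blocking queue for the repartition path (all names illustrative):

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Sketch of the proposed union transform: pass-through when the configured
 *  parallelism equals the sum of upstream parallelisms, otherwise repartition
 *  through a queue (as in Partition Transform). */
class UnionTransformSketch<T> {
    private final int upstreamParallelismSum;
    private final int configuredParallelism;
    private final BlockingQueue<T> repartitionQueue = new ArrayBlockingQueue<>(1024);

    UnionTransformSketch(List<Integer> upstreamParallelisms, int configuredParallelism) {
        this.upstreamParallelismSum =
                upstreamParallelisms.stream().mapToInt(Integer::intValue).sum();
        this.configuredParallelism = configuredParallelism;
    }

    void process(T record) throws InterruptedException {
        if (configuredParallelism == upstreamParallelismSum) {
            sendDownstream(record); // e.g. parallelism = 8 with upstreams 4 + 4
        } else {
            repartitionQueue.put(record); // queue rebalances to the new parallelism
        }
    }

    private void sendDownstream(T record) { /* network stub */ }
}
```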

Hisoka-X · Jul 28 '22