[FLINK-28639][Runtime/Checkpointing] Preserve consistency of events from subtask to OC

Open yunfengzhou-hub opened this issue 3 years ago • 10 comments

What is the purpose of the change

This PR is a continuation of #20275 and further guarantees the consistency of operator events sent from subtasks to their coordinators. Before a coordinator starts a checkpoint, it notifies its subtasks to close their gateways. While a subtask's gateway is closed, events sent from that subtask to the coordinator are blocked and buffered until the subtask completes the checkpoint. The buffered events are stored in snapshots so that they can be recovered after a failover.
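The buffering behavior described above can be illustrated with a minimal, self-contained sketch. It is not the code in this PR: the class `BufferingGatewaySketch`, its method names, and the use of `String` as the event type are all hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical stand-in for a subtask-side gateway; not Flink's actual class. */
final class BufferingGatewaySketch {
    // Assumption: the coordinator is modeled as a plain consumer of events.
    private final Consumer<String> coordinator;
    // Events sent while the gateway is closed, kept in send order.
    private final Queue<String> blocked = new ArrayDeque<>();
    private boolean closed;

    BufferingGatewaySketch(Consumer<String> coordinator) {
        this.coordinator = coordinator;
    }

    /** Closes the gateway on the coordinator's request and returns an ACK token. */
    String closeAndAck(long checkpointId) {
        closed = true;
        return "ACK-" + checkpointId;
    }

    /** Delivers the event immediately if open; otherwise buffers it. */
    void sendEvent(String event) {
        if (closed) {
            blocked.add(event);
        } else {
            coordinator.accept(event);
        }
    }

    /** Returns a copy of the buffered events so they can be written into a snapshot. */
    List<String> snapshotBlockedEvents() {
        return new ArrayList<>(blocked);
    }

    /** Re-adds events read back from a snapshot after a failover. */
    void restoreBlockedEvents(List<String> events) {
        blocked.addAll(events);
    }

    /** Reopens the gateway and flushes buffered events in their original order. */
    void reopen() {
        closed = false;
        while (!blocked.isEmpty()) {
            coordinator.accept(blocked.poll());
        }
    }
}
```

In this sketch, an event sent between `closeAndAck` and `reopen` reaches the coordinator only when `reopen` is called, which mirrors the ordering guarantee the description aims for. Threading and serialization are deliberately left out.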

This PR is a component of FLINK-26029, which aims to generalize the checkpoint protocol of operator coordinators and make coordinators applicable to non-source operators as well.

Brief change log

  • When a coordinator starts checkpointing, it notifies its subtasks to close their gateways and waits until it has received ACKs from all subtasks before it actually performs the checkpoint.
  • A subtask closes its gateway when it receives this notification from its coordinator, and sends back an ACK event.
  • A subtask reopens its gateway when it completes a snapshot.
  • Events sent from a subtask while its gateway is closed are temporarily blocked and are sent out once the gateway is reopened.
  • Blocked events are saved into and restored from snapshots.
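The coordinator-side half of the handshake listed above (wait for every subtask's ACK, then checkpoint) can be sketched as follows. This is an illustration under stated assumptions, not the implementation in this PR: `CloseGatewayHandshakeSketch`, `onAck`, and `thenCheckpoint` are hypothetical names, and subtasks are identified by plain integer indices.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical coordinator-side helper; names are illustrative, not Flink's API. */
final class CloseGatewayHandshakeSketch {
    // Subtasks that have not yet acknowledged the close-gateway request.
    private final Set<Integer> pendingSubtasks = new HashSet<>();
    // Completed once every subtask has ACKed.
    private final CompletableFuture<Void> allAcked = new CompletableFuture<>();

    CloseGatewayHandshakeSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            pendingSubtasks.add(i);
        }
        if (pendingSubtasks.isEmpty()) {
            allAcked.complete(null);
        }
    }

    /** Records an ACK from one subtask; duplicate or unknown ACKs are ignored. */
    synchronized void onAck(int subtaskIndex) {
        if (pendingSubtasks.remove(subtaskIndex) && pendingSubtasks.isEmpty()) {
            allAcked.complete(null);
        }
    }

    /** Runs the checkpoint action only after all subtasks have ACKed. */
    CompletableFuture<Void> thenCheckpoint(Runnable checkpointAction) {
        return allAcked.thenRun(checkpointAction);
    }
}
```

Gating the checkpoint on a single future keeps the ordering explicit: the checkpoint action cannot run while any subtask's gateway might still be open. Timeouts and subtask failures during the wait are not modeled here.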

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests to verify the correctness of the closing/reopening behavior of subtask gateways.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)

  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)

  • The serializers: (no)

  • The runtime per-record code paths (performance sensitive): (yes)

    • This PR additionally blocks the communication between a coordinator and its subtasks from the time checkpoint barriers are sent to sources until the subtasks complete the checkpoint. As a result, an operator event generated during a checkpoint is delivered only after the target subtask completes the checkpoint, which increases the latency of that event.
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)

    • This PR affects the latency of operator events from coordinators to subtasks, as described above.
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

yunfengzhou-hub avatar Aug 04 '22 11:08 yunfengzhou-hub

CI report:

  • 4f1487a214b87a95c6941a2ef90da46ebfd3f60f Azure: SUCCESS

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Aug 04 '22 11:08 flinkbot

@flinkbot run azure

yunfengzhou-hub avatar Aug 09 '22 09:08 yunfengzhou-hub

@flinkbot run azure

yunfengzhou-hub avatar Aug 09 '22 12:08 yunfengzhou-hub

@flinkbot run azure

yunfengzhou-hub avatar Aug 11 '22 09:08 yunfengzhou-hub

@flinkbot run azure

yunfengzhou-hub avatar Aug 12 '22 12:08 yunfengzhou-hub

@flinkbot run azure

yunfengzhou-hub avatar Aug 15 '22 02:08 yunfengzhou-hub

@flinkbot run azure

yunfengzhou-hub avatar Aug 15 '22 07:08 yunfengzhou-hub

@flinkbot run azure

yunfengzhou-hub avatar Aug 15 '22 09:08 yunfengzhou-hub

Thanks for the PR. LGTM.

@becketqin Do you want to take a look at this PR?

lindong28 avatar Aug 15 '22 12:08 lindong28

@flinkbot run azure

yunfengzhou-hub avatar Aug 16 '22 00:08 yunfengzhou-hub