
[FLINK-34109][connectors] FileSystem sink connector restore job from historical checkpoint bugfix

[Open] ParyshevSergey opened this issue 1 year ago • 3 comments

What is the purpose of the change

Fixes a bug where the FileSystem connector sink with compaction enabled can't restore a job from a historical checkpoint (i.e. when MAX_RETAINED_CHECKPOINTS > 1 and the checkpoint being restored is not the latest one).

Brief change log

Added a check: if an operator implements InternalCheckpointListener, it is also notified when a checkpoint is subsumed, so CompactOperator can use that subsume notification to remove old uncompacted files.
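A minimal, self-contained sketch of the dispatch described above. It assumes simplified stand-ins for Flink's types: `Listener`, `SubsumeAwareListener`, `ToyCompactOperator` and `notifySubsumed` are hypothetical names that only mirror the roles of `CheckpointListener`, `InternalCheckpointListener` and `CompactOperator`; the real operator deals with paths, state and a file system. The toy operator keeps uncompacted files per checkpoint and, presumably like the fix intends, deletes them only once their checkpoint is subsumed rather than as soon as a newer checkpoint completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public final class SubsumeDispatchSketch {

    /** Hypothetical stand-in for a checkpoint-complete callback. */
    interface Listener {
        void notifyCheckpointComplete(long checkpointId);
    }

    /** Hypothetical stand-in for a listener that also wants subsume notifications. */
    interface SubsumeAwareListener extends Listener {
        void notifyCheckpointSubsumed(long checkpointId);
    }

    /** Toy compactor: remembers which uncompacted files belong to which checkpoint. */
    static final class ToyCompactOperator implements SubsumeAwareListener {
        final NavigableMap<Long, List<String>> uncompactedByCheckpoint = new TreeMap<>();
        final List<String> deleted = new ArrayList<>();

        void addFiles(long checkpointId, List<String> files) {
            uncompactedByCheckpoint.computeIfAbsent(checkpointId, k -> new ArrayList<>()).addAll(files);
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // Deliberately no cleanup here: an older retained checkpoint may still be restored.
        }

        @Override
        public void notifyCheckpointSubsumed(long checkpointId) {
            // Files of every checkpoint up to and including the subsumed one are no longer needed.
            NavigableMap<Long, List<String>> expired = uncompactedByCheckpoint.headMap(checkpointId, true);
            for (List<String> files : expired.values()) {
                deleted.addAll(files);
            }
            expired.clear(); // clearing the view removes the entries from the backing map
        }
    }

    /** The check from the change log: forward the subsume event only to operators that understand it. */
    static void notifySubsumed(Listener operator, long checkpointId) {
        if (operator instanceof SubsumeAwareListener) {
            ((SubsumeAwareListener) operator).notifyCheckpointSubsumed(checkpointId);
        }
    }

    public static void main(String[] args) {
        ToyCompactOperator op = new ToyCompactOperator();
        op.addFiles(1, List.of("part-1"));
        op.addFiles(2, List.of("part-2"));
        op.addFiles(3, List.of("part-3"));
        op.notifyCheckpointComplete(3);
        notifySubsumed(op, 1);
        System.out.println(op.deleted);                          // [part-1]
        System.out.println(op.uncompactedByCheckpoint.keySet()); // [2, 3]
    }
}
```

Operators that only implement the plain listener are simply skipped by the `instanceof` check, so existing operators are unaffected.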

Verifying this change

  • org.apache.flink.connector.file.table.stream.compact.CompactOperatorTest#testCompactOperator
  • org.apache.flink.connector.file.table.stream.compact.CompactOperatorTest#testCompactOperatorOnHistoricalCheckpoint

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: possible

Documentation

  • Does this pull request introduce a new feature? no

ParyshevSergey, Jan 17 '24 06:01

CI report:

  • ef3ff653b6f1049459f4523d7034ee0add27e3e7 Azure: FAILURE
Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot, Jan 17 '24 07:01

@flinkbot run azure

ParyshevSergey, Jan 19 '24 06:01

According to the Azure results, internal classes (such as InternalCheckpointListener) can't be used in connectors, so I can offer two alternatives:

  1. Add a method "checkpointSubsume" to CheckpointListener
  2. Add a field "maxRetainCheckpoints" to the checkpoint config and clean up files by calling clearExpiredFiles(checkpointId - maxRetainCheckpoints)
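A minimal sketch of how the second alternative could behave, assuming the cleanup runs on every completed checkpoint. `maxRetainCheckpoints` and `clearExpiredFiles` follow the names in the proposal, but `RetentionWindowCleaner` is hypothetical, and treating the argument of `clearExpiredFiles` as an inclusive upper bound is an assumption. With consecutive ids, after checkpoint `c` completes the retained window is `[c - maxRetainCheckpoints + 1, c]`, so everything at or below `c - maxRetainCheckpoints` can go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public final class RetentionWindowSketch {

    /** Hypothetical cleaner that only knows the configured number of retained checkpoints. */
    static final class RetentionWindowCleaner {
        private final int maxRetainCheckpoints;
        final NavigableMap<Long, List<String>> filesByCheckpoint = new TreeMap<>();
        final List<String> deleted = new ArrayList<>();

        RetentionWindowCleaner(int maxRetainCheckpoints) {
            if (maxRetainCheckpoints < 1) {
                throw new IllegalArgumentException("maxRetainCheckpoints must be >= 1");
            }
            this.maxRetainCheckpoints = maxRetainCheckpoints;
        }

        void addFiles(long checkpointId, String... files) {
            filesByCheckpoint.computeIfAbsent(checkpointId, k -> new ArrayList<>()).addAll(List.of(files));
        }

        void onCheckpointComplete(long checkpointId) {
            // Assumes consecutive checkpoint ids: the retained window is
            // [checkpointId - maxRetainCheckpoints + 1, checkpointId].
            clearExpiredFiles(checkpointId - maxRetainCheckpoints);
        }

        /** Deletes files of every checkpoint with id <= upToInclusive (the inclusive bound is an assumption). */
        private void clearExpiredFiles(long upToInclusive) {
            NavigableMap<Long, List<String>> expired = filesByCheckpoint.headMap(upToInclusive, true);
            expired.values().forEach(deleted::addAll);
            expired.clear();
        }
    }

    public static void main(String[] args) {
        RetentionWindowCleaner cleaner = new RetentionWindowCleaner(2);
        cleaner.addFiles(1, "part-1");
        cleaner.addFiles(2, "part-2");
        cleaner.addFiles(3, "part-3");
        cleaner.onCheckpointComplete(3); // window is {2, 3}, so checkpoint 1 expires
        System.out.println(cleaner.deleted);                    // [part-1]
        System.out.println(cleaner.filesByCheckpoint.keySet()); // [2, 3]
    }
}
```

One caveat worth checking: checkpoint ids are not necessarily consecutive, because failed or aborted checkpoints also consume ids. If completed ids were 1, 3, 5 with two retained, completing 5 would compute `5 - 2 = 3` and delete files that checkpoint 3 still needs, which is one reason an explicit subsume notification is more robust than id arithmetic. Option 1, by contrast, touches a public interface; in Java it would presumably need to be a default (no-op) method so existing implementations keep compiling.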

ParyshevSergey, Feb 15 '24 08:02