[FLINK-5601][Checkpointing] Watermark checkpointing
What is the purpose of the change
This pull request uses ListState in TimestampsAndPeriodicWatermarksOperator.java and TimestampsAndPunctuatedWatermarksOperator.java to checkpoint the watermark. During recovery, it uses union list state to find the lowest checkpointed watermark and emits it at the end of the open function.
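The restore rule described above can be modeled in plain Java. The sketch below has no Flink dependency, and the class and method names are hypothetical. In Flink itself the state would be obtained through OperatorStateStore#getUnionListState, which hands every parallel subtask the full list of checkpointed entries on restore, so each subtask can compute the same minimum independently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java model (not Flink code) of checkpointing each subtask's
 * watermark into union list state and restoring the lowest one.
 * All names here are hypothetical.
 */
public class WatermarkUnionStateModel {

    /** Snapshot: every subtask contributes its current watermark as one list entry. */
    static List<Long> snapshot(long[] currentWatermarks) {
        List<Long> entries = new ArrayList<>();
        for (long wm : currentWatermarks) {
            entries.add(wm);
        }
        return entries;
    }

    /**
     * Restore: with union redistribution each subtask sees ALL entries,
     * so every subtask ends up with the same global minimum.
     */
    static long restore(List<Long> unionEntries) {
        if (unionEntries.isEmpty()) {
            // Nothing was checkpointed: treat it as "no watermark yet".
            return Long.MIN_VALUE;
        }
        return Collections.min(unionEntries);
    }

    public static void main(String[] args) {
        // Three parallel subtasks that had reached different watermarks.
        List<Long> state = snapshot(new long[] {1000L, 750L, 1200L});
        // Regardless of the new parallelism, each subtask restores the same value.
        System.out.println("restored watermark = " + restore(state)); // 750
    }
}
```

Taking the minimum is the conservative choice: the restored watermark never passes the point that every subtask had reached, so no record is treated as late merely because some other subtask had progressed further before the failure.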
Brief change log
- Add ListState to store the watermark in TimestampsAndPeriodicWatermarksOperator.java and TimestampsAndPunctuatedWatermarksOperator.java.
Verifying this change
- Integration tests in WatermarkCheckpointingITCase.java:
- TimestampsAndPeriodicWatermarksOperator test: send five "Five" records, trigger a checkpoint, force the job to fail, recover it, then send five "Three" records; the sum should be 25.
- TimestampsAndPunctuatedWatermarksOperator test: same test logic as above.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
Related test failures:
Tests run: 12, Failures: 0, Errors: 3, Skipped: 0, Time elapsed: 10.549 sec <<< FAILURE! - in org.apache.flink.test.streaming.runtime.TimestampITCase
testTimestampExtractorWithAutoInterval(org.apache.flink.test.streaming.runtime.TimestampITCase) Time elapsed: 0.093 sec <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
at org.apache.flink.test.streaming.runtime.TimestampITCase.testTimestampExtractorWithAutoInterval(TimestampITCase.java:346)
Caused by: java.lang.NullPointerException: null
at org.apache.flink.test.streaming.runtime.TimestampITCase$CustomOperator.processWatermark(TimestampITCase.java:713)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:500)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:702)
at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.initializeState(TimestampsAndPeriodicWatermarksOperator.java:137)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
testTimestampExtractorWithDecreasingCustomWatermarkEmit(org.apache.flink.test.streaming.runtime.TimestampITCase) Time elapsed: 0.092 sec <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
at org.apache.flink.test.streaming.runtime.TimestampITCase.testTimestampExtractorWithDecreasingCustomWatermarkEmit(TimestampITCase.java:469)
Caused by: java.lang.NullPointerException: null
at org.apache.flink.test.streaming.runtime.TimestampITCase$CustomOperator.processWatermark(TimestampITCase.java:713)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:500)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:702)
at org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.initializeState(TimestampsAndPunctuatedWatermarksOperator.java:115)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
testTimestampExtractorWithCustomWatermarkEmit(org.apache.flink.test.streaming.runtime.TimestampITCase) Time elapsed: 0.069 sec <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
at org.apache.flink.test.streaming.runtime.TimestampITCase.testTimestampExtractorWithCustomWatermarkEmit(TimestampITCase.java:408)
Caused by: java.lang.NullPointerException: null
at org.apache.flink.test.streaming.runtime.TimestampITCase$CustomOperator.processWatermark(TimestampITCase.java:713)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:500)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:702)
at org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.initializeState(TimestampsAndPunctuatedWatermarksOperator.java:115)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
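All three traces show the restored watermark being emitted from initializeState() and reaching the chained CustomOperator.processWatermark() before that operator is ready. The model below is one plausible reading of the failure, not Flink code: it assumes (hypothetically, since CustomOperator's source is not shown here) that the downstream operator initializes a field in open(), and that all chained operators are opened, tail first, only after every operator's state has been initialized. Under those assumptions, emitting at the end of open(), as the PR description states, avoids the NullPointerException.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical two-phase model of an operator chain. It only illustrates
 * why emitting during state initialization can hit a downstream operator
 * whose open() has not run yet.
 */
public class ChainOrderingModel {

    /** Stand-in for a downstream operator that allocates a field in open(). */
    static class Downstream {
        List<Long> seen; // stays null until open() runs

        void open() {
            seen = new ArrayList<>();
        }

        void processWatermark(long wm) {
            seen.add(wm); // throws NullPointerException if open() was skipped
        }
    }

    /** Returns true if the restored watermark was delivered without error. */
    static boolean runChain(boolean emitDuringInitializeState) {
        Downstream down = new Downstream();
        long restored = 750L;
        try {
            // Phase 1: initializeState() for every operator in the chain.
            if (emitDuringInitializeState) {
                down.processWatermark(restored); // head emits too early
            }
            // Phase 2: open() for every operator, downstream (tail) first.
            down.open();
            if (!emitDuringInitializeState) {
                down.processWatermark(restored); // head emits at the end of its open()
            }
            return down.seen.size() == 1;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("emit in initializeState succeeds: " + runChain(true)); // false
        System.out.println("emit at end of open succeeds: " + runChain(false)); // true
    }
}
```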
@zentol Thanks for pointing it out. It's fixed now.
CI report:
- f93c0c3ba82f53db498c46f22a4fb3bf9a730451 Unknown: SUCCESS
Hi @Jiayi-Liao, I think this is useful, so this is just a kind ping. Are you still working on this?
Hi, I’m curious why this pull request hasn’t been merged yet. Could anyone provide context?
This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.
If you are having difficulty finding a reviewer, please reach out to the community; contact details can be found here: https://flink.apache.org/what-is-flink/community/
If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review.