[FLINK-37701][flink-runtime] Fix AdaptiveScheduler ignoring checkpoint state sizes for local recovery adjustment.
What is the purpose of the change
Address local recovery issues when the Adaptive Scheduler is enabled.
- Pass the latest completed checkpoint, in addition to the execution graph, to StateSizeEstimates. This is needed because the execution graph goes through the cancelling/cancelled state, and the checkpoint coordinator is nulled by the time we run the calculations.
- Assign a positive priority score to allocations that have overlapping key groups even when the state size is zero. Currently we only give a priority score if managedKeyedState is present, but local recovery semantics don't require state to be present.
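To make the second change concrete, here is a minimal sketch of overlap-based scoring. All names (LocalRecoveryScoring, KeyGroupRange, score) and the weighting are hypothetical and are not Flink's SlotAssigner code. The only point it illustrates is that key group overlap alone yields a positive score, so an allocation with zero managed keyed state is still preferred for local recovery.

```java
/**
 * Hypothetical, simplified scoring of how strongly a previous allocation
 * should be preferred for a subtask after restart. Not Flink's actual code.
 */
final class LocalRecoveryScoring {

    /** Inclusive key group range, e.g. [0, 63]. */
    static final class KeyGroupRange {
        final int start;
        final int end;

        KeyGroupRange(int start, int end) {
            if (start > end) {
                throw new IllegalArgumentException("start > end");
            }
            this.start = start;
            this.end = end;
        }

        /** Number of key groups shared with another range (0 if disjoint). */
        int overlap(KeyGroupRange other) {
            int lo = Math.max(start, other.start);
            int hi = Math.min(end, other.end);
            return Math.max(0, hi - lo + 1);
        }
    }

    /**
     * Scores an allocation that previously held {@code previous} for a subtask
     * that now needs {@code target}. Overlap alone gives a positive score;
     * the estimated state size only adds to it.
     */
    static long score(KeyGroupRange previous, KeyGroupRange target, long stateSizeBytes) {
        int overlap = previous.overlap(target);
        if (overlap == 0) {
            return 0L; // nothing local to reuse
        }
        int previousSize = previous.end - previous.start + 1;
        // Hypothetical weighting: each overlapping key group counts 1, plus its
        // proportional share of the state size (double math avoids long overflow).
        long stateShare = (long) ((double) stateSizeBytes * overlap / previousSize);
        return overlap + stateShare;
    }

    public static void main(String[] args) {
        KeyGroupRange previous = new KeyGroupRange(0, 63);
        System.out.println(score(previous, new KeyGroupRange(0, 63), 0L));      // 64
        System.out.println(score(previous, new KeyGroupRange(64, 127), 1000L)); // 0
        System.out.println(score(previous, new KeyGroupRange(32, 95), 1000L));  // 532
    }
}
```

With a zero state size the first call still returns 64 rather than 0, which is the behaviour the fix asks for.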
Context: When a job can be recovered locally, we should keep the slot allocation after restart to maintain
Verifying this change
LocalRecoveryITCase#testRecoverLocallyFromProcessCrashWithWorkingDirectory now passes when AdaptiveScheduler is enabled.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)
Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
CI report:
- 533a6e6494bec2ade2e9dbeda8cf49f888fc5f16 Azure: SUCCESS
The CI failed.
@Izeren rebasing the PR to include https://github.com/apache/flink/commit/374fedbb4ef7dc139dc941d788a07cc8270de1d9 should fix the CI
Thanks for the fix, @Izeren! I've left a couple of comments, PTAL. Were you able to reproduce the failure reliably without the fix, and does the test pass with the fix?
Yes, it fails in IntelliJ more often than not, but you need to pass the VM option -Dflink.tests.enable-adaptive-scheduler=true to ensure that the adaptive scheduler is used. With the fix it passes consistently.
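The flag is an ordinary JVM system property. A minimal sketch of how a test could read it with the standard Boolean.getBoolean; the helper class is hypothetical and not necessarily how Flink's own test utilities read the flag.

```java
/** Hypothetical helper; Flink's test utilities may read this flag differently. */
final class AdaptiveSchedulerTestFlag {

    static final String PROPERTY = "flink.tests.enable-adaptive-scheduler";

    /** True only if the property is set to "true" (case-insensitive), e.g. via -D on the JVM command line. */
    static boolean isAdaptiveSchedulerEnabled() {
        return Boolean.getBoolean(PROPERTY);
    }

    public static void main(String[] args) {
        System.out.println(PROPERTY + " = " + isAdaptiveSchedulerEnabled());
    }
}
```

Running with `java -Dflink.tests.enable-adaptive-scheduler=true AdaptiveSchedulerTestFlag` prints `true`; without the option it prints `false`.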
- We should add a test case to AdaptiveScheduler with a custom implementation of SlotAssigner that acts as a regression test
- We should add a test case to the SlotAssigner implementation that verifies how it behaves in non-rescaling scenarios, when we simply want to reuse previously known allocations
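A rough sketch of the non-rescaling check from the second point, assuming a simplified greedy assigner that picks, for each subtask, the free allocation whose previous key group range overlaps most. StickyAssignerSketch and its data layout (inclusive int[] {start, end}) are hypothetical, not Flink's SlotAssigner API; the scoring deliberately ignores state size, in line with the fix.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical greedy assigner used only to illustrate the regression check. */
final class StickyAssignerSketch {

    /** Assigns each subtask index the free allocation whose previous key groups overlap most. */
    static Map<Integer, String> assign(int[][] targetRanges, Map<String, int[]> previousRanges) {
        Map<Integer, String> result = new HashMap<>();
        Map<String, int[]> free = new LinkedHashMap<>(previousRanges);
        for (int subtask = 0; subtask < targetRanges.length; subtask++) {
            int[] target = targetRanges[subtask];
            String best = null;
            int bestOverlap = -1;
            for (Map.Entry<String, int[]> e : free.entrySet()) {
                int[] prev = e.getValue();
                int overlap = Math.max(0,
                        Math.min(target[1], prev[1]) - Math.max(target[0], prev[0]) + 1);
                if (overlap > bestOverlap) {
                    bestOverlap = overlap;
                    best = e.getKey();
                }
            }
            if (best == null) {
                throw new IllegalStateException("not enough allocations");
            }
            result.put(subtask, best);
            free.remove(best); // each allocation is used at most once
        }
        return result;
    }

    public static void main(String[] args) {
        // Same parallelism before and after restart: 2 subtasks, 128 key groups.
        int[][] target = {{0, 63}, {64, 127}};
        Map<String, int[]> previous = new LinkedHashMap<>();
        previous.put("alloc-B", new int[] {64, 127}); // inserted first on purpose
        previous.put("alloc-A", new int[] {0, 63});
        // A non-rescaling restart should hand every subtask its old allocation back.
        System.out.println(assign(target, previous)); // {0=alloc-A, 1=alloc-B}
    }
}
```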
@dmvk, I have added 2 tests:
- For AdaptiveScheduler, to ensure that the SlotAllocator receives data from the checkpoint and uses it for distribution. (I couldn't check the checkpoints themselves because AllocationInformation is calculated through a static call, so instead I verify that the state made it into the SlotAllocator.)
- For SlotAllocator, to ensure that it preserves allocations according to the state distribution and retains them across a job restart.
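The AdaptiveScheduler test relies on a test double that records what the SlotAllocator receives. A minimal sketch of that pattern, assuming hypothetical StateAwareAllocator and RecordingAllocator types (not Flink's interfaces) and a restart step that takes the latest completed checkpoint as an explicit argument, as the first change in the description does:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-ins; none of these types are Flink's real interfaces. */
final class RecordingAllocatorSketch {

    interface StateAwareAllocator {
        void onStateSizes(Map<String, Long> stateSizeByAllocation);
    }

    /** Test double that only records what it was given. */
    static final class RecordingAllocator implements StateAwareAllocator {
        final List<Map<String, Long>> received = new ArrayList<>();

        @Override
        public void onStateSizes(Map<String, Long> stateSizeByAllocation) {
            received.add(stateSizeByAllocation);
        }
    }

    /**
     * Forwards state sizes taken from the explicitly passed latest completed
     * checkpoint, rather than from a checkpoint coordinator that may already
     * be null after cancellation. Empty if there is no checkpoint.
     */
    static void restart(Optional<Map<String, Long>> latestCompletedCheckpoint,
                        StateAwareAllocator allocator) {
        allocator.onStateSizes(latestCompletedCheckpoint.orElse(Map.of()));
    }

    public static void main(String[] args) {
        RecordingAllocator allocator = new RecordingAllocator();
        restart(Optional.of(Map.of("alloc-A", 1024L)), allocator);
        System.out.println(allocator.received); // [{alloc-A=1024}]
    }
}
```

Asserting on `received` sidesteps the static AllocationInformation calculation, which is the reason given above for checking the SlotAllocator's input instead of the checkpoints.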
I am still looking into a few other test failures in AdaptiveScheduler.
@Izeren Hi, can we push this fix before the code freeze?
@dmvk, would you have time to have a second look today?
I have updated the test as proposed by @dmvk.