[FLINK-37701][flink-runtime] Fix AdaptiveScheduler ignoring checkpoint states sizes for local recovery adjustment.

Open Izeren opened this issue 6 months ago • 11 comments

What is the purpose of the change

Address local recovery issues when Adaptive scheduler is enabled.

  1. Pass the latest completed checkpoint, in addition to the execution graph, to StateSizeEstimates. This is needed because the execution graph goes through the cancelling/cancelled state, and the checkpoint coordinator is already nulled by the time we run the calculations.
  2. Assign a positive priority score to allocations that have overlapping key groups even when the state size is zero. Currently a priority score is given only if managedKeyedState is present, but local recovery semantics don't require state to be present.
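To illustrate the second point, here is a minimal sketch of a score that stays positive whenever key groups overlap, even with zero state. It is not the code from this PR: the class `LocalRecoveryScoreSketch`, the `score` and `overlap` methods, and the assumption that state is spread evenly over the held key groups are all hypothetical and chosen only for illustration.

```java
/** Hypothetical sketch; class, method, and parameter names are illustrative, not Flink's API. */
final class LocalRecoveryScoreSketch {

    /** Number of key groups shared by two inclusive ranges; 0 if they are disjoint. */
    static int overlap(int startA, int endA, int startB, int endB) {
        int lo = Math.max(startA, startB);
        int hi = Math.min(endA, endB);
        return Math.max(0, hi - lo + 1);
    }

    /**
     * Score for placing a subtask that needs key groups [wantedStart, wantedEnd] onto a slot
     * that previously held key groups [heldStart, heldEnd] with heldStateBytes of keyed state.
     */
    static long score(
            int wantedStart, int wantedEnd, int heldStart, int heldEnd, long heldStateBytes) {
        int shared = overlap(wantedStart, wantedEnd, heldStart, heldEnd);
        if (shared == 0) {
            return 0L; // nothing to recover locally from this slot
        }
        int heldCount = heldEnd - heldStart + 1;
        // Assumption: state is spread evenly over the held key groups.
        long sharedBytes = heldStateBytes * shared / heldCount;
        // Adding the number of shared key groups keeps the score positive for empty state.
        return shared + sharedBytes;
    }

    public static void main(String[] args) {
        System.out.println(score(0, 63, 0, 63, 0L));      // overlap, zero state: 64
        System.out.println(score(0, 63, 64, 127, 1000L)); // no overlap: 0
        System.out.println(score(0, 63, 0, 63, 1000L));   // overlap with state: 1064
    }
}
```

With a score like this, an allocation with matching key groups but no state still outranks an unrelated allocation, while larger state still wins among overlapping candidates.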

Context: When a job can be recovered locally, we should keep the slot allocation after restart to maintain

Verifying this change

LocalRecoveryITCase#testRecoverLocallyFromProcessCrashWithWorkingDirectory now passes when AdaptiveScheduler is enabled.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Izeren avatar Jun 10 '25 18:06 Izeren

CI report:

  • 533a6e6494bec2ade2e9dbeda8cf49f888fc5f16 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Jun 10 '25 18:06 flinkbot

The CI failed.

lsyldliu avatar Jun 11 '25 01:06 lsyldliu

@Izeren rebasing the PR to include https://github.com/apache/flink/commit/374fedbb4ef7dc139dc941d788a07cc8270de1d9 should fix the CI

dmvk avatar Jun 11 '25 07:06 dmvk

@Izeren rebasing the PR to include 374fedb should fix the CI

Thank you, will do

Izeren avatar Jun 11 '25 09:06 Izeren

Thanks for the fix @Izeren ! I've left a couple of comments, PTAL. Were you able to reproduce the failure reliably without fix - and success with the fix?

Yes, in IntelliJ it fails more often than not, but you need to pass the VM option -Dflink.tests.enable-adaptive-scheduler=true to ensure that the adaptive scheduler is used. With the fix it passes consistently.

Izeren avatar Jun 11 '25 09:06 Izeren

  1. We should add a test case to AdaptiveScheduler with a custom implementation of SlotAssigner that acts as a regression test
  2. We should add a test case to the SlotAssigner implementation that verifies how the SlotAssigner behaves in non-rescaling scenarios, when we simply want to reuse previously known allocations

dmvk avatar Jun 11 '25 12:06 dmvk

  • We should add a test case to AdaptiveScheduler with a custom implementation of SlotAssigner that acts as a regression test
  • We should add a test case to the SlotAssigner implementation that verifies how the SlotAssigner behaves in non-rescaling scenarios, when we simply want to reuse previously known allocations

@dmvk, I have added 2 tests:

  1. For AdaptiveScheduler, to ensure that SlotAllocator receives data from the checkpoint and uses it for distribution. (I couldn't check the checkpoints themselves because AllocationInformation is calculated through a static call, so I verify instead that the state made it into SlotAllocator afterwards.)
  2. For SlotAllocator, to ensure that it preserves allocations according to the state distribution and retains them across a job restart.
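The behaviour the second test targets (reusing previously known allocations when the job restarts without rescaling) can be sketched roughly as follows. This is not the test or the allocator from the PR: `StickyAssignmentSketch`, its `assign` method, and the string slot IDs are hypothetical stand-ins for illustration only.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch of "sticky" slot assignment; not Flink's SlotAssigner or SlotAllocator. */
final class StickyAssignmentSketch {

    /**
     * Assigns each subtask index in [0, parallelism) to a slot ID. A subtask keeps its
     * previous slot when that slot is still free; the rest get any remaining free slot.
     */
    static Map<Integer, String> assign(
            Map<Integer, String> previous, List<String> freeSlots, int parallelism) {
        Set<String> remaining = new LinkedHashSet<>(freeSlots);
        Map<Integer, String> result = new TreeMap<>();

        // First pass: retain previous allocations that are still available.
        for (int subtask = 0; subtask < parallelism; subtask++) {
            String prev = previous.get(subtask);
            if (prev != null && remaining.remove(prev)) {
                result.put(subtask, prev);
            }
        }

        // Second pass: place the remaining subtasks on whatever slots are left.
        Iterator<String> leftovers = remaining.iterator();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (!result.containsKey(subtask)) {
                if (!leftovers.hasNext()) {
                    throw new IllegalStateException("Not enough free slots");
                }
                result.put(subtask, leftovers.next());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same parallelism and same slots after restart: the assignment is unchanged.
        System.out.println(
                assign(Map.of(0, "slot-a", 1, "slot-b"), List.of("slot-b", "slot-a", "slot-c"), 2));
    }
}
```

A regression test in this style would assert that, after a restart with unchanged parallelism, every subtask whose previous slot is still offered ends up on that same slot.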

Izeren avatar Jun 13 '25 15:06 Izeren

I am still looking into a few other test failures in AdaptiveScheduler

Izeren avatar Jun 16 '25 08:06 Izeren

@Izeren Hi, can we get this fix merged before the code freeze?

lsyldliu avatar Jun 18 '25 01:06 lsyldliu

@dmvk, would you have time to have a second look today?

Izeren avatar Jun 18 '25 09:06 Izeren

I have added changes to the test as proposed by @dmvk

Izeren avatar Jun 19 '25 07:06 Izeren