[FLINK-34427][runtime] Adds state check to requirementsCheck logic
What is the purpose of the change
Quoting @zentol from FLINK-34427 here:
The problem is the use of scheduled executors in the FineGrainedSlotManager. It periodically tries to schedule actions unconditionally into the main thread, and this periodic action is also never cancelled. If the rpc endpoint shuts down during the periodic delay, the scheduled action can fire again before the rpc service (and thus scheduled executor) is shut down, running into this error.
This code is plain broken as it makes assumptions about the lifecycle of the scheduled executor. The loop should be canceled when the FGSM is shut down, and as a safety rail any scheduled action should validate that the FGSM is not shut down yet before scheduling anything into the main thread.
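A minimal sketch of the shutdown behavior described in the quote, assuming a plain java.util.concurrent.ScheduledExecutorService and a generic Executor standing in for the RPC main thread. PeriodicCheckOwner and scheduleCheckIfRunning are hypothetical names, not Flink classes: the periodic loop is cancelled in close(), and every firing checks the shutdown flag before handing work to the main thread (and again once the work runs there).

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for a component like the FineGrainedSlotManager.
// It owns a periodic check and cancels it on shutdown.
class PeriodicCheckOwner implements AutoCloseable {
    private final ScheduledExecutorService scheduler;
    private final Executor mainThreadExecutor;
    private final Runnable check;
    private ScheduledFuture<?> periodicCheck;
    // volatile: read by the scheduler thread, written by close()
    private volatile boolean shutDown;

    PeriodicCheckOwner(ScheduledExecutorService scheduler, Executor mainThreadExecutor, Runnable check) {
        this.scheduler = scheduler;
        this.mainThreadExecutor = mainThreadExecutor;
        this.check = check;
    }

    void start(long periodMillis) {
        periodicCheck = scheduler.scheduleAtFixedRate(
                this::scheduleCheckIfRunning, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Fired by the scheduler; package-private so a test can invoke it directly.
    void scheduleCheckIfRunning() {
        if (shutDown) {
            return; // safety rail: never hand work to the main thread after shutdown
        }
        mainThreadExecutor.execute(() -> {
            if (!shutDown) { // re-check: shutdown may happen while the task is queued
                check.run();
            }
        });
    }

    boolean isPeriodicCheckCancelled() {
        return periodicCheck != null && periodicCheck.isCancelled();
    }

    @Override
    public void close() {
        shutDown = true;
        if (periodicCheck != null) {
            periodicCheck.cancel(false); // stop the loop instead of relying on executor shutdown
        }
    }
}
```

Cancelling the future alone may not be enough, because a firing that is already in flight can still enqueue work; the flag checks cover that window.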
Brief change log
- Makes ManuallyTriggeredScheduledExecutorService more robust against exceptions
- Removes the started field and uses mainThreadExecutor instead
- Adds main thread assertion in FineGrainedSlotManager
- Refactors code to allow state check in scheduled tasks
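One possible shape of "remove the started field and use mainThreadExecutor instead", sketched under the assumption that the executor reference is set in start() and cleared in close(). SlotManagerLifecycle and runInMainThreadIfStarted are hypothetical names for illustration only; the actual refactoring in this PR may look different.

```java
import java.util.concurrent.Executor;

// Hypothetical sketch: "started" is derived from whether a main-thread executor is attached,
// instead of being tracked in a separate boolean field.
class SlotManagerLifecycle {
    // null means "not started yet" or "already closed"
    private volatile Executor mainThreadExecutor;

    void start(Executor mainThreadExecutor) {
        if (this.mainThreadExecutor != null) {
            throw new IllegalStateException("Already started.");
        }
        this.mainThreadExecutor = mainThreadExecutor;
    }

    boolean isStarted() {
        return mainThreadExecutor != null;
    }

    // Entry point for a scheduled task: forwards the action only while started,
    // and checks again once the action actually runs in the main thread.
    boolean runInMainThreadIfStarted(Runnable action) {
        Executor executor = mainThreadExecutor; // read the field once to avoid check-then-act on it
        if (executor == null) {
            return false;
        }
        executor.execute(() -> {
            if (isStarted()) {
                action.run();
            }
        });
        return true;
    }

    void close() {
        mainThreadExecutor = null;
    }
}
```

With this shape there is a single source of truth for the lifecycle state, so a scheduled task cannot observe "started" while the executor it needs is already gone.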
Verifying this change
- One test per scheduled task was added to FineGrainedSlotManagerTest
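A sketch of what such a per-task test could look like, assuming a hand-rolled ManualExecutor (hypothetical; much simpler than Flink's ManuallyTriggeredScheduledExecutorService) that runs queued tasks only when triggered. Because the test decides when the scheduled action fires, it can deterministically fire it after close() and assert that nothing reaches the main thread. GuardedRequirementsCheck is likewise hypothetical, and the exception handling in triggerAll() is only one possible reading of "more robust against exceptions", not the actual change.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical test helper: tasks are queued and only run when the test calls triggerAll().
class ManualExecutor implements Executor {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    @Override
    public void execute(Runnable task) {
        pending.add(task);
    }

    // Runs queued tasks (including ones enqueued while triggering); a failing task
    // is recorded and does not prevent the remaining tasks from running.
    List<RuntimeException> triggerAll() {
        List<RuntimeException> failures = new ArrayList<>();
        Runnable task;
        while ((task = pending.poll()) != null) {
            try {
                task.run();
            } catch (RuntimeException e) {
                failures.add(e);
            }
        }
        return failures;
    }

    int pendingCount() {
        return pending.size();
    }
}

// Hypothetical component under test: one scheduled task guarded by a state check.
class GuardedRequirementsCheck {
    private final Executor scheduler;
    private final Executor mainThread;
    private boolean closed;
    private int checksRun;

    GuardedRequirementsCheck(Executor scheduler, Executor mainThread) {
        this.scheduler = scheduler;
        this.mainThread = mainThread;
    }

    void scheduleCheck() {
        scheduler.execute(() -> {
            if (!closed) { // only schedule into the main thread while still open
                mainThread.execute(() -> {
                    if (!closed) {
                        checksRun++;
                    }
                });
            }
        });
    }

    void close() {
        closed = true;
    }

    int checksRun() {
        return checksRun;
    }
}
```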
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
CI report:
- 7003fd89fc582a6fbfcd39cf7b9b0ab9c1604ae5 Azure: FAILURE
I updated the PR but will wait until tomorrow before switching it from draft back to reviewable state. I want CI to pass first, since only then does it make sense to review the change.
I looked into the missing synchronization for the FineGrainedSlotManager#started field: AFAIU, we rely on all the methods that touch #started running in the same thread (the ResourceManager's main thread). But as far as I can see, nothing documents this assumption.
As part of FLINK-34427, I'd like to improve the code in this regard to make this relationship clearer to readers. I see two options:
- Make the implementation more robust by requiring certain methods to be executed in the passed (i.e., RM) main thread.
- Document in the JavaDoc that the FineGrainedSlotManager is not thread-safe and must run in a single thread so that state transitions happen sequentially.
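A minimal sketch of the first option, under the assumption that the thread calling start() is the main thread. MainThreadBoundComponent is hypothetical; in Flink itself, ComponentMainThreadExecutor offers an assertRunningInMainThread() method that serves a similar purpose, but the sketch below does not depend on Flink.

```java
// Hypothetical sketch of option 1: the component remembers its main thread and asserts on it,
// so a call from another thread fails fast instead of silently racing on unsynchronized state.
class MainThreadBoundComponent {
    private volatile Thread mainThread;
    private boolean started; // deliberately unsynchronized: only touched from mainThread

    // Assumption for this sketch: the thread calling start() is the main thread.
    void start() {
        mainThread = Thread.currentThread();
        started = true;
    }

    private void assertRunningInMainThread() {
        Thread current = Thread.currentThread();
        if (current != mainThread) {
            throw new IllegalStateException(
                    "Expected main thread " + mainThread + " but was " + current + ".");
        }
    }

    boolean isStarted() {
        assertRunningInMainThread();
        return started;
    }

    void close() {
        assertRunningInMainThread();
        started = false;
    }
}
```

Compared to the JavaDoc-only option, this makes the single-thread contract executable, at the cost of callers (including tests) having to invoke the component from the right thread.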
In the current version of this PR, I went for the first option. Two tests failed in the first CI run: ResourceManagerTest.testDisconnectTaskManager failed due to the main-thread change, while the ActiveResourceManager test failure seems to be unrelated. I created FLINK-34447 for that one.
The most recent force push was a rebase to pick up fixes from master.