[FLINK-37607] Fix RpcEndpoint#MainThreadExecutor lost scheduling tasks when not running
What is the purpose of the change
This pull request fixes the loss of tasks that are scheduled through org.apache.flink.runtime.rpc.RpcEndpoint.MainThreadExecutor#schedule before the RPC server has started.
Brief change log
- Add a running future that indicates whether the RPC endpoint has started
- MainThreadExecutor schedules tasks only after the running future has completed
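A minimal sketch of the change-log idea, assuming a plain ScheduledExecutorService stands in for the RPC main thread. DeferredMainThreadExecutor and its start() method are hypothetical names, not Flink's actual MainThreadExecutor API. Tasks submitted before start() are parked on a CompletableFuture and handed to the scheduler only once that future completes, so they are not dropped.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified stand-in for RpcEndpoint.MainThreadExecutor.
 * Scheduling is deferred until a "running" future completes.
 */
class DeferredMainThreadExecutor {
    private final ScheduledExecutorService scheduler;
    // Completed once the (hypothetical) endpoint has started.
    private final CompletableFuture<Void> runningFuture = new CompletableFuture<>();

    DeferredMainThreadExecutor(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /** Marks the endpoint as running and releases all parked tasks. */
    void start() {
        runningFuture.complete(null);
    }

    /**
     * Hands the task to the scheduler once the endpoint is running. For tasks
     * submitted before start(), the delay is counted from start(), not from this call.
     */
    void schedule(Runnable task, long delay, TimeUnit unit) {
        runningFuture.thenRun(() -> scheduler.schedule(task, delay, unit));
    }
}
```

One consequence of this design is visible in the code: a task scheduled before start() first waits for start() and then for its full delay, so the delay no longer means "from now until execution".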
Verifying this change
- Added org.apache.flink.runtime.rpc.RpcEndpointTest#testScheduleTaskAfterStart to verify the fix
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
CI report:
- d1150c52ffd6b74f06b741083ccb056fc8928c07 Azure: SUCCESS
Thanks for your fix @Look-Y-Y. I have some suggestions: introducing getRunningFuture may make the semantics of the delay parameter in schedule unclear. It would become "delay for some time after running" instead of "delay the time from now until execution", which could break some by-design behaviors. I'd prefer either adding a start() function in DefaultBlocklistHandler#scheduleTimeoutCheck so that it's invoked after the endpoint starts, or introducing an isRunning() method to the gateway, so that we can log a warning when it's not running.
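The isRunning() option could look roughly like this. WarningMainThreadExecutor and the BooleanSupplier standing in for a gateway-level isRunning() method are hypothetical; java.util.logging is used only to keep the sketch dependency-free (Flink itself logs through SLF4J). The delay keeps its "from now" meaning, and a warning is emitted when a task is scheduled before the endpoint runs. In this sketch the backing scheduler still runs the task; the warning is aimed at the real endpoint, where such a task can be lost.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.logging.Logger;

/** Hypothetical executor that warns instead of deferring. */
class WarningMainThreadExecutor {
    private static final Logger LOG =
            Logger.getLogger(WarningMainThreadExecutor.class.getName());

    private final ScheduledExecutorService scheduler;
    // Stand-in for a gateway-level isRunning() check (hypothetical).
    private final BooleanSupplier isRunning;

    WarningMainThreadExecutor(ScheduledExecutorService scheduler, BooleanSupplier isRunning) {
        this.scheduler = scheduler;
        this.isRunning = isRunning;
    }

    ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
        if (!isRunning.getAsBoolean()) {
            LOG.warning("Endpoint is not running; the scheduled task may be lost.");
        }
        // The delay is always measured from this call.
        return scheduler.schedule(task, delay, unit);
    }
}
```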
Thanks for your suggestion @noorall. I plan to make the following modifications:
- Add a start() interface method to BlocklistHandler, and move the mainThreadExecutor argument from the constructor to the start() method to ensure all operations are performed after start().
- Call the BlocklistHandler#start() method after JobMaster/ResourceManager starts the RPC endpoint.
- Call the RpcEndpoint#isRunning() method in MainThreadExecutor. If the RPC endpoint is not running, log a warning.
Hi @Look-Y-Y, thanks again for working on this PR. Do you have any recent updates?
Hello @noorall, I have updated this PR. Points 1 and 2 above are done. Regarding point 3, I did not add a warning log for when the RPC endpoint is not running, because the scheduling method is not called from the main thread.