[FLINK-37607] Fix RpcEndpoint#MainThreadExecutor lost scheduling tasks when not running

Open Look-Y-Y opened this issue 1 month ago • 4 comments

What is the purpose of the change

This pull request fixes tasks being lost when org.apache.flink.runtime.rpc.RpcEndpoint.MainThreadExecutor#schedule is called before the RPC server has started.

Brief change log

  • Add running future to indicate whether the RPC endpoint is started
  • MainThreadExecutor schedules tasks only after the running future is completed
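
For readers who want the idea behind the change log above, here is a minimal sketch of gating task submission on a "running" future. It is not the code in this PR and does not use Flink classes: `GatedExecutor`, `submit`, `start` and `executed` are hypothetical names, and the delay handling is left out to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in, NOT Flink's RpcEndpoint.MainThreadExecutor.
final class GatedExecutor {
    // Completed once the (imaginary) endpoint has started.
    private final CompletableFuture<Void> runningFuture = new CompletableFuture<>();
    // Records which tasks actually ran, for illustration only.
    private final List<String> executed = new ArrayList<>();

    // Tasks submitted before start() are held back by the future
    // instead of being dropped; tasks submitted afterwards run at once.
    void submit(String taskName) {
        runningFuture.thenRun(() -> executed.add(taskName));
    }

    // Marks the endpoint as running and releases any held-back tasks.
    void start() {
        runningFuture.complete(null);
    }

    List<String> executed() {
        return executed;
    }
}
```

A task submitted before `start()` stays pending and runs once `start()` completes the future. The sketch is single-threaded and makes no promise about the order in which held-back tasks are released.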

Verifying this change

  • Added org.apache.flink.runtime.rpc.RpcEndpointTest#testScheduleTaskAfterStart to verify it

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

Look-Y-Y avatar Nov 09 '25 12:11 Look-Y-Y

CI report:

  • d1150c52ffd6b74f06b741083ccb056fc8928c07 Azure: SUCCESS

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Nov 09 '25 12:11 flinkbot

Thanks for your fix, @Look-Y-Y. I have some suggestions: introducing getRunningFuture may make the semantics of the delay parameter in schedule unclear. It would come to mean “delay for some time after running” instead of “delay the time from now until execution”. This could break some by-design behaviors.

I’d prefer either adding a start() function in DefaultBlocklistHandler#scheduleTimeoutCheck so that it’s invoked after the Endpoint starts, or introducing an isRunning() method to the gateway, so that we can log a warning when it’s not running.
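
To make the concern about the delay parameter concrete, here is a small worked example. It is only an illustration of the two readings, not Flink code: `DelaySemantics` and its methods are hypothetical, and times are plain millisecond offsets.

```java
// Hypothetical helper, used only to compare the two readings of "delay".
final class DelaySemantics {
    private DelaySemantics() {}

    // "Delay from now": the task is due at callTime + delay,
    // no matter when the endpoint starts.
    static long dueFromNow(long callTimeMs, long delayMs) {
        return callTimeMs + delayMs;
    }

    // "Delay after running": if the endpoint is not yet running at call
    // time, the delay only starts counting once it is.
    static long dueAfterRunning(long callTimeMs, long startTimeMs, long delayMs) {
        return Math.max(callTimeMs, startTimeMs) + delayMs;
    }
}
```

With a call at t = 0 ms, a delay of 1000 ms and an endpoint that starts at t = 5000 ms, `dueFromNow(0, 1000)` gives 1000 while `dueAfterRunning(0, 5000, 1000)` gives 6000. When the endpoint is already running at call time, both readings agree.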

Thanks for your suggestions, @noorall. I plan to make the following modifications:

  1. Add a start() interface method to BlocklistHandler, and move the mainThreadExecutor field from the constructor to the start() method to ensure all operations are performed after start().

  2. Call the BlocklistHandler#start() method after JobMaster/ResourceManager starts the RPC endpoint.

  3. Call the RpcEndpoint#isRunning() method in MainThreadExecutor. If the RPC is not running, log a warning.
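
A rough sketch of the shape of points 1 and 2, assuming the handler receives its executor in start() rather than in the constructor. All names here (`StartableHandler`, `TimeoutCheckHandler`, `checksRun`) are hypothetical, and the signatures are not the actual BlocklistHandler API.

```java
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical interface; the real BlocklistHandler may look different.
interface StartableHandler {
    void start(Executor mainThreadExecutor);
}

final class TimeoutCheckHandler implements StartableHandler {
    // Assigned in start(), so nothing can be scheduled before the
    // owner has started its RPC endpoint.
    private Executor mainThreadExecutor;
    private final AtomicInteger checksRun = new AtomicInteger();

    @Override
    public void start(Executor mainThreadExecutor) {
        if (this.mainThreadExecutor != null) {
            throw new IllegalStateException("handler already started");
        }
        this.mainThreadExecutor = Objects.requireNonNull(mainThreadExecutor);
        scheduleTimeoutCheck();
    }

    // Stand-in for the periodic timeout check; here it only counts runs.
    private void scheduleTimeoutCheck() {
        mainThreadExecutor.execute(checksRun::incrementAndGet);
    }

    boolean isStarted() {
        return mainThreadExecutor != null;
    }

    int checksRun() {
        return checksRun.get();
    }
}
```

The owner would call `handler.start(executor)` right after starting its own endpoint, so the first check is only submitted once there is something to run it.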

Look-Y-Y avatar Nov 10 '25 14:11 Look-Y-Y

Hi @Look-Y-Y, thanks again for working on this PR. Do you have any recent updates?

noorall avatar Nov 21 '25 07:11 noorall

Hello @noorall, I have updated this PR. Points 1 and 2 above are done. For point 3, because the scheduling method is not called from the main thread, I did not add the warning log for the RPC endpoint not running.

Look-Y-Y avatar Dec 08 '25 16:12 Look-Y-Y