[SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor
What changes were proposed in this pull request?
This PR aims to make it safe to lower the executor heartbeat timeout. The idea is to ask the Master for the worker's last heartbeat whenever the driver has not received a heartbeat from an executor for TimeoutThreshold seconds.
When an executor performs a full GC, it cannot send any messages, so the driver stops receiving its heartbeats. Instead of removing the executor immediately, the driver asks the Master for workerLastHeartbeat and uses it to decide whether the silence is caused by a network disconnection or by something else (e.g. GC). If it is a network disconnection, the driver removes the executor; otherwise, the driver puts the executor into executorExpiryCandidates rather than expiring it immediately (a sketch of this decision follows the notes below).
[Note 1] A worker sends heartbeats to the Master every (conf.get(WORKER_TIMEOUT) * 1000 / 4) milliseconds; see deploy/worker/Worker.scala for details. It is good to keep expiryCandidatesTimeout larger than this interval so the driver can tell whether the Master has missed any heartbeat from the worker.
[Note 2] The same approach could be used for other deployment modes, but not every deployment mode schedules the driver on the master node.
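Below is a minimal, self-contained sketch of this decision. It is not the PR's actual HeartbeatReceiver code: askMasterForWorkerLastHeartbeat and removeExecutor are hypothetical stand-ins for the RPC to the standalone Master and the existing removal path, and the exact comparison used by the PR may differ.

import scala.collection.mutable

object ExpiryDecisionSketch {
  // Proposed default for HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT (90s); see the "Default values" section below.
  val expiryCandidatesTimeoutMs: Long = 90 * 1000L

  // Executors suspected of a long GC pause rather than a network disconnection.
  val executorExpiryCandidates = mutable.Map.empty[String, Long]

  def askMasterForWorkerLastHeartbeat(executorId: String): Long = ??? // hypothetical RPC to the Master
  def removeExecutor(executorId: String): Unit = ???                  // existing executor-removal path

  // Called when the driver has not received a heartbeat from `executorId` for TimeoutThreshold seconds.
  def onExecutorTimeout(executorId: String, now: Long): Unit = {
    val workerLastHeartbeat = askMasterForWorkerLastHeartbeat(executorId)
    if (now - workerLastHeartbeat > expiryCandidatesTimeoutMs) {
      // The Master has not heard from the worker either: treat it as a network disconnection.
      removeExecutor(executorId)
    } else {
      // The Master still sees the worker: likely GC, so only mark the executor as an expiry candidate.
      executorExpiryCandidates(executorId) = now
    }
  }
}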
Result

- Milestone 4:
  - HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT: true
  - NETWORK_EXECUTOR_TIMEOUT (TimeoutThreshold in the figures): 60s
  - HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT: 30s
  - NETWORK_TIMEOUT_INTERVAL: 15s
- w/o Milestone 4:
  - HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT: false
  - NETWORK_EXECUTOR_TIMEOUT (TimeoutThreshold in the figures): 120s
  - NETWORK_TIMEOUT_INTERVAL: 60s

[The result figures for the "Milestone 4" and "w/o Milestone 4" runs are omitted here.]
Why are the changes needed?
Currently, the driver's HeartbeatReceiver expires an executor if it does not receive any heartbeat from that executor for 120 seconds. 120 seconds is too long, but simply lowering the threshold creates other problems: while an executor is performing GC, it cannot reply to any message.
The figures in the original description illustrate why we cannot lower TimeoutThreshold to 60 seconds directly. When an executor performs a full GC, it cannot send any message, including heartbeats, so the driver removes the executor after 60 seconds of silence even though it is still alive. In other words, the driver cannot distinguish a GC pause from a network disconnection.
Does this PR introduce any user-facing change?
No
How was this patch tested?
build/sbt "core/testOnly *HeartbeatReceiverSuite"
build/sbt "core/testOnly *MasterSuite"
cc. @Ngone51 @jiangxb1987
Can one of the admins verify this patch?
Thanks @mridulm for your recommendations! I will resolve these comments as soon as possible.
Btw, any thoughts on this ?
Are the changes here necessarily only for standalone ? Why not k8s and yarn ?
The changes are specific to standalone - but the idea should be transferable to others as well, right ? Do we want to tackle that here ?
I can take YARN and k8s support as follow-ups of this PR. In my opinion, the scope of this PR will become too large if we tackle YARN and k8s here. Thank you!
Sounds good @kevin85421, wanted to make sure the approach is extensible to others. Can you please file follow up jira's for both ? Thx
Hi @mridulm, here are the JIRA tickets. Thank you!
YARN: https://issues.apache.org/jira/browse/SPARK-40068
k8s: https://issues.apache.org/jira/browse/SPARK-40069
Gentle ping @Ngone51 @mridulm
I am fine with the changes, will let @Ngone51 take a look/merge
https://github.com/apache/spark/pull/37411/commits/92629e30410d7ae9741457240c3f1a789f6b042b
Default values
I have revisited the configurations in this PR and updated their default values after the discussion with @Ngone51.
(1) initial delay: set the initial delay to executorTimeoutMs (default: 30s).
override def onStart(): Unit = {
  timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(
    () => Utils.tryLogNonFatalError { Option(self).foreach(_.ask[Boolean](ExpireDeadHosts)) },
    executorTimeoutMs, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
}
(2) checkTimeoutIntervalMs: the thread in (1) executes expireDeadHosts every checkTimeoutIntervalMs (default: 15s).
private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
(3) executorTimeoutMs: STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT is optional, so when it is not set, executorTimeoutMs falls back to NETWORK_EXECUTOR_TIMEOUT (default: 30s).
private val executorTimeoutMs = sc.conf.get(
  config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
).getOrElse(
  sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT)
)
(4) expiryCandidatesTimeout: if checkWorkerLastHeartbeat is true, the value is HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT (default: 90s); otherwise it is 0.
private lazy val expiryCandidatesTimeout = checkWorkerLastHeartbeat match {
  case true =>
    logWarning(s"Worker heartbeat check is enabled. It only works normally when " +
      s"${config.HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT.key} is larger than the worker's " +
      s"heartbeat interval.")
    sc.conf.get(config.HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT)
  case false => 0
}
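To make the warning above concrete, here is the arithmetic for the default standalone settings (a sanity check only, not code from this PR):

// A worker heartbeats every (WORKER_TIMEOUT * 1000 / 4) ms, and spark.worker.timeout defaults to 60s.
val workerTimeoutMs = 60 * 1000L
val workerHeartbeatIntervalMs = workerTimeoutMs / 4 // 15,000 ms
val expiryCandidatesTimeoutMs = 90 * 1000L          // proposed default of HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT

// The default comfortably exceeds the worker heartbeat interval, as the warning requires.
assert(expiryCandidatesTimeoutMs > workerHeartbeatIntervalMs)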
Why do we select these values?
These are the relations that the configurations should respect.
(1) executorTimeoutMs (30s) + expiryCandidatesTimeout (90s) == executorTimeoutMs without this PR (120s)
- This keeps the overall behavior the same as before.
(2) executorHeartbeatInterval: the executor sends a heartbeat to the driver every executorHeartbeatInterval (default: 10s).
- executorHeartbeatInterval (10s) <= executorTimeoutMs (30s)
- executorHeartbeatInterval (10s) <= expiryCandidatesTimeout (90s)
(3) initial delay (30s) >= executorTimeoutMs (30s)
- If the initial delay were smaller than executorTimeoutMs, the first call to expireDeadHosts could never expire any executor. The initial delay also gives the scheduler backend an interval to initialize before the lazy variables are evaluated.
(4) checkTimeoutIntervalMs <= executorTimeoutMs: this is an existing assertion in HeartbeatReceiver.scala.
require(checkTimeoutIntervalMs <= executorTimeoutMs,
  s"${Network.NETWORK_TIMEOUT_INTERVAL.key} should be less than or " +
  s"equal to ${config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key}.")
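As a quick illustration (not code from this PR), relations (1)-(4) above can be restated as checks over the proposed defaults:

import scala.concurrent.duration._

val executorHeartbeatInterval = 10.seconds // spark.executor.heartbeatInterval default
val executorTimeout           = 30.seconds // executorTimeoutMs above
val expiryCandidatesTimeout   = 90.seconds // HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT default
val checkTimeoutInterval      = 15.seconds // NETWORK_TIMEOUT_INTERVAL default

// (1) The two-stage timeout adds up to the previous single 120s timeout.
assert(executorTimeout + expiryCandidatesTimeout == 120.seconds)
// (2) The executor heartbeats more often than either stage can expire it.
assert(executorHeartbeatInterval <= executorTimeout)
assert(executorHeartbeatInterval <= expiryCandidatesTimeout)
// (3) The initial delay equals executorTimeout by construction, so it is not checked here.
// (4) The existing HeartbeatReceiver assertion.
assert(checkTimeoutInterval <= executorTimeout)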
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!