
[SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor

Open · kevin85421 opened this pull request 2 years ago • 10 comments

What changes were proposed in this pull request?

This PR aims to provide a way to lower the timeout. Our solution is to ask the master for the worker's heartbeat when the driver has not received a heartbeat from an executor for TimeoutThreshold seconds.

[Figure: proposed flow in which the driver checks workerLastHeartbeat with the master before expiring an executor]

When an executor performs a full GC, it cannot send any messages, so the driver stops receiving its heartbeats. Instead of removing the executor directly, the driver asks the master for workerLastHeartbeat and uses it to determine whether the problem is a network disconnection or something else (e.g. GC). If it is a network disconnection, we remove the executor; otherwise, we put the executor into executorExpiryCandidates rather than expiring it immediately.
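The decision above could be sketched as follows. This is a minimal illustration with hypothetical names (onExecutorHeartbeatMissing, expireExecutor, an executorExpiryCandidates map keyed by executor id); it is not the PR's actual HeartbeatReceiver code.

import scala.collection.mutable

object HeartbeatExpirySketch {
  // Executors whose driver-side heartbeat is overdue but whose worker still
  // looks alive to the master, mapped to the time they became candidates.
  private val executorExpiryCandidates = mutable.HashMap.empty[String, Long]

  def onExecutorHeartbeatMissing(
      executorId: String,
      workerLastHeartbeat: Long, // last worker heartbeat as reported by the master
      now: Long,
      executorTimeoutMs: Long): Unit = {
    if (now - workerLastHeartbeat > executorTimeoutMs) {
      // The master has not heard from the worker either: treat this as a
      // network disconnection and expire the executor immediately.
      expireExecutor(executorId)
    } else {
      // The worker still heartbeats to the master, so the executor is probably
      // just busy (e.g. full GC): keep it as a candidate for expiryCandidatesTimeout.
      executorExpiryCandidates(executorId) = now
    }
  }

  private def expireExecutor(executorId: String): Unit = {
    // Placeholder for the real removal path in HeartbeatReceiver.
    println(s"Expiring executor $executorId")
  }
}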

[Note 1] The worker sends heartbeats to the master every (conf.get(WORKER_TIMEOUT) * 1000 / 4) milliseconds; see deploy/worker/Worker.scala for details. It is good to keep expiryCandidatesTimeout larger than this interval so that the driver can tell whether the master has lost any heartbeat from the worker (see the short illustration after these notes).

[Note 2] This method could also be applied to other deployment modes, but not every deployment mode schedules the driver on the master.
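To make Note 1 concrete, here is a small illustration assuming a WORKER_TIMEOUT of 60 seconds; the literals are example values, not the PR's defaults.

// Illustration of Note 1 only; the numbers are example values.
val workerTimeoutSec = 60L                                   // conf.get(WORKER_TIMEOUT)
val workerHeartbeatIntervalMs = workerTimeoutSec * 1000 / 4  // worker -> master heartbeat every 15,000 ms
val expiryCandidatesTimeoutMs = 30L * 1000                   // a candidate setting

// expiryCandidatesTimeout must exceed the worker heartbeat interval, otherwise
// the master may not have a fresh heartbeat to report to the driver.
assert(expiryCandidatesTimeoutMs > workerHeartbeatIntervalMs)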

  • Result
    • Milestone 4:
      • HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT: true
      • NETWORK_EXECUTOR_TIMEOUT (TimeoutThreshold in the figures): 60s
      • HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT: 30s
      • NETWORK_TIMEOUT_INTERVAL: 15s
    • w/o Milestone 4
      • HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT: false
      • NETWORK_EXECUTOR_TIMEOUT (TimeoutThreshold in the figures): 120s
      • NETWORK_TIMEOUT_INTERVAL: 60s

[Figure: comparison of executor expiry results with and without Milestone 4]
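For reference, the Milestone 4 settings above could be applied in a test inside the Spark code base roughly as follows. The import paths, the assumption that the new constants live under config.* / Network.* (as in the snippets later in this thread), and the time-string values are my assumptions, not code from the PR.

// Sketch only: applying the Milestone 4 settings via the config constants' keys.
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Network

val conf = new SparkConf()
  .set(config.HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT.key, "true")
  .set(Network.NETWORK_EXECUTOR_TIMEOUT.key, "60s")
  .set(config.HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT.key, "30s")
  .set(Network.NETWORK_TIMEOUT_INTERVAL.key, "15s")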

Why are the changes needed?

Currently, the driver's HeartbeatReceiver will expire an executor if it does not receive any heartbeat from the executor for 120 seconds. 120 seconds is too long, but we face other challenges when we try to lower the timeout threshold: while an executor is performing GC, it cannot reply to any message.

[Figure: timeline illustrating why TimeoutThreshold cannot simply be lowered to 60 seconds]

We use the figure above to explain why we cannot lower TimeoutThreshold to 60 seconds directly. When an executor performs a full GC, it cannot send any message, including heartbeats. The driver then removes the executor because it has not received a heartbeat from the executor for 60 seconds. In other words, the driver cannot distinguish between GC and a network disconnection.

Does this PR introduce any user-facing change?

No

How was this patch tested?

build/sbt "core/testOnly *HeartbeatReceiverSuite"
build/sbt "core/testOnly *MasterSuite"

kevin85421 avatar Aug 05 '22 00:08 kevin85421

cc. @Ngone51 @jiangxb1987

kevin85421 avatar Aug 05 '22 00:08 kevin85421

Can one of the admins verify this patch?

AmplabJenkins avatar Aug 06 '22 01:08 AmplabJenkins

Thank you @mridulm for your recommendations! I will resolve these comments as soon as possible.

kevin85421 avatar Aug 06 '22 07:08 kevin85421

Btw, any thoughts on this ?

Are the changes here necessarily only for standalone ? Why not k8s and yarn ?

The changes are specific to standalone - but the idea should be transferable to others as well, right ? Do we want to tackle that here ?

mridulm avatar Aug 12 '22 18:08 mridulm

> Btw, any thoughts on this ?
>
> Are the changes here necessarily only for standalone ? Why not k8s and yarn ?
>
> The changes are specific to standalone - but the idea should be transferable to others as well, right ? Do we want to tackle that here ?

I can take YARN and k8s support as follow-ups of this PR. In my opinion, the scope of this PR will become too large if we tackle YARN and k8s here. Thank you!

kevin85421 avatar Aug 12 '22 21:08 kevin85421

Sounds good @kevin85421, wanted to make sure the approach is extensible to others. Can you please file follow up jira's for both ? Thx

mridulm avatar Aug 13 '22 04:08 mridulm

Hi @mridulm, here are the JIRA tickets. Thank you!

YARN: https://issues.apache.org/jira/browse/SPARK-40068
k8s: https://issues.apache.org/jira/browse/SPARK-40069

kevin85421 avatar Aug 13 '22 04:08 kevin85421

Gentle ping @Ngone51 @mridulm

kevin85421 avatar Aug 23 '22 05:08 kevin85421

I am fine with the changes, will let @Ngone51 take a look/merge

mridulm avatar Aug 24 '22 06:08 mridulm

https://github.com/apache/spark/pull/37411/commits/92629e30410d7ae9741457240c3f1a789f6b042b

Default values

I have revisited the configurations in this PR and updated their default values after the discussion with @Ngone51.

(1) initial delay: set initial delay to executorTimeoutMs (default: 30s)

override def onStart(): Unit = {
  timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(
    () => Utils.tryLogNonFatalError { Option(self).foreach(_.ask[Boolean](ExpireDeadHosts)) },
    executorTimeoutMs, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
}

(2) checkTimeoutIntervalMs: the thread in (1) will execute the function expireDeadHosts every checkTimeoutIntervalMs (default: 15s).

private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)

(3) executorTimeoutMs: STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT is an optional config, so when it is unset, executorTimeoutMs falls back to NETWORK_EXECUTOR_TIMEOUT (default: 30s).

private val executorTimeoutMs = sc.conf.get(
  config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
).getOrElse(
  sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT)
)

(4) expiryCandidatesTimeout: if checkWorkerLastHeartbeat is true, the value is HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT (default: 90s); otherwise it is 0.

private lazy val expiryCandidatesTimeout = checkWorkerLastHeartbeat match {
  case true =>
    logWarning("Worker heartbeat check is enabled. It only works normally when " +
      s"${config.HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT.key} is larger than the worker's " +
      "heartbeat interval.")
    sc.conf.get(config.HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT)
  case false => 0
}

Why do we select these values?

These are the relations that the configurations should respect:

(1) executorTimeoutMs (30s) + expiryCandidatesTimeout (90s) == executorTimeoutMs (without this PR, 120s)

  • Try to keep the same behaviors.

(2) executorHeartbeatInterval: the executor sends a heartbeat to the driver every executorHeartbeatInterval (default: 10s).

  • executorHeartbeatInterval (10s) <= executorTimeoutMs (30s)
  • executorHeartbeatInterval (10s) <= expiryCandidatesTimeout (90s)

(3) initial delay (30s) >= executorTimeoutMs (30s)

  • If the initial delay were smaller than executorTimeoutMs, the first call of expireDeadHosts could not possibly expire any executor. In addition, the initial delay gives the scheduler backend an interval to initialize before the lazy variables are evaluated.

(4) checkTimeoutIntervalMs <= executorTimeoutMs: this is an existing assertion in HeartbeatReceiver.scala.

require(checkTimeoutIntervalMs <= executorTimeoutMs,
  s"${Network.NETWORK_TIMEOUT_INTERVAL.key} should be less than or " +
    s"equal to ${config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key}.")
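As a quick sanity check, here is a minimal sketch (not code from the PR) that encodes relations (1)-(4) with the default values discussed above.

// The literals mirror the values in this discussion, not configuration lookups.
val executorHeartbeatIntervalMs = 10L * 1000  // executorHeartbeatInterval
val executorTimeoutMs           = 30L * 1000
val expiryCandidatesTimeoutMs   = 90L * 1000
val checkTimeoutIntervalMs      = 15L * 1000
val initialDelayMs              = executorTimeoutMs
val legacyExecutorTimeoutMs     = 120L * 1000 // behavior without this PR

// (1) keep the overall expiry latency unchanged
assert(executorTimeoutMs + expiryCandidatesTimeoutMs == legacyExecutorTimeoutMs)
// (2) heartbeats arrive well within both windows
assert(executorHeartbeatIntervalMs <= executorTimeoutMs)
assert(executorHeartbeatIntervalMs <= expiryCandidatesTimeoutMs)
// (3) the first ExpireDeadHosts run can actually expire an executor
assert(initialDelayMs >= executorTimeoutMs)
// (4) existing requirement in HeartbeatReceiver.scala
assert(checkTimeoutIntervalMs <= executorTimeoutMs)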

kevin85421 avatar Sep 06 '22 08:09 kevin85421

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions[bot] avatar Dec 16 '22 00:12 github-actions[bot]