
[wip][SPARK-40320][Core] Executor should exit when it failed to initialize for fatal error

Open yabola opened this issue 3 years ago • 6 comments

This PR is for quick discussion. For reproduction steps, please see the JIRA: https://issues.apache.org/jira/browse/SPARK-40320 When an Executor plugin fails to initialize, the Executor is shown as active but never accepts tasks, as if it were hung.

I checked the code and found that org.apache.spark.rpc.netty.Inbox#safelyCall throws the fatal error (UnsatisfiedLinkError counts as a fatal error here) in the method dealWithFatalError.

Some thoughts: it is very hard to know what happened here without reading the code. The Executor is active in the Spark UI but cannot do anything, so we are left wondering whether the driver is broken or the Executor is at fault. I think the Executor should at least exit (kill itself), or its status should not be shown as active in the UI.

yabola avatar Sep 03 '22 03:09 yabola

Can one of the admins verify this patch?

AmplabJenkins avatar Sep 04 '22 01:09 AmplabJenkins

To answer why the uncaughtExceptionHandler in Executor doesn't catch the error: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L106

I have a small program to explain: the first exception won't be caught by the UncaughtExceptionHandler; the second exception will be caught by it (if we remove the code that throws the first exception).

yabola avatar Sep 20 '22 10:09 yabola

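import java.util.concurrent.Executors

import com.google.common.util.concurrent.ThreadFactoryBuilder

object TestExceptionHandler {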

  val threadFactory = new ThreadFactoryBuilder().setDaemon(true).build()

  def main(args: Array[String]): Unit = {
    val service = Executors.newSingleThreadExecutor(threadFactory)
    val runLoop = new Runnable() {
      override def run(): Unit = receiveLoop()
    }
    service.execute(runLoop)
    Thread.sleep(2 * 1000)
  }

  private def receiveLoop(): Unit = {
    try {
      Executors.newSingleThreadExecutor(threadFactory).execute(new MessageLoop)
    } catch {
      case t: Throwable => {
        println("receiveLoop catch the Exception")
      }
    }
  }


  class MessageLoop extends Runnable {
    Thread.setDefaultUncaughtExceptionHandler(new MyExceptionHandler)
    /**
     * This Exception will not be caught by MyExceptionHandler. It will be caught by receiveLoop
     */
    if (1 == 1) {
      throw new UnsatisfiedLinkError("My Exception error, will be caught by receiveLoop")
    }
    override def run(): Unit = {
      println("MessageLoop run method.... ")
      /**
       * This Exception will be caught by MyExceptionHandler
       * (The premise is that the above Exception code needs to be commented out)
       */
      if (1 == 1) {
        throw new UnsatisfiedLinkError("My Exception error, will be caught by MyExceptionHandler !")
      }
    }
  }

  class MyExceptionHandler extends Thread.UncaughtExceptionHandler {
    override def uncaughtException(t: Thread, e: Throwable): Unit = {
      println("MyExceptionHandler catch the Exception... ")
    }
  }
}

yabola avatar Sep 20 '22 10:09 yabola

It did catch the fatal error in https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L82-L89

It will resubmit receiveLoop, and after that it will block here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L69

This Executor did not initialize successfully the first time, so it didn't send LaunchedExecutor to the Driver (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L172 )

So the Executor can't launch tasks and can't do anything now (even though the Executor is in the active state in the Spark UI). It is very hard to know what happened here unless we check in the code why it hung.

Actually, in the previous code, the Executor would exit when launching a task if it couldn't initialize (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L180). However, after PR https://github.com/apache/spark/pull/25964, if the Executor can't initialize, the worker won't launch any task, so the Executor no longer exits.

yabola avatar Sep 20 '22 11:09 yabola

@Ngone51 Hi, I added some explanation of the cause. I don't know whether you would be notified on JIRA, so I posted it in the PR.

yabola avatar Sep 20 '22 11:09 yabola

@mridulm @tgravescs If you have time, could you also help take a look?

yabola avatar Sep 22 '22 00:09 yabola

I am not sure I follow what the code snippet is trying to do. Changing the code to :

  private def receiveLoop(): Unit = {
    Executors.newSingleThreadExecutor(threadFactory).execute(new MessageLoop)
  }

does cause uncaughtException to be invoked - as expected. If we do catch the Throwable in receiveLoop, then obviously it is no longer an uncaught exception (the exception is being thrown while creating the MessageLoop - which happens in receiveLoop) - and so MyExceptionHandler.uncaughtException won't be invoked.

mridulm avatar Sep 22 '22 02:09 mridulm

@mridulm Thanks for your reply. Yes, that is a different situation. There are three cases that show whether uncaughtException gets invoked.

As you said

  private def receiveLoop(): Unit = {
    Executors.newSingleThreadExecutor(threadFactory).execute(new MessageLoop)
  }

uncaughtException will be invoked.

If we surround it with try-catch, then uncaughtException won't be invoked.

  private def receiveLoop(): Unit = {
    try {
      Executors.newSingleThreadExecutor(threadFactory).execute(new MessageLoop)
    } catch {
      case t: Throwable => {
        println("receiveLoop catch the Exception")
      }
    }
  }

Even if we surround it with try-catch, uncaughtException can still be invoked if the exception happens in TestExceptionHandler.MessageLoop#run(). But we need to remove the first exception-throwing code in MessageLoop (otherwise it cannot be constructed).

This also explains the behavior in Spark. org.apache.spark.rpc.netty.MessageLoop#receiveLoop is surrounded with try-catch, so when an error happens during Executor init, uncaughtException can't catch the fatal error. But if the Executor initializes successfully, uncaughtException can catch errors in org.apache.spark.executor.Executor.TaskRunner#run.

yabola avatar Sep 22 '22 11:09 yabola

My example answers the question from @Ngone51 in JIRA. But back to the current issue:

It did catch the fatal error in https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L82-L89

It will resubmit receiveLoop, and after that it will block here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L69

This Executor did not initialize successfully the first time, so it didn't send LaunchedExecutor to the Driver (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L172 )

So the Executor can't launch tasks and can't do anything now (even though the Executor is in the active state in the Spark UI). It is very hard to know what happened here unless we check in the code why it hung.

Actually, in the previous code, the Executor would exit when launching a task if it couldn't initialize (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L180). However, after PR #25964, if the Executor can't initialize, the worker won't launch any task, so the Executor no longer exits.

I think we had better catch the fatal error when initializing the Executor, as my PR does.

yabola avatar Sep 22 '22 11:09 yabola

https://github.com/apache/spark/blob/39b65b414c4ba36ada478369149f54452d90dd7b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L169-L176

The issue seems to be that Executor construction failed due to the fatal error thrown during plugin initialization. And the fatal error doesn't fail the executor process, which leaves the executor idle and unavailable to the driver since LaunchedExecutor is never sent. Correct me if I'm wrong.

Ngone51 avatar Sep 22 '22 12:09 Ngone51

A few points:

  • Plugins are initialized as part of Executor construction, but after uncaught exception handler is set.
  • If anything other than NonFatal is thrown in receive (like the LinkageError from @yabola's example here) during executor creation, it will be handled in Inbox.process as part of safelyCall - which will result in re-throwing it.
  • This now bubbles up to receiveLoop - where we end up throwing the throwable again (after scheduling it for re-execution).
  • The throw should result in the uncaught exception handler killing the JVM - and if it does not, then the re-enqueue in the previous step will cause the message to be reprocessed.
    • In this case we are discussing, it should have resulted in killing the VM.
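
To make the NonFatal distinction in the points above concrete, here is a minimal sketch. It is not Spark's Inbox code: `safelyCallSketch` and `escapes` are hypothetical names, and the only assumption is that the wrapper handles what `scala.util.control.NonFatal` matches and lets everything else propagate to the caller.

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  // Hypothetical stand-in for a "safely call" wrapper: non-fatal throwables
  // are handled locally, anything else propagates to the caller.
  def safelyCallSketch(action: => Unit): String =
    try {
      action
      "ok"
    } catch {
      case NonFatal(e) => s"handled: ${e.getClass.getSimpleName}"
    }

  // True when the throwable escaped safelyCallSketch instead of being handled.
  def escapes(t: Throwable): Boolean =
    try {
      safelyCallSketch(throw t)
      false
    } catch {
      case _: Throwable => true
    }

  def main(args: Array[String]): Unit = {
    // An ordinary exception is handled inside the wrapper.
    println(safelyCallSketch(throw new IllegalStateException("plugin bug")))
    // UnsatisfiedLinkError is a LinkageError, which NonFatal does not match.
    println(escapes(new UnsatisfiedLinkError("missing native lib")))
  }
}
```

Because UnsatisfiedLinkError is a LinkageError, it is excluded from NonFatal and leaves the wrapper, which matches the bubbling-up described in the list.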

mridulm avatar Sep 22 '22 23:09 mridulm

@Ngone51 @mridulm Yes, you are right. Do you have any ideas on how to solve this problem? My idea is to simply exit the Executor, as the PR does.

yabola avatar Sep 23 '22 03:09 yabola

I will try to reproduce this locally - is there anything else specific I need to know to do so? Or are the instructions in SPARK-40320 sufficient? Thx

mridulm avatar Sep 23 '22 06:09 mridulm

@mridulm I think it is sufficient. If you have any doubts, please tell me. I ran my test code on a Spark standalone cluster (the issue should also occur in other modes). This is my driver code; the job will remain stuck:

val spark = SparkSession
  .builder()
  .appName("plugin example")
  .config("spark.plugins", "org.apache.spark.examples.sql.ErrorSparkPlugin")
  .getOrCreate()
val df = spark.range(10, 20)
df.show()
Thread.sleep(20 * 60 * 1000)
spark.stop()

yabola avatar Sep 23 '22 08:09 yabola

The throw should result in uncaught exception handler killing the jvm - and if it does not, then the re-enqueue in prev step will cause the message to be reprocessed.

@mridulm Is the fatal error caught by the uncaught exception handler in this case? If yes, why doesn't it fail the JVM?

Ngone51 avatar Sep 26 '22 14:09 Ngone51

I agree @Ngone51, it should have :-) I am trying to reproduce this locally and see if we are missing something - or there is some nuance behind the observed behavior.

mridulm avatar Sep 26 '22 18:09 mridulm

Added a few debug statements, and it became clear what the issue is. Essentially, since we are leveraging a ThreadPoolExecutor, it does not result in killing the thread with the exception/error thrown - but rather, will call ThreadPoolExecutor.afterExecute with the cause for failure (See runWorker for more).

We should be overriding this, and invoke our uncaughtExceptionHandler when an exception is thrown. (HadoopThreadPoolExecutor is an example of this - though our code would be something like:

override def afterExecute(r: Runnable, t: Throwable): Unit = {
  if (null != t) {
    Option(Thread.getDefaultUncaughtExceptionHandler).foreach(_.uncaughtException(Thread.currentThread(), t))
  }
}

)

In order to minimize change, we should do this for a small subset of specific thread pools first - and once we are sure, expand it to all. Thoughts?
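
A minimal sketch of what the afterExecute hook actually receives, under stated assumptions: `RecordingPool` and `quietFactory` are hypothetical names, not Spark code, and the worker threads get a no-op uncaught handler only to keep the output quiet. Per the ThreadPoolExecutor documentation, tasks wrapped by submit keep their exceptions inside the Future, so afterExecute is handed null for them.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}

object AfterExecuteSketch {
  // Worker threads with a no-op uncaught handler, to keep the sketch's output quiet.
  private val quietFactory: ThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val th = new Thread(r)
      th.setDaemon(true)
      th.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
        override def uncaughtException(t: Thread, e: Throwable): Unit = ()
      })
      th
    }
  }

  // Hypothetical pool that records the Throwable argument handed to afterExecute.
  class RecordingPool(done: CountDownLatch) extends ThreadPoolExecutor(
      1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable](), quietFactory) {
    val seen = new ConcurrentLinkedQueue[String]()

    override def afterExecute(r: Runnable, t: Throwable): Unit = {
      super.afterExecute(r, t)
      seen.add(if (t == null) "null" else t.getMessage)
      done.countDown()
    }
  }

  // Runs one task through execute and one through submit, in that order.
  def observe(): List[String] = {
    val done = new CountDownLatch(2)
    val pool = new RecordingPool(done)
    try {
      pool.execute(new Runnable {
        override def run(): Unit = throw new LinkageError("from execute")
      })
      pool.submit(new Runnable {
        override def run(): Unit = throw new LinkageError("from submit")
      })
      done.await(5, TimeUnit.SECONDS)
      pool.seen.toArray(new Array[String](0)).toList
    } finally {
      pool.shutdownNow()
    }
  }

  def main(args: Array[String]): Unit =
    println(observe())
}
```

So an afterExecute override on its own only sees errors from tasks started with execute; for submitted tasks it would have to inspect the Future passed in as the Runnable.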

Details: Following "dispatcher-Executor,5,main" thread around ... In receiveLoopRunnable when a Throwable is thrown:

22/09/26 17:17:12 INFO DedicatedMessageLoop: Current exceptionHandler = org.apache.spark.util.SparkUncaughtExceptionHandler@27c71f14
22/09/26 17:17:12 INFO DedicatedMessageLoop: Thread = Thread[dispatcher-Executor,5,main]
22/09/26 17:17:12 INFO DedicatedMessageLoop: Stack ...
java.lang.Exception: For stack
        at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:56)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

In receiveLoopRunnable's run, when a Throwable is thrown:

22/09/26 17:17:12 INFO DedicatedMessageLoop: Thread = Thread[dispatcher-Executor,5,main], stackTrace =
22/09/26 17:17:12 INFO DedicatedMessageLoop:     java.base/java.lang.Thread.dumpThreads(Native Method)
22/09/26 17:17:12 INFO DedicatedMessageLoop:     java.base/java.lang.Thread.getAllStackTraces(Thread.java:1653)
22/09/26 17:17:12 INFO DedicatedMessageLoop:     app//org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$dumpAllStackTraces(MessageLoop.scala:70)
22/09/26 17:17:12 INFO DedicatedMessageLoop:     app//org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:58)
22/09/26 17:17:12 INFO DedicatedMessageLoop:     java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
22/09/26 17:17:12 INFO DedicatedMessageLoop:     java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
22/09/26 17:17:12 INFO DedicatedMessageLoop:     java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
22/09/26 17:17:12 INFO DedicatedMessageLoop:     java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

and finally, a few seconds after Executor Inbox failure - dumping all threads in a new thread.

22/09/26 17:17:14 INFO DedicatedMessageLoop: Thread = Thread[dispatcher-Executor,5,main], stackTrace =
22/09/26 17:17:14 INFO DedicatedMessageLoop:     java.base/jdk.internal.misc.Unsafe.park(Native Method)
22/09/26 17:17:14 INFO DedicatedMessageLoop:     java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
22/09/26 17:17:14 INFO DedicatedMessageLoop:     java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2081)
22/09/26 17:17:14 INFO DedicatedMessageLoop:     java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433)
22/09/26 17:17:14 INFO DedicatedMessageLoop:     app//org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:102)
22/09/26 17:17:14 INFO DedicatedMessageLoop:     app//org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:45)
22/09/26 17:17:14 INFO DedicatedMessageLoop:     java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
22/09/26 17:17:14 INFO DedicatedMessageLoop:     java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
22/09/26 17:17:14 INFO DedicatedMessageLoop:     java.base/java.lang.Thread.run(Thread.java:829)

mridulm avatar Sep 26 '22 22:09 mridulm

Essentially, since we are leveraging a ThreadPoolExecutor, it does not result in killing the thread with the exception/error thrown - but rather, will call ThreadPoolExecutor.afterExecute with the cause for failure (See runWorker for more).

@mridulm Thanks for your analysis. But I still don't get why SparkUncaughtExceptionHandler doesn't catch the throwable.

try {
  beforeExecute(wt, task);
  Throwable thrown = null;
  try {
      task.run();
  } catch (RuntimeException x) {
      thrown = x; throw x;
  } catch (Error x) {
      thrown = x; throw x;
  } catch (Throwable x) {
      thrown = x; throw new Error(x);
  } finally {
      afterExecute(task, thrown);
  }
} finally {
  task = null;
  w.completedTasks++;
  w.unlock();
}

afterExecute does handle the throwable but it doesn't catch it. So the throwable should be still thrown out.

Ngone51 avatar Sep 27 '22 04:09 Ngone51

Thanks for the query @Ngone51 - I missed out one aspect of my analysis, which ends up completely changing the solution - my bad :-(

The answer to your query has the reason for the lack of failure - this is due to the two types of APIs we are using ...

For DedicatedMessageLoop, for the initial submission of receiveLoopRunnable - we use ExecutorService.submit api - while for all other cases in MessageLoop, we use the Executor.execute api - and this is the cause for the behavior.

The submit api returns a Future - and if we look at the implementation of FutureTask.run, we see that it catches Throwable and preserves that as the outcome (setException) - which is the reason why the thread itself does not die.

So the specific case might be mitigated by using execute instead of submit api.

We can easily validate it with the following snippet:


import java.lang.Thread._
import java.util.concurrent._

Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
  def uncaughtException(t: Thread, e: Throwable): Unit = {
    println("t = " + t + ", e = " + e)
    e.printStackTrace()
  } } )

def namedThreadFactory(): ThreadFactory = new ThreadFactory() {
  def newThread(e: Runnable): Thread = {
    val th = new Thread(e)
    th.setDaemon(true)
    th
  }
}

val threadFactory = namedThreadFactory()
val threadPool = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue[Runnable](), threadFactory)
threadPool.allowCoreThreadTimeOut(true)

// Will not call UncaughtExceptionHandler, but future will have the error
val future = threadPool.submit(new Runnable(){
  def run(): Unit = {
    println("Throwing err for submit ... ")
    throw new LinkageError("Test")
  }
})
future

// will call UncaughtExceptionHandler
threadPool.execute(new Runnable(){
  def run(): Unit = {
    println("Throwing err for execute ... ");
    throw new LinkageError("Test")
  }
})
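
As a complement to the snippet above, this sketch shows where the error ends up in each case: with submit it is stored in the returned Future and only surfaces, wrapped in an ExecutionException, when get is called; with execute it reaches the thread's uncaught exception handler. The names `SubmitVsExecuteSketch` and `runBoth` are illustrative only, and the handler is installed per thread (rather than as the JVM default) purely to keep the sketch self-contained.

```scala
import java.util.concurrent.{CountDownLatch, ExecutionException, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object SubmitVsExecuteSketch {
  // Returns (error recovered from the Future, error seen by the uncaught handler).
  def runBoth(): (Throwable, Throwable) = {
    val seenByHandler = new AtomicReference[Throwable]()
    val handlerCalled = new CountDownLatch(1)

    // Install the handler per thread so the sketch does not touch the JVM-wide default.
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val th = new Thread(r)
        th.setDaemon(true)
        th.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
          override def uncaughtException(t: Thread, e: Throwable): Unit = {
            seenByHandler.set(e)
            handlerCalled.countDown()
          }
        })
        th
      }
    }

    val pool = Executors.newSingleThreadExecutor(factory)
    try {
      // submit: FutureTask.run catches the error and stores it in the Future.
      val future = pool.submit(new Runnable {
        override def run(): Unit = throw new LinkageError("from submit")
      })
      val fromFuture: Throwable =
        try {
          future.get(5, TimeUnit.SECONDS)
          null
        } catch {
          case e: ExecutionException => e.getCause
        }

      // execute: the error escapes the worker thread and reaches the handler.
      pool.execute(new Runnable {
        override def run(): Unit = throw new LinkageError("from execute")
      })
      handlerCalled.await(5, TimeUnit.SECONDS)
      (fromFuture, seenByHandler.get())
    } finally {
      pool.shutdownNow()
    }
  }

  def main(args: Array[String]): Unit = {
    val (fromFuture, fromHandler) = runBoth()
    println(s"submit  -> stored in Future: $fromFuture")
    println(s"execute -> uncaught handler: $fromHandler")
  }
}
```

If nobody ever calls get on the Future returned by submit, the error is never observed, which is consistent with the executor staying alive but idle.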



mridulm avatar Sep 27 '22 05:09 mridulm

Oh I see. That's really a subtle diff. Thanks @mridulm

So the specific case might be mitigated by using execute instead of submit API.

+1

Ngone51 avatar Sep 27 '22 05:09 Ngone51

Can you take a look at comment above @yabola and work on the fix ? Since you already spent a lot of time on this.

mridulm avatar Sep 27 '22 06:09 mridulm

@mridulm Thanks a lot for your analysis! Please give me some time to understand it.

yabola avatar Sep 27 '22 06:09 yabola

@mridulm Thank you very much for your detailed analysis. I have tested it on my cluster and it works as expected. I have updated my PR description.

yabola avatar Sep 28 '22 06:09 yabola

Can we also add the example code you had to reproduce the issue as a test ?

mridulm avatar Sep 28 '22 12:09 mridulm

@mridulm The test class is a bit difficult to write, since it needs to run in local-cluster mode to simulate the failure. Please see whether it is suitable, thanks.

yabola avatar Sep 30 '22 08:09 yabola

+CC @Ngone51 I will merge it later tomorrow if no further comments.

mridulm avatar Oct 19 '22 05:10 mridulm

Looks like this dropped off my radar - merging to master.

mridulm avatar Oct 26 '22 06:10 mridulm

Thanks for working on this @yabola ! Thanks for the reviews and brainstorming on this @Ngone51 :-)

mridulm avatar Oct 26 '22 06:10 mridulm

I am assuming your jira id is miracle based on previous PR conversation - do let me know if that is incorrect @yabola !

mridulm avatar Oct 26 '22 06:10 mridulm

Thank you for this fix, all!

dongjoon-hyun avatar Oct 26 '22 07:10 dongjoon-hyun

@mridulm @Ngone51 Thank you all for reviews! Yeah, I think my jira id is miracle. I checked my jira: Login name and Username is miracle, but Full name is Mars.

yabola avatar Oct 26 '22 08:10 yabola