[wip][SPARK-40320][Core] Executor should exit when it failed to initialize for fatal error
This PR is for quick discussion. For reproduction steps, please see the JIRA: https://issues.apache.org/jira/browse/SPARK-40320. When an Executor plugin fails to initialize, the Executor is shown as active but never accepts tasks, as if it were hung.
I checked the code and found that org.apache.spark.rpc.netty.Inbox#safelyCall rethrows the fatal error (UnsatisfiedLinkError counts as a fatal error here) in the method dealWithFatalError.
Some ideas: it is very hard to know what happened here without reading the code. The Executor is shown as active in the Spark UI, but it can't do anything, so we are left wondering whether the driver or the Executor is broken. I think the Executor should at least exit (kill itself), or its status shouldn't be shown as active in the UI.
Can one of the admins verify this patch?
To answer why the uncaughtExceptionHandler in Executor doesn't catch the error:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L106
I have a small program to explain.
The first exception won't be caught by the UncaughtExceptionHandler; the second one will be (if we remove the code that throws the first one).
import java.util.concurrent.Executors

import com.google.common.util.concurrent.ThreadFactoryBuilder

object TestExceptionHandler {
  val threadFactory = new ThreadFactoryBuilder().setDaemon(true).build()

  def main(args: Array[String]): Unit = {
    val service = Executors.newSingleThreadExecutor(threadFactory)
    val runLoop = new Runnable() {
      override def run(): Unit = receiveLoop()
    }
    service.execute(runLoop)
    Thread.sleep(2 * 1000)
  }

  private def receiveLoop(): Unit = {
    try {
      // `new MessageLoop` is evaluated here, so a constructor failure lands in this catch
      Executors.newSingleThreadExecutor(threadFactory).execute(new MessageLoop)
    } catch {
      case t: Throwable =>
        println("receiveLoop catch the Exception")
    }
  }

  class MessageLoop extends Runnable {
    Thread.setDefaultUncaughtExceptionHandler(new MyExceptionHandler)

    /**
     * This Exception will not be caught by MyExceptionHandler. It will be caught by receiveLoop
     */
    if (1 == 1) {
      throw new UnsatisfiedLinkError("My Exception error, will be caught by receiveLoop")
    }

    override def run(): Unit = {
      println("MessageLoop run method.... ")
      /**
       * This Exception will be caught by MyExceptionHandler
       * (The premise is that the above Exception code needs to be commented out)
       */
      if (1 == 1) {
        throw new UnsatisfiedLinkError("My Exception error, will be caught by MyExceptionHandler !")
      }
    }
  }

  class MyExceptionHandler extends Thread.UncaughtExceptionHandler {
    override def uncaughtException(t: Thread, e: Throwable): Unit = {
      println("MyExceptionHandler catch the Exception... ")
    }
  }
}
The fatal error is caught in https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L82-L89
receiveLoop is then resubmitted, after which it blocks here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L69
The Executor did not initialize successfully the first time, so it never sent LaunchedExecutor to the driver (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L172 )
So the Executor can't launch tasks or do anything else, even though it is shown as active in the Spark UI. It is very hard to tell why it hangs here without reading the code.
In the previous code, the Executor would exit on task launch if it had failed to initialize (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L180). However, after PR https://github.com/apache/spark/pull/25964, if the Executor can't initialize, no task is ever launched on it, so the Executor never exits.
@Ngone51 Hi, I added some more reasoning. I don't know whether you would be notified on JIRA, so I put it in the PR.
@mridulm @tgravescs If you have time, could you also take a look?
I am not sure I follow what the code snippet is trying to do. Changing the code to:
private def receiveLoop(): Unit = {
  Executors.newSingleThreadExecutor(threadFactory).execute(new MessageLoop)
}
does cause the uncaughtException to be invoked - as expected.
If we do catch the Throwable in receiveLoop, then obviously it is no longer an uncaught exception (the exception is being thrown while creating the MessageLoop - which happens in receiveLoop) - and so MyExceptionHandler.uncaughtException won't be invoked.
@mridulm Thanks for your reply. Yes, that is another case. There are three cases that explain whether uncaughtException gets invoked.
Case 1, as you said:
private def receiveLoop(): Unit = {
  Executors.newSingleThreadExecutor(threadFactory).execute(new MessageLoop)
}
uncaughtException will be invoked.
Case 2: if we surround it with try-catch, uncaughtException won't be invoked.
private def receiveLoop(): Unit = {
  try {
    Executors.newSingleThreadExecutor(threadFactory).execute(new MessageLoop)
  } catch {
    case t: Throwable =>
      println("receiveLoop catch the Exception")
  }
}
Case 3: even with the try-catch, uncaughtException is invoked if the exception is thrown in TestExceptionHandler.MessageLoop#run().
For this we need to remove the first exception in MessageLoop (otherwise it cannot be constructed).
This also explains the behavior in Spark.
org.apache.spark.rpc.netty.MessageLoop#receiveLoop is surrounded with try-catch, so when an error happens during Executor init, uncaughtException doesn't see the fatal error.
But once the Executor has initialized successfully, uncaughtException can catch errors thrown in org.apache.spark.executor.Executor.TaskRunner#run
My example answers the question from @Ngone51 in JIRA. But back to the current issue:
The fatal error is caught in https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L82-L89
receiveLoop is then resubmitted, after which it blocks here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L69
The Executor did not initialize successfully the first time, so it never sent LaunchedExecutor to the driver (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L172 )
So the Executor can't launch tasks or do anything else, even though it is shown as active in the Spark UI. It is very hard to tell why it hangs here without reading the code.
In the previous code, the Executor would exit on task launch if it had failed to initialize (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L180). However, after PR #25964, if the Executor can't initialize, no task is ever launched on it, so the Executor never exits.
I think we had better catch the fatal error when initializing the Executor, as my PR does:
https://github.com/apache/spark/blob/39b65b414c4ba36ada478369149f54452d90dd7b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L169-L176
The issue seems to be that Executor construction failed due to the fatal error thrown during plugin initialization, and that the fatal error doesn't fail the executor process. This leaves the executor idle and unavailable to the driver, since LaunchedExecutor is never sent. Correct me if I'm wrong.
A few points:
- Plugins are initialized as part of Executor construction, but after uncaught exception handler is set.
- If anything other than NonFatal is thrown in receive (like the LinkageError from @yabola's example here) during executor creation, it will be handled in Inbox.process as part of safelyCall - which will result in re-throwing it.
- This now bubbles up to receiveLoop - where we end up throwing the throwable again (after scheduling it for re-execution).
- The throw should result in uncaught exception handler killing the jvm - and if it does not, then the re-enqueue in prev step will cause the message to be reprocessed.
- In this case we are discussing, it should have resulted in killing the VM.
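To make the NonFatal distinction in the points above concrete, here is a minimal sketch. It is not Spark's actual Inbox code: SafelyCallSketch, safelyCall and classify are hypothetical names, and the only assumption is that the real method follows the same "handle NonFatal, rethrow everything else" shape described above.

```scala
import scala.util.control.NonFatal

// Hypothetical, simplified stand-in for the safelyCall pattern discussed above.
object SafelyCallSketch {
  // Non-fatal errors are absorbed here; anything else is rethrown to the caller.
  def safelyCall(action: => Unit): String =
    try {
      action
      "completed"
    } catch {
      case NonFatal(_) => "handled"
      case fatal: Throwable => throw fatal
    }

  // Reports whether a given throwable would escape safelyCall.
  def classify(t: Throwable): String =
    try safelyCall(throw t)
    catch { case _: Throwable => "rethrown" }
}
```

scala.util.control.NonFatal does not match LinkageError (of which UnsatisfiedLinkError is a subclass), so a plugin failure of that kind escapes the handler and bubbles up to whatever loop invoked it, while an ordinary RuntimeException is absorbed.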
@Ngone51 @mridulm Yes, you are right. Do you have any ideas for solving this problem? My idea is simply to exit the Executor, as the PR does.
I will try to reproduce this locally - is there anything else specific I need to know to do so? Or are the instructions in SPARK-40320 sufficient? Thx
@mridulm I think they are sufficient. If you have any doubts, please tell me. I ran my test code on a Spark standalone cluster (the issue should also occur in other modes). This is my driver code; the job remains stuck:
val spark = SparkSession
  .builder()
  .appName("plugin example")
  .config("spark.plugins", "org.apache.spark.examples.sql.ErrorSparkPlugin")
  .getOrCreate()
val df = spark.range(10, 20)
df.show()
Thread.sleep(20 * 60 * 1000)
spark.stop()
The throw should result in uncaught exception handler killing the jvm - and if it does not, then the re-enqueue in prev step will cause the message to be reprocessed.
@mridulm Is the fatal error caught by the uncaught exception handler in this case? If yes, why doesn't it fail the JVM?
I agree @Ngone51, it should have :-) I am trying to reproduce this locally and see if we are missing something - or there is some nuance behind the observed behavior.
Added a few debug statements, and it became clear what the issue is.
Essentially, since we are leveraging a ThreadPoolExecutor, it does not result in killing the thread with the exception/error thrown - but rather, will call ThreadPoolExecutor.afterExecute with the cause for failure (See runWorker for more).
We should be overriding this, and invoke our uncaughtExceptionHandler when an exception is thrown.
(HadoopThreadPoolExecutor is an example of this - though our code would be something like:
override def afterExecute(r: Runnable, t: Throwable): Unit = {
  if (null != t) {
    Option(Thread.getDefaultUncaughtExceptionHandler).foreach(_.uncaughtException(Thread.currentThread(), t))
  }
}
)
In order to minimize change, we should do this for a small subset of specific thread pools though - but once we are sure, expand it to all. Thoughts?
Details:
Following "dispatcher-Executor,5,main" thread around ...
In receiveLoopRunnable when a Throwable is thrown:
22/09/26 17:17:12 INFO DedicatedMessageLoop: Current exceptionHandler = org.apache.spark.util.SparkUncaughtExceptionHandler@27c71f14
22/09/26 17:17:12 INFO DedicatedMessageLoop: Thread = Thread[dispatcher-Executor,5,main]
22/09/26 17:17:12 INFO DedicatedMessageLoop: Stack ...
java.lang.Exception: For stack
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:56)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
In receiveLoopRunnable's run, when a Throwable is thrown:
22/09/26 17:17:12 INFO DedicatedMessageLoop: Thread = Thread[dispatcher-Executor,5,main], stackTrace =
22/09/26 17:17:12 INFO DedicatedMessageLoop: java.base/java.lang.Thread.dumpThreads(Native Method)
22/09/26 17:17:12 INFO DedicatedMessageLoop: java.base/java.lang.Thread.getAllStackTraces(Thread.java:1653)
22/09/26 17:17:12 INFO DedicatedMessageLoop: app//org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$dumpAllStackTraces(MessageLoop.scala:70)
22/09/26 17:17:12 INFO DedicatedMessageLoop: app//org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:58)
22/09/26 17:17:12 INFO DedicatedMessageLoop: java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
22/09/26 17:17:12 INFO DedicatedMessageLoop: java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
22/09/26 17:17:12 INFO DedicatedMessageLoop: java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
22/09/26 17:17:12 INFO DedicatedMessageLoop: java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
and finally, a few seconds after Executor Inbox failure - dumping all threads in a new thread.
22/09/26 17:17:14 INFO DedicatedMessageLoop: Thread = Thread[dispatcher-Executor,5,main], stackTrace =
22/09/26 17:17:14 INFO DedicatedMessageLoop: java.base/jdk.internal.misc.Unsafe.park(Native Method)
22/09/26 17:17:14 INFO DedicatedMessageLoop: java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
22/09/26 17:17:14 INFO DedicatedMessageLoop: java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2081)
22/09/26 17:17:14 INFO DedicatedMessageLoop: java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433)
22/09/26 17:17:14 INFO DedicatedMessageLoop: app//org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:102)
22/09/26 17:17:14 INFO DedicatedMessageLoop: app//org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:45)
22/09/26 17:17:14 INFO DedicatedMessageLoop: java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
22/09/26 17:17:14 INFO DedicatedMessageLoop: java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
22/09/26 17:17:14 INFO DedicatedMessageLoop: java.base/java.lang.Thread.run(Thread.java:829)
Essentially, since we are leveraging a ThreadPoolExecutor, it does not result in killing the thread with the exception/error thrown - but rather, will call ThreadPoolExecutor.afterExecute with the cause for failure (See runWorker for more).
@mridulm Thanks for your analysis. But I still don't get why SparkUncaughtExceptionHandler doesn't catch the throwable.
try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
        task.run();
    } catch (RuntimeException x) {
        thrown = x; throw x;
    } catch (Error x) {
        thrown = x; throw x;
    } catch (Throwable x) {
        thrown = x; throw new Error(x);
    } finally {
        afterExecute(task, thrown);
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}
afterExecute is passed the throwable, but it doesn't catch it. So the throwable should still propagate out of runWorker.
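A small sketch of this point, for tasks handed to the pool via execute: the afterExecute hook observes the throwable, and the thread's uncaught exception handler still fires afterwards. The object and method names (AfterExecuteSketch, demo) are made up for illustration, and a per-thread handler is used instead of the JVM-wide default to keep the example isolated.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical demo: all names here are illustrative, not Spark code.
object AfterExecuteSketch {
  // Returns (throwable seen by afterExecute, throwable seen by the uncaught handler).
  def demo(): (Option[Throwable], Option[Throwable]) = {
    val seenByHook = new AtomicReference[Throwable]()
    val seenByHandler = new AtomicReference[Throwable]()
    val handlerCalled = new CountDownLatch(1)

    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r)
        t.setDaemon(true)
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
          override def uncaughtException(th: Thread, e: Throwable): Unit = {
            seenByHandler.set(e)
            handlerCalled.countDown()
          }
        })
        t
      }
    }

    val pool = new ThreadPoolExecutor(
      1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable](), factory) {
      // The hook only observes the failure; it does not swallow it.
      override def afterExecute(r: Runnable, t: Throwable): Unit = {
        super.afterExecute(r, t)
        seenByHook.set(t)
      }
    }

    pool.execute(new Runnable {
      override def run(): Unit = throw new LinkageError("Test")
    })
    handlerCalled.await(5, TimeUnit.SECONDS)
    pool.shutdownNow()
    (Option(seenByHook.get()), Option(seenByHandler.get()))
  }
}
```

Both values come back populated with the same LinkageError, which matches the reading of runWorker above: the hook runs in a finally block and the error keeps propagating.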
Thanks for the query @Ngone51 - I missed out one aspect of my analysis, which ends up completely changing the solution - my bad :-(
The answer to your query also explains the lack of failure - it comes down to the two different APIs we are using ...
For DedicatedMessageLoop, for the initial submission of receiveLoopRunnable - we use ExecutorService.submit api - while for all other cases in MessageLoop, we use the Executor.execute api - and this is the cause for the behavior.
The submit api returns a Future - and if we look at the implementation of FutureTask.run, we see that it catches Throwable and preserves that as the outcome (setException) - which is the reason why the thread itself does not die.
So the specific case might be mitigated by using execute instead of submit api.
We can easily validate it with the following snippet:
import java.lang.Thread._
import java.util.concurrent._

Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
  def uncaughtException(t: Thread, e: Throwable): Unit = {
    println("t = " + t + ", e = " + e)
    e.printStackTrace()
  }
})

def namedThreadFactory(): ThreadFactory = new ThreadFactory() {
  def newThread(e: Runnable): Thread = {
    val th = new Thread(e)
    th.setDaemon(true)
    th
  }
}

val threadFactory = namedThreadFactory()
val threadPool = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue[Runnable](), threadFactory)
threadPool.allowCoreThreadTimeOut(true)

// Will not call UncaughtExceptionHandler, but future will have the error
val future = threadPool.submit(new Runnable() {
  def run(): Unit = {
    println("Throwing err for submit ... ")
    throw new LinkageError("Test")
  }
})
future

// will call UncaughtExceptionHandler
threadPool.execute(new Runnable() {
  def run(): Unit = {
    println("Throwing err for execute ... ")
    throw new LinkageError("Test")
  }
})
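As a complement to the snippet above, this sketch shows where the error goes in the submit case: it is stored in the returned Future and only surfaces, wrapped in an ExecutionException, when someone calls get. SubmitFutureSketch and causeOfSubmittedFailure are hypothetical names used only for this illustration.

```scala
import java.util.concurrent.{ExecutionException, Executors, TimeUnit}

// Hypothetical demo: names are illustrative, not Spark code.
object SubmitFutureSketch {
  // Submits a task that throws a fatal error and returns the cause stored in the Future.
  def causeOfSubmittedFailure(): Option[Throwable] = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      val future = pool.submit(new Runnable {
        override def run(): Unit = throw new LinkageError("Test")
      })
      try {
        future.get(5, TimeUnit.SECONDS)
        None // not expected: the task always throws
      } catch {
        // The worker thread survived; the error was captured as the task's outcome.
        case e: ExecutionException => Option(e.getCause)
      }
    } finally {
      pool.shutdownNow()
    }
  }
}
```

If nothing ever calls get on the Future, as with a fire-and-forget submit, the stored error is never observed, which is consistent with the silent hang described in this thread.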
Oh I see. That's really a subtle diff. Thanks @mridulm
So the specific case might be mitigated by using execute instead of submit API.
+1
Can you take a look at comment above @yabola and work on the fix ? Since you already spent a lot of time on this.
@mridulm Thanks a lot for your analysis! Please give me some time to understand it.
@mridulm Thank you very much for your detailed analysis. I have tested it on my cluster and it works as expected. I have updated the PR description.
Can we also add the example code you had to reproduce the issue as a test?
@mridulm The test class is a bit difficult to write: it needs to run in local-cluster mode to simulate the issue. Please see if it is suitable, thanks.
+CC @Ngone51 I will merge it later tomorrow if no further comments.
Looks like this dropped off my radar - merging to master.
Thanks for working on this @yabola ! Thanks for the reviews and brainstorming on this @Ngone51 :-)
I am assuming your jira id is miracle based on previous PR conversation - do let me know if that is incorrect @yabola !
Thank you for this fix, all!
@mridulm @Ngone51 Thank you all for reviews!
Yeah, I think my jira id is miracle. I checked my jira: Login name and Username is miracle, but Full name is Mars.