[SPARK-45873][CORE][YARN][K8S] Make ExecutorFailureTracker more tolerant when app remains sufficient resources
What changes were proposed in this pull request?
This PR introduces a new configuration, spark.executor.failureTracker.keepaliveOnMinLiveExecutors.enabled, which defaults to false.
When false, Spark fully respects the executor failure count maintained by ExecutorFailureTracker: if the count exceeds the maximum number of executor failures, the app exits forcefully and immediately. This is the current behavior.
When true, if the count exceeds the maximum, an extra check decides whether the app should exit immediately. The check tests whether the app still holds sufficient resources, defined as initial executors * spark.scheduler.minRegisteredResourcesRatio. With sufficient resources the app continues running; if the live executors keep dying and resources become insufficient, the app fails. If no more executors die, the app finishes its job.
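As a rough editorial sketch of the decision described above (not the actual patch; the function name, its parameters, and the ceiling rounding are assumptions for illustration):

```scala
// Hypothetical sketch of the proposed tolerance check. When the keepalive
// flag is on, exceeding the failure threshold only fails the app if the
// live executors have also dropped below the "sufficient resources" floor
// (initial executors * spark.scheduler.minRegisteredResourcesRatio).
def shouldExitOnFailures(
    keepaliveEnabled: Boolean,
    failureCount: Int,
    maxNumExecutorFailures: Int,
    liveExecutors: Int,
    initialExecutors: Int,
    minRegisteredRatio: Double): Boolean = {
  val thresholdReached = failureCount > maxNumExecutorFailures
  if (!thresholdReached) {
    false
  } else if (!keepaliveEnabled) {
    true // current behavior: exit immediately once the threshold is hit
  } else {
    // extra check: keep running while resources are still sufficient
    liveExecutors < math.ceil(initialExecutors * minRegisteredRatio).toInt
  }
}
```

Note that with a ratio >= 1 the floor equals (or exceeds) the initial executor count, so the extra check effectively always passes and the behavior degenerates to the current one, which matters in the discussion below.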
Why are the changes needed?
The ExecutorFailureTracker is overly sensitive to executor failures and tends to overreact by terminating the application immediately upon reaching the threshold, regardless of whether the resources currently held are sufficient. Since executor allocation depends on various factors such as resource managers, host environments, and external dependencies, allocation may become unavailable for some time. During this period, ExecutorFailureTracker may accumulate enough failures to kill the application by mistake.
Here is also an example from our prod,
The application had been running for hours before it suddenly crashed with `Stage cancelled because SparkContext was shut down` and `Max number of executor failures (20) reached`. Meanwhile, it still held 90% of maxNumExecutors and was about to finish. In its final moments (less than 10 seconds), it requested only one additional executor. Judging by the outcome, terminating it was clearly the wrong call.
The threshold of ExecutorFailureTracker is inflexible. It is pre-configured by spark.executor.maxNumFailures or calculated as 2 * the maximum number of executors, and it does not consider the actual number of live executors.
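For illustration, the current threshold derivation can be sketched as a hypothetical helper (the real logic lives in ExecutorFailureTracker; the floor of 3 mirrors the documented default of "numExecutors * 2, with minimum of 3"):

```scala
// Sketch of how the failure threshold is derived today: either taken
// from spark.executor.maxNumFailures when explicitly set, or defaulted
// to twice the maximum executor count, with a floor of 3. Note that the
// current number of live executors plays no part in this calculation.
def maxExecutorFailures(configured: Option[Int], maxExecutors: Int): Int =
  configured.getOrElse(math.max(3, 2 * maxExecutors))
```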
Does this PR introduce any user-facing change?
Yes, a new configuration.
How was this patch tested?
New unit tests.
Was this patch authored or co-authored using generative AI tooling?
cc @dongjoon-hyun @cloud-fan @HyukjinKwon @LuciferYang @tgravescs, thank you
cc @mridulm for the YARN part, because this might introduce a behavior change in which the job never fails again on executor failures. BTW, for the record, I believe this is mainly useful in K8s environments with the Horizontal Pod Scheduler case, and I guess the failure situation is rare in YARN environments. That's why I didn't add a migration guide for this configuration and the behavior change.
Thank you for the check @dongjoon-hyun
Assuming that the Spark App runs in a resource prioritization environment, due to its low priority on a single machine, it may be preempted at any time (killed by the resource monitor on the single machine), but the queue resources are sufficient to quickly apply for new resources. In this scenario, will the Spark App never fail as before?
In this scenario, will the Spark App never fail as before?
@LuciferYang. It depends, but I would never say never.
- Ratio>=1, the behavior is AS-IS
- Ratio<1, for this preempt scenario, it allows the app to survive if it has a sufficient number of resources, which changes depending on proactive DRA, passive eviction of resource managers, or runtime failures.
Ratio>=1 ? Ratio>=0.5?
Thank you @LuciferYang for the suggestions. BTW,
Ratio>=1?Ratio>=0.5?
It is ratio >= 1, which makes `runnings < max * ratio` always true and only checks the maxNumExecutorFailures, just like what we do before.
Got it.
It is not clear to me what the issue is, why we are taking this proposed approach, and what the underlying cause is. Failing an application due to repeated failures is typically independent of how much resources it is currently holding.
I can speculate as to why this is happening, but can you update the jira (or pr description) with more details on what the issue is ? And why this proposal should work ?
Hi @tgravescs, thank you for the detailed review.
so I'm a bit concerned about this being confusing to users and not really being configurable, except in special cases.
I do not fully agree with that. It should be handier to set than spark.executor.maxFailures.
In most cases, users may not be familiar with the "spark.executor.maxFailures" setting and may simply use the default values. If an application has a low executor limit, the default value of 3 for maxFailures can be very unreliable.
20 is a lot of failures. What is the real issue causing this? ie why are these executors failing?
The failures can be divided into two kinds. The first affects both existing and new executors, i.e. exit code 143 (killed by resource managers), OOM, etc., and it is OK to fail the app w/ or w/o this PR. The second affects only new executors, i.e. some of the external dependency files changed by expected or unexpected maintenance, or rejections from resource managers; this is what this PR mainly focuses on, to reduce the risk of an app being killed all of a sudden. In the second case, 20 is a relatively small number, as the allocation requests and responses go very quickly.
How long was the app running? Is it some cloud environment where executors are going away, or is it really an issue with the application or its configuration?
The app I described above in the PR description ran for 1.5 hours. It failed because it hit the max executor failures, while the root cause was that one of the shared UDF jars was changed by a developer who turned out not to be the app owner. YARN failed to bring up new executors, so the 20 failures accumulated within 10 seconds.
```
2023-11-06 23:39:43 CST YarnAllocator INFO - Completed container container_e106_1694175944291_7158886_01_000027 on host: x.163.org (state: COMPLETE, exit status: -1000)
2023-11-06 23:39:43 CST YarnAllocator WARN - Container from a bad node: container_e106_1694175944291_7158886_01_000027 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.308]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST YarnSchedulerBackend$YarnSchedulerEndpoint WARN - Requesting driver to remove executor 26 for reason Container from a bad node: container_e106_1694175944291_7158886_01_000027 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.308]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST YarnAllocator INFO - Completed container container_e106_1694175944291_7158886_01_000029 on host: x.163.org (state: COMPLETE, exit status: -1000)
2023-11-06 23:39:43 CST YarnAllocator WARN - Container from a bad node: container_e106_1694175944291_7158886_01_000029 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.308]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST BlockManagerMaster INFO - Removal of executor 26 requested
2023-11-06 23:39:43 CST BlockManagerMasterEndpoint INFO - Trying to remove executor 26 from BlockManagerMaster.
2023-11-06 23:39:43 CST YarnSchedulerBackend$YarnDriverEndpoint INFO - Asked to remove non-existent executor 26
2023-11-06 23:39:43 CST YarnAllocator INFO - Completed container container_e106_1694175944291_7158886_01_000028 on host: x.163.org (state: COMPLETE, exit status: -1000)
2023-11-06 23:39:43 CST YarnSchedulerBackend$YarnSchedulerEndpoint WARN - Requesting driver to remove executor 28 for reason Container from a bad node: container_e106_1694175944291_7158886_01_000029 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.308]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST YarnAllocator WARN - Container from a bad node: container_e106_1694175944291_7158886_01_000028 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.308]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST BlockManagerMaster INFO - Removal of executor 28 requested
2023-11-06 23:39:43 CST BlockManagerMasterEndpoint INFO - Trying to remove executor 28 from BlockManagerMaster.
2023-11-06 23:39:43 CST YarnSchedulerBackend$YarnDriverEndpoint INFO - Asked to remove non-existent executor 28
2023-11-06 23:39:43 CST YarnSchedulerBackend$YarnSchedulerEndpoint WARN - Requesting driver to remove executor 27 for reason Container from a bad node: container_e106_1694175944291_7158886_01_000028 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.308]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST YarnAllocator INFO - Completed container container_e106_1694175944291_7158886_01_000026 on host: x.163.org (state: COMPLETE, exit status: -1000)
2023-11-06 23:39:43 CST YarnAllocator WARN - Container from a bad node: container_e106_1694175944291_7158886_01_000026 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.308]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST BlockManagerMaster INFO - Removal of executor 27 requested
2023-11-06 23:39:43 CST BlockManagerMasterEndpoint INFO - Trying to remove executor 27 from BlockManagerMaster.
2023-11-06 23:39:43 CST YarnSchedulerBackend$YarnDriverEndpoint INFO - Asked to remove non-existent executor 27
2023-11-06 23:39:43 CST YarnAllocator INFO - Completed container container_e106_1694175944291_7158886_01_000031 on host: x.163.org (state: COMPLETE, exit status: -1000)
2023-11-06 23:39:43 CST YarnSchedulerBackend$YarnSchedulerEndpoint WARN - Requesting driver to remove executor 25 for reason Container from a bad node: container_e106_1694175944291_7158886_01_000026 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.308]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST YarnAllocator WARN - Container from a bad node: container_e106_1694175944291_7158886_01_000031 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.316]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST BlockManagerMaster INFO - Removal of executor 25 requested
2023-11-06 23:39:43 CST YarnSchedulerBackend$YarnDriverEndpoint INFO - Asked to remove non-existent executor 25
2023-11-06 23:39:43 CST BlockManagerMasterEndpoint INFO - Trying to remove executor 25 from BlockManagerMaster.
2023-11-06 23:39:43 CST YarnSchedulerBackend$YarnSchedulerEndpoint WARN - Requesting driver to remove executor 30 for reason Container from a bad node: container_e106_1694175944291_7158886_01_000031 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.316]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST YarnAllocator INFO - Completed container container_e106_1694175944291_7158886_01_000033 on host: x.jd.163.org (state: COMPLETE, exit status: -1000)
2023-11-06 23:39:43 CST BlockManagerMaster INFO - Removal of executor 30 requested
2023-11-06 23:39:43 CST YarnSchedulerBackend$YarnDriverEndpoint INFO - Asked to remove non-existent executor 30
2023-11-06 23:39:43 CST BlockManagerMasterEndpoint INFO - Trying to remove executor 30 from BlockManagerMaster.
2023-11-06 23:39:43 CST YarnAllocator WARN - Container from a bad node: container_e106_1694175944291_7158886_01_000033 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.316]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST YarnAllocator INFO - Completed container container_e106_1694175944291_7158886_01_000030 on host: x.163.org (state: COMPLETE, exit status: -1000)
2023-11-06 23:39:43 CST YarnAllocator WARN - Container from a bad node: container_e106_1694175944291_7158886_01_000030 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.316]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST YarnSchedulerBackend$YarnSchedulerEndpoint WARN - Requesting driver to remove executor 32 for reason Container from a bad node: container_e106_1694175944291_7158886_01_000033 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.316]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST YarnAllocator INFO - Completed container container_e106_1694175944291_7158886_01_000032 on host: x.163.org (state: COMPLETE, exit status: -1000)
2023-11-06 23:39:43 CST YarnSchedulerBackend$YarnSchedulerEndpoint WARN - Requesting driver to remove executor 29 for reason Container from a bad node: container_e106_1694175944291_7158886_01_000030 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.316]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
2023-11-06 23:39:43 CST YarnAllocator WARN - Container from a bad node: container_e106_1694175944291_7158886_01_000032 on host: x.163.org. Exit status: -1000. Diagnostics: [2023-11-06 23:39:40.316]java.io.IOException: Resource x.jar changed on src filesystem (expected 1698924864275, was 1699273405453).
```
We have a monitor for all our Spark apps on Kubernetes and YARN. The probability of apps failing with executor max failures is low relative to the total number of apps, but it turns out to be a daily issue. See the attached snapshots.
How does Spark know it would have finished and those requests wouldn't have also failed? The point of the feature and the existing settings is that if you have had that many failures, something is likely wrong and you need to fix it. By letting this go on longer, it may have just wasted more time and resources if those other requests were also going to fail.
As I answered in the first question, Spark knows (possibly with a delay) whether to finish or fail: both the failed executors and the live ones are still being counted. Considering the delay and reliability, TBH, I haven't got a silver bullet for both. So ratio >= 1 is provided to eliminate the delay and fail the app directly.
It is not clear to me what the issue is, why we are taking this proposed approach, and what the underlying cause is. Failing an application due to repeated failures is typically independent of how much resources it is currently holding.
I can speculate as to why this is happening, but can you update the jira (or pr description) with more details on what the issue is ? And why this proposal should work ?
Hi @mridulm. Thanks for the questions, but can you make your question much more specific? As for me, your questions are already described in the PR description based on what I understand and my findings. It would be great if I could get your point more precisely.
spark.executor.maxFailures
Oh I just realized you added this config - ie ported the yarn feature to k8s and I think you mean spark.executor.maxNumFailures. I had missed this go by.
It failed because it hit the max executor failures while the root cause was one of the shared UDF jar changed by a developer, who turned out not to be the app owner. Yarn failed to bring up new executors, so the 20 failures were collected within 10 secs.
If users change a jar mid-application, this is really bad IMHO. How do you know your application doesn't get different results on different executors? Say that had actually worked but the logic changed in the UDF. To me this is a process issue, and Spark did the right thing in failing; it should have failed. Would you have known as quickly that someone pushed a bad jar if it hadn't failed? I assume maybe on the next application run sometime later, but it still would have caused some app to fail.
The probability of apps failing with executor max failures is low for the total amount apps. But it turns out to be a daily issue
I'm not sure I follow this statement: you see this kind of issue daily, and is it because users push bad jars that often, or why do you see it daily? I'm trying to understand how much of a problem this really is that Spark should be solving. Do you see failures where having the feature on actually helps you? I kind of assume so since you ported it to k8s, but if not, just turn it off.
I can see a reliability aspect here: if you have a sufficient number of executors already allocated and running, then just keep running instead of killing the entire application. How you achieve that, though, vs this proposal, I'm not sure I agree with. If the user set a minimum number of executors, why isn't this just that number? As one of the other comments stated, this approach is useless for normal users with dynamic allocation, so why doesn't it apply to that case?
Hi @mridulm. Thanks for the questions, but can you make your question much more specific? As for me, your questions are already described in the PR description based on what I understand and my findings. It would be great if I could get your point more precisely.
The PR description is describing what the change is trying to do, along with details I am not sure are related to why executor failures were observed/why application eventually failed - would be great to understand what is going on which we are trying to mitigate with this.
Having said that, looking at your response to @tgravescs query here gives a lot more context (thanks !)
It points to an issue with how users are running the spark application - spark is not tolerant to out of band changes to the artifacts (jars, etc) while it is running - this needs to be fixed at the application end, not in spark, and we should not try to work around this issue - agree with @tgravescs on this.
Hi @tgravescs
Oh I just realized you added this config
I (or our customer, you mean) didn't add it; the 20-failure threshold is calculated as 10 max executors * 2.
Hi @mridulm @tgravescs
If users changes a jar mid application, this is really bad IMHO.
spark is not tolerant to out of band changes to the artifacts (jars, etc) while it is running - this needs to be fixed at the application end,
Let's take Spark Thrift Server and Spark Connect as examples. ADD JAR is an end-user operation, and the artifacts can be changed by the users themselves, while starting and maintaining the server is for system admins. If the jar issue occurs here, shall we give admins enough time to detect the issue and then gracefully reboot the server, to reduce the impact on other concurrent users?
Other cases excluding the jar issue above:
- Hadoop Yarn - Capacity Scheduler container preemption
- ERROR cluster.YarnScheduler: Lost executor x on x.163.org: Container container_x on host:x.163.org was preempted.
- K8s environment with Horizontal Pod Scheduler provided by @dongjoon-hyun
These are not errors that belong to the app or resource manager, but rather ways to optimize resource usage. They are not corner cases.
ERROR cluster.YarnScheduler: Lost executor x on x.163.org: Container container_x on host:x.163.org was preempted.
Preemption on yarn shouldn't be going against the number of failed executors. If it is then something has changed and we should fix that.
```scala
case ContainerExitStatus.PREEMPTED =>
  // Preemption is not the fault of the running tasks, since YARN preempts containers
  // merely to do resource sharing, and tasks that fail due to preempted executors could
  // just as easily finish on any other executor.
```
K8s environment with Horizontal Pod Scheduler case
Can you be more specific here? Why is this going to cause failures that aren't similar to YARN dynamic allocation getting more executors? Is it scaling down with the containers marked as failed, vs YARN marking them as preempted and not counting them against failures? Is there any way to know on k8s that this happened so we could avoid counting them? It seems like if this is really an issue, the feature should be off by default on k8s.
Let's take Spark Thrift Server and Spark Connect as examples, ADD JAR is an end-user user operation and the artifacts can be changed by themselves. Starting and maintaining the Server is for system admins. If the jar issue occurs here, shall we give enough time for admins to detect the issue and then gracefully reboot it to reduce the impact on other concurrent users?
This is a consequence of using a shared environment. Ideally Spark would isolate one user from another and other users wouldn't be affected, but unfortunately that isn't the case. I'm not sure about your environment, but ideally users test things before running in a production environment and breaking things.
If this feature doesn't really work or has issues on k8s then there should be a way to disable it, which seems like more what you want here right? You are essentially saying you don't want it to fail the application, thus turn the feature off and you should just do monitoring on your own to catch issues.
Note, the documentation on this feature are missing, I made some comments: https://github.com/apache/spark/commit/40872e9a094f8459b0b6f626937ced48a8d98efb can you please fix those?
Preemption on yarn shouldn't be going against the number of failed executors. If it is then something has changed and we should fix that.
Yes, you are right
This is a consequence of using a shared environment. Ideally Spark would isolate it from other and other users wouldn't be affected but that unfortunately isn't the case. I'm not sure your environment but ideally users test things before running in some production environment and breaking things.
Yeah, the test step is necessary before prod. But as you said, "ideally". System robustness takes precedence over that.
If this feature doesn't really work or has issues on k8s then there should be a way to disable it, which seems like more what you want here right? You are essentially saying you don't want it to fail the application, thus turn the feature off and you should just do monitoring on your own to catch issues.
What does 'this feature' point to?
Why do you always mention k8s when I give evidence on YARN? At least the detailed examples are about YARN, and the 39/40 items shown in the above snapshots are also YARN.
Well, for k8s, ExecutorFailureTracker works well during app initialization, failing fast on continuous pod failures. It does not work well for apps that already have sufficient pods and then hit some failures on new pod allocation.
Preemption on yarn shouldn't be going against the number of failed executors. If it is then something has changed and we should fix that.
Yes, you are right
What do you mean by this? Are you saying the Spark on YARN handling of preempted containers is not working properly, meaning a preempted container should not show up as an executor failure? Are you seeing those preempted containers show up as failed? Or are you saying that, yes, Spark on YARN doesn't mark preempted containers as failed?
What does 'this feature' point to?
Sorry, I misunderstood your environment here; I thought you were running on k8s, but it looks like you are running on YARN. By "feature" I mean the spark.yarn.max.executor.failures/spark.executor.maxNumFailures config and its functionality.
So unless yarn preemption handling is broken (please answer question above), you gave one very specific use case where user added a bad JAR, in that use case it seems like you just don't want spark.executor.maxNumFailures enabled at all. You said you don't want the app to fail so admins can come fix things up and not have it affect other users. If that is the case then Spark should allow users to turn spark.executor.maxNumFailures off or I assume you could do the same thing by setting it to int.maxvalue.
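For reference, the workaround suggested above could be expressed as a plain config map (shown as key/value pairs rather than a live SparkConf to stay self-contained; the key name is the one discussed in this thread):

```scala
// Illustration of the suggested workaround: effectively disabling the
// failure tracker by raising the threshold to Int.MaxValue, so the app
// never exits on accumulated executor failures.
val disableFastFail: Map[String, String] = Map(
  "spark.executor.maxNumFailures" -> Int.MaxValue.toString
)
```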
As implemented, this seems very arbitrary, and I would think it hard for a normal user to set and use this feature. You have it as a ratio, which I'm normally in favor of, but it really only works if you have max executors set, so it is effectively just a hardcoded number. That number seems arbitrary, as it just depends on whether you get lucky and happen to have it before some user pushes a bad jar. I don't understand why this isn't the same as the minimum number of executors, as that seems more in line: saying you need some minimum number for this application to run, and by the way it's OK to keep running even if launching new executors is failing.
If there is some other issues with Spark Connect and add jars maybe that is a different conversation about isolation (https://issues.apache.org/jira/browse/SPARK-44146). Or maybe it needs to better prevent users from adding jars with the same name.
What do you mean by this, are you saying the Spark on YARN handling of preempted containers is not working properly? Meaning if the container is preempted it should not show up as an executor failure. Are you seeing those preempted containers show up as failed? Or are you saying that yes Spark on YARN doesn't mark preempted as failed?
PREEMPTED is ok, and its cases are not counted by executor failure tracker. I was wrong about this before, sorry to bother.
If that is the case then Spark should allow users to turn spark.executor.maxNumFailures off or I assume you could do the same thing by setting it to int.maxvalue.
There are pros and cons to this suggestion, I guess. Disabling the executor failure tracker certainly keeps the app alive, but at the same time invalidates fast fail.
As implemented this seems very arbitrary and I would think hard for a normal user to set and use this feature.
Aren't most numeric configurations and their defaults in Spark arbitrary?
I don't understand why this isn't the same as minimum number of executors as that seems more in line - saying you need some minimum number for this application to run and by the way its ok to keep running with this is launching new executors is failing.
I can try to use the minimum number of executors to remove the uncertainty of the ratio.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!