[SPARK-40096] Fix slow shuffle stage finalization caused by slow connection creation
What changes were proposed in this pull request?
This PR runs scheduleShuffleMergeFinalize() and sends the finalizeShuffleMerge RPCs in two threads, and stops all work after PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT regardless of success or failure.
Currently removeShufflePushMergerLocation is only called when a shuffle fetch fails; this PR also prevents merger nodes from being selected as mergeLocations when connection creation fails, by adding those bad merger nodes to finalizeBlackNodes so that subsequent shuffle map stages will not try to connect to them.
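As a rough illustration of the approach (a minimal sketch only, not the actual DAGScheduler change; MergerLocation, sendFinalizeRpc, and the pool sizing are hypothetical stand-ins), the finalize RPCs could be submitted to a dedicated pool and anything still outstanding cancelled once the merge-results timeout expires:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object FinalizeTimeoutSketch {
  // Hypothetical stand-ins for the real merger-location and RPC-sending code.
  case class MergerLocation(host: String, port: Int)

  def sendFinalizeRpc(shuffleId: Int, merger: MergerLocation): Unit = {
    // In Spark this would call ExternalBlockStoreClient.finalizeShuffleMerge,
    // which can block while a connection to the merger node is being created.
  }

  def finalizeShuffleMerge(
      shuffleId: Int,
      mergers: Seq[MergerLocation],
      mergeResultsTimeoutSec: Long): Unit = {
    val pool = Executors.newFixedThreadPool(math.max(1, math.min(mergers.size, 8)))
    // One RPC task per merger node, so a single unreachable node does not
    // serialize the remaining finalize requests behind its connection attempt.
    val tasks = mergers.map { m =>
      pool.submit(new Runnable {
        override def run(): Unit = sendFinalizeRpc(shuffleId, m)
      })
    }
    // Bound the overall finalization time: once the merge-results timeout
    // expires, cancel everything that is still running, success or not.
    pool.shutdown()
    if (!pool.awaitTermination(mergeResultsTimeoutSec, TimeUnit.SECONDS)) {
      tasks.foreach(_.cancel(true))
      pool.shutdownNow()
    }
  }
}
```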
Why are the changes needed?
DAGScheduler finalizes each shuffle map stage in one shuffle-merge-finalizer thread and locks clientPool.locks[clientIndex] while creating a connection to the ESS merger node, so the other shuffle-merge-finalizer threads (one stage per thread) have to wait for SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY.
Although reducing SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY helps, the total wait time (SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY * lostMergerNodesSize * stageSize) will still be long.
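With hypothetical numbers, e.g. a 30 s connection-creation timeout, 5 lost merger nodes, and 10 stages, the worst case is 30 s × 5 × 10 = 1500 s. A simplified model of the contention is sketched below (illustration only, not the real TransportClientFactory; the sleep stands in for a connection attempt that eventually times out): callers that target the same merger node and hash to the same client slot synchronize on the same lock, so one thread stuck creating a connection makes the other finalizer threads wait out the creation timeout too (compare the thread dump later in the thread).

```scala
import java.util.concurrent.TimeUnit

// Illustration only: a per-slot lock like clientPool.locks(clientIndex).
// While one shuffle-merge-finalizer thread holds the lock and waits for a
// connection to an unreachable merger node, any other finalizer thread that
// picks the same slot blocks on the same monitor.
object ConnectionLockSketch {
  private val numConnectionsPerPeer = 1
  private val locks = Array.fill(numConnectionsPerPeer)(new Object)

  def createClient(connectionCreationTimeoutMs: Long): Unit = {
    val clientIndex = scala.util.Random.nextInt(numConnectionsPerPeer)
    locks(clientIndex).synchronized {
      // Stand-in for the blocking connect; a real attempt to an unreachable
      // node would wait up to the connection-creation timeout before failing.
      TimeUnit.MILLISECONDS.sleep(connectionCreationTimeoutMs)
    }
  }
}
```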
Does this PR introduce any user-facing change?
No
How was this patch tested?
Add UT
Can one of the admins verify this patch?
Hi @otterc, could you help review this PR? Thanks.
@wankunde
Sending the finalize RPCs will block the main thread due to creating connections to some unreachable nodes.
Which main thread are you referring to here? Could you please explain which thread is being blocked? AFAICT this is already being done by shuffle-merge-finalizer threads.
Why can't you reduce the SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY default for the cluster? One of the reasons this configuration was introduced was that creating a connection should have a lower timeout than the connection idle time.
Also, your solution adds shuffle service nodes to an excluded list, which isn't what the description says. Could you please explain, with examples/logs, what problems you are facing and the solution that is being proposed?
Yes, DAGScheduler finalizes each shuffle map stage in one shuffle-merge-finalizer thread and locks clientPool.locks[clientIndex] while creating a connection to the ESS merger node, so the other shuffle-merge-finalizer threads (one stage per thread) have to wait for SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY.
Although reducing SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY helps, the total wait time (SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY * lostMergerNodesSize * stageSize) will still be long. This PR runs scheduleShuffleMergeFinalize() and sends the finalizeShuffleMerge RPCs in two threads, and stops all work after PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT regardless of success or failure.
Currently removeShufflePushMergerLocation is only called when a shuffle fetch fails; this PR also prevents merger nodes from being selected as mergeLocations when connection creation fails, by adding those bad merger nodes to finalizeBlackNodes so that subsequent shuffle map stages will not try to connect to them.
The thread dump below shows shuffle-merge-finalizer-4 holding the client pool lock while waiting on connection creation, and shuffle-merge-finalizer-3 blocked waiting for the same lock:
"shuffle-merge-finalizer-4" #1842 daemon prio=5 os_prio=0 tid=0x00007f19440d8000 nid=0x2be822 in Object.wait() [0x00007f19ea7f7000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:460)
at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:679)
- locked <0x00007f3eb8244598> (a io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise)
at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:298)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:283)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
- locked <0x00007f1d7b0c0ba8> (a java.lang.Object)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
at org.apache.spark.network.shuffle.ExternalBlockStoreClient.finalizeShuffleMerge(ExternalBlockStoreClient.java:229)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$finalizeShuffleMerge$5(DAGScheduler.scala:2437)
"shuffle-merge-finalizer-3" #1647 daemon prio=5 os_prio=0 tid=0x00007f19440d2800 nid=0x2be52e waiting for monitor entry [0x00007f1688ff2000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:198)
- waiting to lock <0x00007f1d7b0c0ba8> (a java.lang.Object)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
at org.apache.spark.network.shuffle.ExternalBlockStoreClient.finalizeShuffleMerge(ExternalBlockStoreClient.java:229)
at org.apache.spark.scheduler.DAGScheduler$$anon$7.$anonfun$run$2(DAGScheduler.scala:2419)
...
Hi @otterc, could you help me review this PR? Or should I provide more information on this issue?
So the issue is that the wait period timer doesn't take into account the time for connection creation which is a bug. However, in this PR you are adding another major change of excluding merger nodes based on this. I don't think we should combine that with the fix for this bug because if we decide to do that then we need to pay more careful consideration of how it interacts with exclusion of executor nodes, etc.
Hi @otterc, thanks for your review. I have removed the logic that adds merger nodes to the blacklist when an IOException is thrown. Now each stage needs to wait for min(SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT * lostMergerNodesSize, PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT); maybe we can optimize this later.
Hi @otterc @mridulm, I updated the code. Could you help review the new code?
Hi @mridulm @otterc, the new processing flow is:
- Send all the finalize RPC tasks and wait for the merged status in a new thread pool.
- Wait for shuffleMergeResultsTimeoutSec in the main thread if registerMergeResults = true, and then cancel all pending RPC-sending tasks.
- Wait for shuffleMergeResultsTimeoutSec asynchronously if registerMergeResults = false, and then cancel all pending RPC-sending tasks.

Does this work? (A rough sketch of this flow is shown below.)
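A minimal sketch of that flow, under assumed names (FinalizeFlowSketch, finalizePool, timeoutScheduler, and the empty RPC body are stand-ins, not the actual DAGScheduler code):

```scala
import java.util.concurrent.{Executors, TimeUnit}

object FinalizeFlowSketch {
  private val finalizePool = Executors.newFixedThreadPool(8)
  private val timeoutScheduler = Executors.newSingleThreadScheduledExecutor()

  def finalizeShuffleMerge(
      mergers: Seq[String],
      registerMergeResults: Boolean,
      shuffleMergeResultsTimeoutSec: Long): Unit = {
    // 1. Send all the finalize RPC tasks (and wait for merged statuses) in a
    //    dedicated thread pool rather than in the calling thread.
    val tasks = mergers.map { merger =>
      finalizePool.submit(new Runnable {
        override def run(): Unit = {
          // Stand-in for sending the finalizeShuffleMerge RPC to `merger`
          // and collecting its merge status.
        }
      })
    }

    val cancelOutstanding = new Runnable {
      override def run(): Unit = tasks.foreach(_.cancel(true))
    }

    if (registerMergeResults) {
      // 2. registerMergeResults = true: block the calling thread for at most
      //    shuffleMergeResultsTimeoutSec, then cancel whatever is still running.
      val deadline = System.nanoTime() +
        TimeUnit.SECONDS.toNanos(shuffleMergeResultsTimeoutSec)
      tasks.foreach { task =>
        val remainingNanos = math.max(0L, deadline - System.nanoTime())
        try task.get(remainingNanos, TimeUnit.NANOSECONDS)
        catch { case _: Exception => () } // timeout/failure handled by cancel
      }
      cancelOutstanding.run()
    } else {
      // 3. registerMergeResults = false: do not block the caller; schedule the
      //    cancellation asynchronously after the same timeout.
      timeoutScheduler.schedule(
        cancelOutstanding, shuffleMergeResultsTimeoutSec, TimeUnit.SECONDS)
    }
  }
}
```

The point of the sketch is that a slow or unreachable merger node only blocks its own task, and the overall finalization cost per stage stays bounded by shuffleMergeResultsTimeoutSec.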
I'm sorry for the late reply; I have updated the code.
+CC @otterc, @Ngone51 PTAL
Merged to master. Thanks for working on this, @wankunde! Thanks for the review, @otterc :-)