[SPARK-40096] Fix slow shuffle stage finalization due to slow connection creation

wankunde opened this pull request (12 comments)

What changes were proposed in this pull request?

This PR runs scheduleShuffleMergeFinalize() and sends the finalizeShuffleMerge RPCs in two separate threads, and stops all work after PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT regardless of success or failure.

Currently, removeShufflePushMergerLocation is only called when a shuffle fetch fails; this PR also prevents these merger nodes from being selected as mergeLocations when connection creation fails, by adding those bad merger nodes to finalizeBlackNodes so that subsequent shuffle map stages will not try to connect to them.
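
For illustration, a minimal Scala sketch of the exclusion idea described above; the set name finalizeBlackNodes follows the description, while connectAndFinalize and the rest are hypothetical stand-ins, not the actual PR code:

import java.io.IOException
import scala.collection.mutable

object MergerExclusionSketch {
  // Merger hosts whose connection creation failed during finalization;
  // later shuffle map stages skip them when picking merge locations.
  private val finalizeBlackNodes = mutable.Set.empty[String]

  // Hypothetical stand-in for creating a client and sending finalizeShuffleMerge.
  private def connectAndFinalize(host: String): Unit = ()

  def finalizeOnMerger(host: String): Unit =
    try connectAndFinalize(host)
    catch {
      case _: IOException =>
        finalizeBlackNodes.synchronized { finalizeBlackNodes += host }
    }

  // Used when choosing mergeLocations for the next shuffle map stage.
  def selectMergerLocations(candidates: Seq[String]): Seq[String] =
    finalizeBlackNodes.synchronized { candidates.filterNot(finalizeBlackNodes.contains) }
}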

Why are the changes needed?

DAGScheduler finalizes each shuffle map stage in one shuffle-merge-finalizer thread and holds clientPool.locks[clientIndex] while creating the connection to the ESS merger node, so the other shuffle-merge-finalizer threads (one stage per thread) have to wait for SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY. Although reducing SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY helps, the total wait time (SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY * lostMergerNodesSize * stageSize) will still be long.
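
For example, with purely illustrative values of a 2-minute connection-creation timeout, 5 unreachable merger nodes, and 10 shuffle map stages, the serialized waits alone could add up to roughly 2 min * 5 * 10 = 100 minutes of finalization delay.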

Does this PR introduce any user-facing change?

No

How was this patch tested?

Add UT

wankunde commented on Aug 16 '22 05:08

Can one of the admins verify this patch?

AmplabJenkins commented on Aug 16 '22 06:08

Hi @otterc, could you help review this PR? Thanks.

wankunde commented on Aug 17 '22 15:08

@wankunde

Sending finalize RPCs will block the main thread due to creating connections to some unreachable nodes.

Which main thread are you referring to here? Could you please explain which thread is being blocked? AFAICT this is already being done by the shuffle-merge-finalizer threads.

Why can't you reduce the SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY default for the cluster? One of the reasons this configuration was introduced was that creating a connection should have a lower timeout than the connection idle time.

otterc commented on Aug 18 '22 18:08

Also, your solution adds shuffle service nodes to an exclusion list, which isn't what the description says. Could you please explain, with examples/logs, what problems you are facing and the solution that is being proposed?

otterc commented on Aug 18 '22 18:08

@wankunde

Sending finalize RPCs will block the main thread due to creating connections to some unreachable nodes.

Which main thread are you referring to here? Could you please explain which thread is being blocked? AFAICT this is already being done by the shuffle-merge-finalizer threads.

Why can't you reduce the SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY default for the cluster? One of the reasons this configuration was introduced was that creating a connection should have a lower timeout than the connection idle time.

Yes, DAGScheduler finalizes each shuffle map stage in one shuffle-merge-finalizer thread and holds clientPool.locks[clientIndex] while creating the connection to the ESS merger node, so the other shuffle-merge-finalizer threads (one stage per thread) have to wait for SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY. Although reducing SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY helps, the total wait time (SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY * lostMergerNodesSize * stageSize) will still be long. This PR runs scheduleShuffleMergeFinalize() and sends the finalizeShuffleMerge RPCs in two separate threads, and stops all work after PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT regardless of success or failure.

Currently, removeShufflePushMergerLocation is only called when a shuffle fetch fails; this PR also prevents these merger nodes from being selected as mergeLocations when connection creation fails, by adding those bad merger nodes to finalizeBlackNodes so that subsequent shuffle map stages will not try to connect to them.

"shuffle-merge-finalizer-4" #1842 daemon prio=5 os_prio=0 tid=0x00007f19440d8000 nid=0x2be822 in Object.wait() [0x00007f19ea7f7000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:460)
	at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:679)
	- locked <0x00007f3eb8244598> (a io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise)
	at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:298)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:283)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
	- locked <0x00007f1d7b0c0ba8> (a java.lang.Object)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
	at org.apache.spark.network.shuffle.ExternalBlockStoreClient.finalizeShuffleMerge(ExternalBlockStoreClient.java:229)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$finalizeShuffleMerge$5(DAGScheduler.scala:2437)

"shuffle-merge-finalizer-3" #1647 daemon prio=5 os_prio=0 tid=0x00007f19440d2800 nid=0x2be52e waiting for monitor entry [0x00007f1688ff2000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:198)
	- waiting to lock <0x00007f1d7b0c0ba8> (a java.lang.Object)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
	at org.apache.spark.network.shuffle.ExternalBlockStoreClient.finalizeShuffleMerge(ExternalBlockStoreClient.java:229)
	at org.apache.spark.scheduler.DAGScheduler$$anon$7.$anonfun$run$2(DAGScheduler.scala:2419)
...

wankunde commented on Aug 19 '22 03:08

Hi @otterc, could you help review this PR? Or should I provide more information on this issue?

wankunde commented on Aug 23 '22 15:08

So the issue is that the wait period timer doesn't take into account the time for connection creation, which is a bug. However, in this PR you are adding another major change of excluding merger nodes based on this. I don't think we should combine that with the fix for this bug, because if we decide to do that then we need to pay more careful consideration to how it interacts with the exclusion of executor nodes, etc.

otterc commented on Aug 23 '22 15:08

So the issue is that the wait period timer doesn't take into account the time for connection creation, which is a bug. However, in this PR you are adding another major change of excluding merger nodes based on this. I don't think we should combine that with the fix for this bug, because if we decide to do that then we need to pay more careful consideration to how it interacts with the exclusion of executor nodes, etc.

Hi @otterc, thanks for your review. I have removed the logic for adding merger nodes to the blacklist when an IOException is thrown. Now each stage needs to wait for min(SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT * lostMergerNodesSize, PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT); maybe we can optimize this later.
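
For illustration only, with hypothetical values of a 2-minute connection-creation timeout, 3 unreachable merger nodes, and a 30-second merge-results timeout, each stage would wait min(2 min * 3, 30 s) = 30 s instead of the full 6 minutes.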

wankunde commented on Aug 24 '22 08:08

Hi @otterc @mridulm, I updated the code; could you help review the new code?

wankunde commented on Aug 25 '22 03:08

Hi @mridulm @otterc, the new processing flow (a rough sketch follows this list):

  • Send all the finalize RPC tasks and wait for the merged statuses in a new thread pool.
  • If registerMergeResults = true, wait for shuffleMergeResultsTimeoutSec in the main thread and then cancel all pending RPC tasks.
  • If registerMergeResults = false, wait for shuffleMergeResultsTimeoutSec asynchronously and then cancel all pending RPC tasks.
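
A rough Scala sketch of this flow; shuffleMergeResultsTimeoutSec and registerMergeResults come from the list above, while the helpers and pool sizes are assumptions for illustration only, not the actual PR code:

import java.util.concurrent.{Executors, Future, TimeUnit, TimeoutException}

object FinalizeFlowSketch {
  private val rpcPool = Executors.newFixedThreadPool(8)        // sends the finalize RPCs
  private val timeoutScheduler = Executors.newSingleThreadScheduledExecutor()

  // Hypothetical stand-in for sending one finalizeShuffleMerge RPC and waiting for its merged status.
  private def finalizeAndAwaitStatus(host: String): Unit = ()

  def finalizeShuffleMerge(hosts: Seq[String],
                           shuffleMergeResultsTimeoutSec: Long,
                           registerMergeResults: Boolean): Unit = {
    // 1. Send all the finalize RPC tasks and wait for the merged statuses in a dedicated pool.
    val tasks: Seq[Future[_]] =
      hosts.map(h => rpcPool.submit(new Runnable { def run(): Unit = finalizeAndAwaitStatus(h) }))

    def cancelPending(): Unit = tasks.foreach(t => if (!t.isDone) t.cancel(true))

    if (registerMergeResults) {
      // 2. Block the calling thread for at most the merge-results timeout, then cancel stragglers.
      val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(shuffleMergeResultsTimeoutSec)
      tasks.foreach { t =>
        try t.get(math.max(deadline - System.nanoTime(), 0L), TimeUnit.NANOSECONDS)
        catch { case _: TimeoutException | _: java.util.concurrent.ExecutionException => }
      }
      cancelPending()
    } else {
      // 3. Do not block the caller; cancel any still-pending tasks once the timeout elapses.
      timeoutScheduler.schedule(new Runnable { def run(): Unit = cancelPending() },
        shuffleMergeResultsTimeoutSec, TimeUnit.SECONDS)
    }
  }
}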

Does this work?

wankunde commented on Aug 29 '22 06:08

I'm sorry for the late reply; I have updated the code.

wankunde commented on Sep 09 '22 17:09

+CC @otterc, @Ngone51 PTAL

mridulm commented on Sep 16 '22 22:09

Merged to master. Thanks for working on this @wankunde! Thanks for the review @otterc :-)

mridulm commented on Sep 23 '22 01:09