[SPARK-52395][CORE] Fast fail when shuffle fetch failure happens
What changes were proposed in this pull request?
Currently for shuffle reading, `ShuffleBlockFetcherIterator` fetches local and host-local blocks in the task thread and sends remote block fetch requests in parallel, and every fetch result, including `FailureFetchResult`, is added to a `LinkedBlockingQueue` to be consumed in FIFO order.
This means we have to process all the successfully fetched blocks ahead of a `FailureFetchResult` before the task can fail. That can take a long time when many successful blocks, or some big ones with time-consuming processing, sit in front of the failure in the result queue.
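For illustration, here is a minimal sketch of that FIFO behavior. The names loosely mirror `ShuffleBlockFetcherIterator` internals, but the types and the `next()` shown here are simplified stand-ins, not the actual Spark code:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Simplified result types; the real classes carry more fields.
sealed trait FetchResult
case class SuccessFetchResult(blockId: String, data: Array[Byte]) extends FetchResult
case class FailureFetchResult(blockId: String, cause: Throwable) extends FetchResult

object FifoFetch {
  // Results from local fetches and remote fetch callbacks all land here
  // and are consumed strictly in arrival (FIFO) order.
  val results = new LinkedBlockingQueue[FetchResult]()

  def next(): FetchResult = results.take() match {
    case s: SuccessFetchResult =>
      s // handed to the task, which may spend a long time processing it
    case f: FailureFetchResult =>
      // The failure is only observed after every earlier success in the
      // queue has been taken and processed.
      throw new RuntimeException(s"fetch failed for ${f.blockId}", f.cause)
  }
}
```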
This PR proposes to fail the shuffle read task as early as possible when a fetch failure happens by:
- Using `LinkedBlockingDeque` instead of `LinkedBlockingQueue` for the results.
- Adding `FailureFetchResult` to the head of the results queue.
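A minimal sketch of the proposed change, reusing the simplified `FetchResult` types from the sketch above (an illustration of the idea, not the exact PR diff):

```scala
import java.util.concurrent.LinkedBlockingDeque

object FastFailFetch {
  // Deque instead of queue: failures can be pushed to the head.
  val results = new LinkedBlockingDeque[FetchResult]()

  // Called as fetch results arrive.
  def onFetchResult(result: FetchResult): Unit = result match {
    case f: FailureFetchResult =>
      results.addFirst(f) // the very next take() observes the failure
    case s: SuccessFetchResult =>
      results.addLast(s) // successes keep their FIFO arrival order
  }
}
```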
Regarding the performance impact of switching to `LinkedBlockingDeque`: it should not be a big concern here. `LinkedBlockingDeque` guards both ends with a single lock, whereas `LinkedBlockingQueue` uses separate put and take locks, so the deque can be less efficient under extremely high concurrency with frequent lock contention, but the shuffle reader is not such a case.
Why are the changes needed?
Fail fast when fetch failures happen in the shuffle reader, to avoid spending time processing previously fetched shuffle blocks when the task would fail anyway.
Does this PR introduce any user-facing change?
No
How was this patch tested?
UTs
Was this patch authored or co-authored using generative AI tooling?
No
cc @Ngone51 @jiangxb1987 @cloud-fan Please take a look. Thanks.
cc @mridulm @attilapiros
> Use `LinkedBlockingDeque` instead of `LinkedBlockingQueue` for the results.

Another possible solution to consider would be collecting the `FailureFetchResult`s in a separate collection (which can be a `LinkedBlockingQueue`) and consulting this new collection first in the `next()` method.
WDYT?
> Use `LinkedBlockingDeque` instead of `LinkedBlockingQueue` for the results.
>
> Another possible solution to consider would be collecting the `FailureFetchResult`s in a separate collection (which can be a `LinkedBlockingQueue`) and consulting this new collection first in the `next()` method. WDYT?
Thanks @attilapiros
I've thought about this; one concern about having two collections is that it may lead to race conditions such as:
- The consumer checks the failure collection and finds it empty.
- A failure is then added by a fetcher thread.
- The consumer blocks taking from the success queue, which might never receive new elements.

Dealing with the race conditions might introduce some complexity here, so this PR proposes to use `LinkedBlockingDeque` instead. See the sketch below.
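To make the race window concrete, here is a hypothetical sketch of the two-collection design, again reusing the simplified types from above (the `failures`/`successes` names and `TwoCollectionFetch` object are illustrative, not from the PR):

```scala
import java.util.concurrent.LinkedBlockingQueue

object TwoCollectionFetch {
  // Hypothetical split: failures and successes in separate collections.
  val failures  = new LinkedBlockingQueue[FailureFetchResult]()
  val successes = new LinkedBlockingQueue[SuccessFetchResult]()

  def next(): FetchResult = {
    val failure = failures.poll() // step 1: check failures -> may be empty
    if (failure != null) {
      throw new RuntimeException(s"fetch failed for ${failure.blockId}", failure.cause)
    }
    // step 2: a failure enqueued by a fetcher thread right here is missed
    // step 3: this blocks forever if the failed fetch means no more
    //         successes will ever arrive
    successes.take()
  }
}
```

Closing this window would require extra signalling, e.g. a sentinel element or a lock shared by both collections, which is exactly the complexity a single `LinkedBlockingDeque` avoids.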
> Dealing with the race conditions might introduce some complexity here
Right, with two collections this would be a lot more complex.
Why do we need the feature flag? I fail to see what we lose when it is always on, but I can see the flag adding some extra complexity.
Hi @attilapiros, adding the feature flag is meant to keep a mitigation for any potential unforeseen issues, since shuffle is a critical component and we've made slight behavior changes in the shuffle fetch iterator.
We already have too many config parameters, and I cannot see any risk here.
@LuciferYang, @jiangxb1987 WDYT?
It indeed seems that there are no apparent risks. I agree to remove the newly added config.
Thanks for sharing your opinions. Removing the feature flag. cc @attilapiros @LuciferYang @jiangxb1987
Hi @attilapiros, please take another look. Thanks.
Merged into master for Apache Spark 4.1.0. Thanks @ivoson @attilapiros @Ngone51 @jiangxb1987 and @mridulm
Thanks for your review @LuciferYang @jiangxb1987 @Ngone51 @attilapiros @mridulm