[SPARK-48580][CORE] Add consistency check and fallback for mapIds in push-merged block meta
What changes were proposed in this pull request?
Add a consistency check for reduce tasks between the mapIds in the push-merged block meta returned by the server side and the mapTracker held on the driver side. If any mapId is missing from the chunk meta, fall back to fetching the original shuffle blocks. This end-to-end check helps avoid data loss during the shuffle read phase when reduce tasks fetch merged data.
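As a rough illustration of the check described above (not the code in this PR), the comparison can be sketched with plain java.util.BitSet bitmaps. The class name MapIdConsistencyCheck and the method isConsistent are hypothetical, and the sketch only tests the "missing from the chunk meta" direction:

```java
import java.util.BitSet;
import java.util.List;

/** Hypothetical sketch of the mapId check; names are illustrative, not Spark's API. */
public final class MapIdConsistencyCheck {
  private MapIdConsistencyCheck() {}

  /**
   * Returns true when every map index set in the driver-side tracker is also
   * covered by at least one chunk bitmap from the merged block meta.
   */
  public static boolean isConsistent(BitSet driverMapTracker, List<BitSet> chunkBitmaps) {
    // Union of all map indices the meta file claims to contain.
    BitSet covered = new BitSet();
    for (BitSet chunk : chunkBitmaps) {
      covered.or(chunk);
    }
    // Indices the driver expects but the chunk meta does not contain.
    BitSet missing = (BitSet) driverMapTracker.clone();
    missing.andNot(covered);
    return missing.isEmpty();
  }
}
```

When isConsistent returns false, a caller would skip the merged chunks and fetch the original shuffle blocks instead. A stricter variant could require the two bitmaps to be equal.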
Why are the changes needed?
ShuffleBlockFetcherIterator initializes requests based on the mergeStatus and mapStatus from the driver side. The mergeStatus's mapTracker (a partition-level bitmap) comes from the mapTracker maintained in the shuffle service's memory, but the actual mapIds used for fetching chunk data come from the shuffle service's metaFile. There is no consistency check between the two. When the server encounters issues such as disk failures, the mapIds in the mapTracker and the metaFile can diverge, which ultimately results in data loss when reduce tasks fetch merged data.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests (UT).
Was this patch authored or co-authored using generative AI tooling?
No
@mridulm @otterc Please help review, thanks in advance!
Thanks for contributing this @gaoyajun02 , I will circle back to this later this week.
@gaoyajun02 , trying to understand the scenario better here - did you observe disk issues which resulted in this inconsistency ?
If yes, should this be checksum'ed - to ensure correctness. I would prefer that to adding additional rpc calls to the driver - which will now be incurred for all calls - given this should be a rare enough scenario.
Thoughts ?
Also, +CC @zhouyejoe as well.
Service nodes with disk issues (e.g. No space left on device, Read-only file system) log a large number of "IOExceptions exceeded the threshold when merging shufflePush" errors, along with logs such as the following:
INFO application_xxx attempt 1 shuffle 0 shuffleMerge 0: finalize shuffle merge
WARN Application application_xxxx shuffleId 0 shuffleMergeId 0 reduceId 133 update to index/meta failed
The corresponding method is specifically updateChunkInfo -> writeChunkTracker, which encounters an IOException when serializing chunkTracker to disk.
Based on this information, the sequence is as follows: during partition finalization (see: https://github.com/apache/spark/blob/80bba4463eba29a56cdd90642f0681c3710ce87c/common%2Fnetwork-shuffle%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fspark%2Fnetwork%2Fshuffle%2FRemoteBlockPushResolver.java#L1896-L1896), an IOException occurs while the serialized chunkTracker is being written to the metaFile, but truncating the metaFile succeeds, so the last chunkTracker is removed from the metaFile. The mapTracker, however, already recorded the mapId once the block data had been processed. In this case the metaFile has fewer mapIds than the mapTracker.
Additionally, I have also found scenarios where the mapTracker has fewer mapIds than the metaFile.
updateChunkInfo is also called after a block merge completes. In this case (see code: https://github.com/apache/spark/blob/80bba4463eba29a56cdd90642f0681c3710ce87c/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java#L1571), if updating the metaFile fails with IOException: No space left on device, the mapTracker does not record the mapId, but the chunkTracker is not reset.
Some logs:
2024-06-17 17:38:58,061 WARN shuffle-server-10-29 org.apache.spark.network.shuffle.RemoteBlockPushResolver: Application application_1694504975150_17223443_1 shuffleId 0 shuffleMergeId 0 reduceId 281 update to index/meta failed
2024-06-17 17:38:58,061 ERROR shuffle-server-10-29 org.apache.spark.network.shuffle.RemoteBlockPushResolver: Encountered issue when merging shufflePush_0_0_6959_281
java.io.IOException: Failure post-processing complete stream; failing this rpc and leaving channel active
at org.apache.spark.network.server.TransportRequestHandler$3.onComplete(TransportRequestHandler.java:227)
at org.apache.spark.network.client.StreamInterceptor.handle(StreamInterceptor.java:89)
at org.apache.spark.network.util.TransportFrameDecoder.feedInterceptor(TransportFrameDecoder.java:263)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:87)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: IOExceptions exceeded the threshold when merging shufflePush_0_0_6959_281
at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.abortIfNecessary(RemoteBlockPushResolver.java:1370)
at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.incrementIOExceptionsAndAbortIfNecessary(RemoteBlockPushResolver.java:1383)
at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.onComplete(RemoteBlockPushResolver.java:1550)
at org.apache.spark.network.server.TransportRequestHandler$3.onComplete(TransportRequestHandler.java:216)
... 19 more
When finalizePartition runs later, after disk space has been restored, the metaFile update succeeds, but the mapTracker still has not recorded the mapId.
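The second scenario (mapTracker with fewer mapIds than the metaFile) can be reproduced in a toy model. Everything below is hypothetical and heavily simplified: ToyMergePartition, its fields, and the diskFull flag are stand-ins for illustration and do not mirror the real fields or control flow of RemoteBlockPushResolver.

```java
import java.util.BitSet;

/** Toy model of one merged partition; all names are hypothetical. */
public final class ToyMergePartition {
  private final BitSet mapTracker = new BitSet();   // in-memory bitmap reported to the driver
  private final BitSet chunkTracker = new BitSet(); // in-memory bitmap for the pending chunk
  private final BitSet metaFile = new BitSet();     // stand-in for mapIds persisted in the meta file
  private boolean diskFull = false;                 // simulates "No space left on device"

  public void setDiskFull(boolean diskFull) {
    this.diskFull = diskFull;
  }

  /** Called when a pushed block has been merged into the data file. */
  public void onBlockMerged(int mapIndex) {
    chunkTracker.set(mapIndex);
    if (!flushChunk()) {
      // Failure path: mapTracker is not updated, but chunkTracker still holds mapIndex.
      return;
    }
    mapTracker.set(mapIndex);
  }

  /** Finalization flushes whatever is still pending in chunkTracker. */
  public void finalizePartition() {
    flushChunk();
  }

  private boolean flushChunk() {
    if (diskFull) {
      return false; // simulated IOException while writing the meta file
    }
    metaFile.or(chunkTracker);
    chunkTracker.clear();
    return true;
  }

  public BitSet mapTracker() {
    return (BitSet) mapTracker.clone();
  }

  public BitSet metaFile() {
    return (BitSet) metaFile.clone();
  }
}
```

Merging a block while diskFull is set and then finalizing after clearing the flag leaves that map index in metaFile but not in mapTracker, which is the inconsistency described above.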
The scenarios above were identified through metrics collected in our production environment. However, some cases of mapId inconsistency remain unexplained. In those cases the server side logs no exception at the application layer, but the affected service nodes (only 0.1% of cluster nodes) show frequent file system errors in the system diagnostic logs. Every day, with small probability, some applications end up with inconsistent shuffle data on these machines. For such cases, the PR cannot guarantee the final consistency of shuffle data. With these kinds of problems in mind, my current solution falls back for the merged data of the entire reduce partition rather than only for the inconsistent mapIds.
The first two paragraphs are my descriptions of these scenarios.
The added getMergeStatusMapTracker call does not result in an additional RPC to the driver at runtime: after the first metadata request (see: https://github.com/apache/spark/blob/80bba4463eba29a56cdd90642f0681c3710ce87c/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala#L135), the mergeStatus is already stored in the mergeStatuses of the executor-side MapOutputTrackerWorker. Since some system-level errors cannot be fully covered, I think it is necessary to verify the merged block meta and fall back on the reduce side. @mridulm
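To illustrate why a cached lookup adds no driver round trip, here is a minimal sketch of the caching pattern. It is not MapOutputTrackerWorker's implementation: CachedMergeStatusLookup, driverFetch, and driverCalls are hypothetical names, and the driver fetch is modeled as a plain function.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

/** Hypothetical sketch of an executor-side cache of per-shuffle merge trackers. */
public final class CachedMergeStatusLookup {
  // shuffleId -> one tracker bitmap per reduce partition
  private final Map<Integer, BitSet[]> mergeStatuses = new ConcurrentHashMap<>();
  private final IntFunction<BitSet[]> driverFetch; // stand-in for the RPC to the driver
  private final AtomicInteger driverCalls = new AtomicInteger();

  public CachedMergeStatusLookup(IntFunction<BitSet[]> driverFetch) {
    this.driverFetch = driverFetch;
  }

  /** Returns the tracker for one reduce partition, fetching from the driver at most once per shuffle. */
  public BitSet getMapTracker(int shuffleId, int reduceId) {
    BitSet[] trackers = mergeStatuses.computeIfAbsent(shuffleId, id -> {
      driverCalls.incrementAndGet();
      return driverFetch.apply(id);
    });
    return trackers[reduceId];
  }

  /** Number of simulated driver fetches so far. */
  public int driverCalls() {
    return driverCalls.get();
  }
}
```

Repeated getMapTracker calls for the same shuffleId reuse the cached array, so only the first call invokes driverFetch.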
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!