fix: inconsistent block reads when the local file read handler is initialized before the event is flushed to disk
What changes were proposed in this pull request?
Re-initialize the local file read handler when the index file name is empty.
Why are the changes needed?
In some cases the app fails with a "Blocks read inconsistent" error.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
(Please test your changes, and provide instructions on how to test it:
- If you add a feature or fix a bug, add a test to cover your changes.
- If you fix a flaky test, repeat it for many times to prove it works.)
I don't see the root cause of this bug. @dingshun3016 Could you add a test case that simulates this, or add more description?
Test Results
2 363 files ±0, 2 363 suites ±0, 4h 30m 9s :stopwatch: - 1m 2s
912 tests ±0: 911 :white_check_mark: ±0, 1 :zzz: ±0, 0 :x: ±0
10 585 runs ±0: 10 571 :white_check_mark: ±0, 14 :zzz: ±0, 0 :x: ±0
Results for commit a7d86041. ± Comparison against base commit ed399553.
Here is the log of one case from our production environment.
First: LocalFileServerReadHandler is initialized, but there are no index files or data files in application_1711720273776_2156538_-779157491-37794/0/1176-1176
[INFO] 2024-04-02 14:23:13,880 Grpc-116 LocalFileServerReadHandler prepareFilePath - index files not find, baseFolder is /uniffle_data/application_1711720273776_2156538_-779157491-37794/0/1176-1176, appId application_1711720273776_2156538_-779157491-37794 shuffleId 0 partitionId 1176 partitionNumPerRange 1 partitionNum 2048 storageBasePath /uniffle_data
[INFO] 2024-04-02 14:23:13,880 Grpc-116 ShuffleServerGrpcService getLocalShuffleIndex - Successfully getShuffleIndex cost 0 ms for 0 bytes with appId[application_1711720273776_2156538_-779157491-37794], shuffleId[0], partitionId[1176]
Second: the shuffle server flushes the event from memory to a local file
[DEBUG] 2024-04-02 14:25:51,696 LocalFileFlushEventThreadPool-42 ShuffleFlushManager processEvent - Flush to file success in 22 ms and release 6765262 bytes event ShuffleDataFlushEvent: eventId=31941657, appId=application_1711720273776_2156538_-779157491-37794, shuffleId=0, startPartition=1176, endPartition=1176, retryTimes=0, underStorage=LocalStorage, isPended=false
Third: because the LocalFileServerReadHandler was already initialized in the first step and the same instance is reused, its index file and data file are still empty, although the directory is in fact no longer empty at this point.
[INFO] 2024-04-02 14:26:24,636 Grpc-13 ShuffleServerGrpcService getLocalShuffleIndex - Successfully getShuffleIndex cost 0 ms for 0 bytes with appId[application_1711720273776_2156538_-779157491-37794], shuffleId[0], partitionId[1176]
App log
24/04/02 14:26:27 ERROR [Executor task launch worker for task 2507] Executor: Exception in task 1176.3 in stage 5.0 (TID 2507)
org.apache.uniffle.common.exception.RssException: Blocks read inconsistent: expected 506 blocks, actual 459 blocks
    at org.apache.uniffle.common.util.RssUtils.checkProcessedBlockIds(RssUtils.java:375)
    at org.apache.uniffle.client.impl.ShuffleReadClientImpl.checkProcessedBlockIds(ShuffleReadClientImpl.java:279)
    at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:131)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:297)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.findNextInnerJoinRows$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:748)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
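The three steps in the logs above can be modeled with a small, self-contained sketch. This is not Uniffle code: the class StaleHandlerDemo, the in-memory "disk" set, and the simplified ReadHandler are all hypothetical stand-ins, assuming only that a handler resolves its index file once at construction and is then cached per partition.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model, not Uniffle code: shows how a cached handler can miss a later flush.
class StaleHandlerDemo {
  // Stands in for the files that exist in the partition directory on local disk.
  static final Set<String> disk = ConcurrentHashMap.newKeySet();

  // Stands in for a read handler that resolves its index file only once, at construction.
  static final class ReadHandler {
    final String indexFile; // null when no index file existed at init time

    ReadHandler(String partitionKey) {
      String candidate = partitionKey + ".index";
      this.indexFile = disk.contains(candidate) ? candidate : null;
    }

    int readIndexBytes() {
      // 1 is only a placeholder for "some index data was found".
      return indexFile == null ? 0 : 1;
    }
  }

  // One cached handler per partition, created on first read.
  static final Map<String, ReadHandler> handlers = new ConcurrentHashMap<>();

  static int getLocalShuffleIndex(String partitionKey) {
    return handlers.computeIfAbsent(partitionKey, ReadHandler::new).readIndexBytes();
  }

  public static void main(String[] args) {
    String key = "0/1176-1176";
    System.out.println(getLocalShuffleIndex(key)); // step 1: handler created before any flush
    disk.add(key + ".index");                      // step 2: the flush writes the index file
    System.out.println(getLocalShuffleIndex(key)); // step 3: the cached handler is reused
  }
}
```

In this model both reads return 0 bytes even though the index file exists by the time of the second read, which mirrors the two "0 bytes" getLocalShuffleIndex log lines.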
Could you check the value of the metric total_dropped_event_num in every shuffle server? I encountered "Blocks read inconsistent" exception in https://github.com/apache/incubator-uniffle/issues/1620 when there are events being dropped.
Also I did not find the log you mentioned above "index files not find" in the latest code. Your code must be different.
I added the "index files not find" log while troubleshooting the problem. It does not seem to be the same problem as #1620; after I added this code, the case no longer occurred.
What about the metric total_dropped_event_num? Can you check it again when this issue occurs?
total_dropped_event_num at that time was 0.
BTW, our prod environment is based on version 0.8, so these two questions may differ
Can you provide a UT to reproduce this issue? That would make it easier for us to figure out the root cause.
Thanks for adding the detailed logs. I understand your point: the root cause is that the partition's shuffle read handler is cached. After the in-memory data is flushed to a local file, the cached read handler does not recognize this and ignores the new files.
But from my side, I don't think the code running in your production env is the same as the current upstream master code. In the current Uniffle master code, if the index file is not found, an exception is thrown, so the log may not be the same.
Based on the above assumption, I think we just need to handle map.computeIfAbsent(partitionKey, key -> newReadHandler(request)); carefully, and you can throw an exception if there is no index file.
Please create an issue to track this bug @dingshun3016
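One way to handle the cached handler more carefully is to rebuild it whenever the cached instance has no index file, which is roughly what the PR description proposes. The sketch below is a hypothetical model and not the actual patch: ReinitHandlerSketch, hasIndexFile, and the in-memory "disk" set are illustrative names, and Map.compute is used here in place of computeIfAbsent so that a stale entry can be replaced atomically.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the actual patch: rebuild a cached handler that has no index file.
class ReinitHandlerSketch {
  // Stands in for the files that exist in the partition directory on local disk.
  static final Set<String> disk = ConcurrentHashMap.newKeySet();

  // Simplified handler that resolves its index file once, at construction.
  static final class ReadHandler {
    final String indexFile; // null when no index file existed at init time

    ReadHandler(String partitionKey) {
      String candidate = partitionKey + ".index";
      this.indexFile = disk.contains(candidate) ? candidate : null;
    }

    boolean hasIndexFile() {
      return indexFile != null;
    }

    int readIndexBytes() {
      // 1 is only a placeholder for "some index data was found".
      return indexFile == null ? 0 : 1;
    }
  }

  static final Map<String, ReadHandler> handlers = new ConcurrentHashMap<>();

  // Keep the cached handler only if it found an index file; otherwise build a fresh one.
  static ReadHandler getHandler(String partitionKey) {
    return handlers.compute(
        partitionKey,
        (key, cached) -> cached != null && cached.hasIndexFile() ? cached : new ReadHandler(key));
  }

  static int getLocalShuffleIndex(String partitionKey) {
    return getHandler(partitionKey).readIndexBytes();
  }

  public static void main(String[] args) {
    String key = "0/1176-1176";
    System.out.println(getLocalShuffleIndex(key)); // before the flush: nothing to read
    disk.add(key + ".index");                      // the flush writes the index file
    System.out.println(getLocalShuffleIndex(key)); // the handler is rebuilt and finds the file
  }
}
```

In this model an empty handler is never treated as final, so a read that arrives before the flush returns nothing without failing, and a later read picks up the flushed files. The cost is one extra handler construction per read until the index file appears.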
Do we still need this?
After thinking about it, I think throwing an exception may cause the getLocalShuffleData call to fail. @zuston
ping @zuston @dingshun3016