
fix: inconsistent reads when the local file read handler is initialized before the flush event reaches disk

Open dingshun3016 opened this issue 1 year ago • 13 comments

What changes were proposed in this pull request?

Re-initialize the local file read handler when the index file name is empty.
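The proposed re-initialization can be sketched as follows. This is a minimal sketch with hypothetical class and method names, not the real Uniffle API: cache one read handler per partition, but rebuild it when the handler was created before any flush event wrote files, i.e. its resolved index file name is still empty.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the actual Uniffle implementation.
public class ReadHandlerCache {
  static class LocalFileReadHandler {
    final String indexFileName; // "" if no index file existed at init time
    LocalFileReadHandler(String indexFileName) {
      this.indexFileName = indexFileName;
    }
  }

  private final Map<String, LocalFileReadHandler> handlers = new ConcurrentHashMap<>();

  LocalFileReadHandler getOrCreate(String partitionKey, String indexFileOnDisk) {
    return handlers.compute(partitionKey, (key, cached) -> {
      // Re-init if never created, or created before the flush happened
      // (the handler resolved no index file at construction time).
      if (cached == null || cached.indexFileName.isEmpty()) {
        return new LocalFileReadHandler(indexFileOnDisk);
      }
      return cached;
    });
  }
}
```

Once a handler has a non-empty index file name it is kept, so the rebuild only happens for handlers that were created too early.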

Why are the changes needed?

In some cases, the app fails with a "Blocks read inconsistent" error.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

(Please test your changes, and provide instructions on how to test it:

  1. If you add a feature or fix a bug, add a test to cover your changes.
  2. If you fix a flaky test, repeat it for many times to prove it works.)

dingshun3016 avatar Apr 10 '24 08:04 dingshun3016

I don't see the root cause of this bug. @dingshun3016 Could you add a test case to simulate this, or add more description?

zuston avatar Apr 10 '24 08:04 zuston

Test Results

2 363 files ±0, 2 363 suites ±0, 4h 30m 9s duration (-1m 2s)
912 tests ±0: 911 passed ±0, 1 skipped ±0, 0 failed ±0
10 585 runs ±0: 10 571 passed ±0, 14 skipped ±0, 0 failed ±0

Results for commit a7d86041. ± Comparison against base commit ed399553.

github-actions[bot] avatar Apr 10 '24 08:04 github-actions[bot]

I don't see the root cause of this bug. @dingshun3016 Could you add a test case to simulate this, or add more description?

I'll post one case's logs from our production environment. First, LocalFileServerReadHandler is initialized, but there are no index or data files in application_1711720273776_2156538_-779157491-37794/0/1176-1176:

[INFO] 2024-04-02 14:23:13,880 Grpc-116 LocalFileServerReadHandler prepareFilePath - index files not find, baseFolder is /uniffle_data/application_1711720273776_2156538_-779157491-37794/0/1176-1176, appId application_1711720273776_2156538_-779157491-37794 shuffleId 0 partitionId 1176 partitionNumPerRange 1 partitionNum 2048 storageBasePath /uniffle_data

[INFO] 2024-04-02 14:23:13,880 Grpc-116 ShuffleServerGrpcService getLocalShuffleIndex - Successfully getShuffleIndex cost 0 ms for 0 bytes with appId[application_1711720273776_2156538_-779157491-37794], shuffleId[0], partitionId[1176]

Second, the shuffle server flushes the event from memory to a local file:

[DEBUG] 2024-04-02 14:25:51,696 LocalFileFlushEventThreadPool-42 ShuffleFlushManager processEvent - Flush to file success in 22 ms and release 6765262 bytes event ShuffleDataFlushEvent: eventId=31941657, appId=application_1711720273776_2156538_-779157491-37794, shuffleId=0, startPartition=1176, endPartition=1176, retryTimes=0, underStorage=LocalStorage, isPended=false

Third, because the LocalFileServerReadHandler was initialized in the first step and is reused, the index file and data file names are still empty, although the directory is in fact no longer empty at this time:

[INFO] 2024-04-02 14:26:24,636 Grpc-13 ShuffleServerGrpcService getLocalShuffleIndex - Successfully getShuffleIndex cost 0 ms for 0 bytes with appId[application_1711720273776_2156538_-779157491-37794], shuffleId[0], partitionId[1176]

App log

24/04/02 14:26:27 ERROR [Executor task launch worker for task 2507] Executor: Exception in task 1176.3 in stage 5.0 (TID 2507)
org.apache.uniffle.common.exception.RssException: Blocks read inconsistent: expected 506 blocks, actual 459 blocks
    at org.apache.uniffle.common.util.RssUtils.checkProcessedBlockIds(RssUtils.java:375)
    at org.apache.uniffle.client.impl.ShuffleReadClientImpl.checkProcessedBlockIds(ShuffleReadClientImpl.java:279)
    at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:131)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:297)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.findNextInnerJoinRows$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:748)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
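The three steps above can be reproduced with a toy simulation (hypothetical names, not the actual Uniffle classes): the handler resolves file names once at construction, so a handler created before the flush keeps reporting that no index file exists.

```java
import java.util.HashMap;
import java.util.Map;

// Toy reproduction of the race described above; all names are hypothetical.
public class StaleHandlerRepro {
  // Simulated disk state: partition path -> index file name.
  static final Map<String, String> disk = new HashMap<>();

  static class Handler {
    final String indexFile; // resolved once, at construction
    Handler(String path) { this.indexFile = disk.getOrDefault(path, ""); }
    int readIndexBytes() { return indexFile.isEmpty() ? 0 : 128; }
  }

  static final Map<String, Handler> cache = new HashMap<>();

  static Handler handlerFor(String path) {
    return cache.computeIfAbsent(path, Handler::new);
  }

  public static void main(String[] args) {
    String path = "/uniffle_data/app_x/0/1176-1176";
    // Step 1: a reader arrives before the flush; handler caches "no files".
    int before = handlerFor(path).readIndexBytes();
    // Step 2: the flush event writes the index file to disk.
    disk.put(path, "1176-1176.index");
    // Step 3: the cached handler still reports 0 bytes -> inconsistent read.
    int after = handlerFor(path).readIndexBytes();
    System.out.println(before + " " + after); // prints "0 0"
  }
}
```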

dingshun3016 avatar Apr 10 '24 10:04 dingshun3016

Could you check the value of the metric total_dropped_event_num in every shuffle server? I encountered the "Blocks read inconsistent" exception in https://github.com/apache/incubator-uniffle/issues/1620 when there were events being dropped.

Also, I did not find the log you mentioned above ("index files not find") in the latest code. Your code must be different.

rickyma avatar Apr 10 '24 11:04 rickyma

Could you check the value of the metric total_dropped_event_num in every shuffle server? I encountered the "Blocks read inconsistent" exception in #1620 when there were events being dropped.

Also, I did not find the log you mentioned above ("index files not find") in the latest code. Your code must be different.

The "index files not find" log was added while I was troubleshooting the problem. It doesn't seem to be the same problem as #1620; after I added this code, the case no longer occurred.

dingshun3016 avatar Apr 10 '24 12:04 dingshun3016

Could you check the value of the metric total_dropped_event_num in every shuffle server? I encountered the "Blocks read inconsistent" exception in #1620 when there were events being dropped. Also, I did not find the log you mentioned above ("index files not find") in the latest code. Your code must be different.

The "index files not find" log was added while I was troubleshooting the problem. It doesn't seem to be the same problem as #1620; after I added this code, the case no longer occurred.

What about the metric total_dropped_event_num? Can you check it again from when this issue occurred?

rickyma avatar Apr 10 '24 13:04 rickyma

Could you check the value of the metric total_dropped_event_num in every shuffle server? I encountered the "Blocks read inconsistent" exception in #1620 when there were events being dropped. Also, I did not find the log you mentioned above ("index files not find") in the latest code. Your code must be different.

The "index files not find" log was added while I was troubleshooting the problem. It doesn't seem to be the same problem as #1620; after I added this code, the case no longer occurred.

What about the metric total_dropped_event_num? Can you check it again from when this issue occurred?

total_dropped_event_num at that time was 0. BTW, our production environment is based on version 0.8, so these two issues may differ.

dingshun3016 avatar Apr 11 '24 02:04 dingshun3016

Can you provide a UT to reproduce this issue? That would make it clearer for us to figure out the root cause.

rickyma avatar Apr 11 '24 03:04 rickyma

Thanks for adding detailed logs; I understand your point now. The root cause is that the per-partition shuffle read handler is cached. After the in-memory data is flushed to a local file, the cached read handler doesn't recognize this case and ignores it.

But from my side, I don't think the code running in your production env is the same as the current upstream master code. In the current Uniffle master code, if the index file is not found, it throws an exception, so the log may not be the same.

Based on the above assumption, I think we just need to handle map.computeIfAbsent(partitionKey, key -> newReadHandler(request)); carefully, and you can throw an exception if there is no index file.
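The suggestion above might look like this sketch (hypothetical names, not the real Uniffle code). It relies on a documented property of ConcurrentHashMap: if the mapping function passed to computeIfAbsent throws, no entry is cached and the exception is rethrown, so a later call after the flush can still create a valid handler.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a fail-fast handler factory.
public class FailFastCache {
  static class NoIndexFileException extends RuntimeException {
    NoIndexFileException(String msg) { super(msg); }
  }

  static class Handler {
    final String indexFile;
    Handler(String indexFile) { this.indexFile = indexFile; }
  }

  private final Map<String, Handler> cache = new ConcurrentHashMap<>();

  Handler get(String partitionKey, String indexFileOnDisk) {
    return cache.computeIfAbsent(partitionKey, key -> {
      if (indexFileOnDisk == null || indexFileOnDisk.isEmpty()) {
        // Throwing here aborts computeIfAbsent without caching anything,
        // so no permanently-empty handler can get stuck in the map.
        throw new NoIndexFileException("no index file for " + key);
      }
      return new Handler(indexFileOnDisk);
    });
  }
}
```

The trade-off, raised later in this thread, is that the thrown exception propagates to the caller of the read RPC.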

zuston avatar Apr 11 '24 03:04 zuston

Please create the issue to track this bug @dingshun3016

zuston avatar Apr 11 '24 03:04 zuston

Do we still need this?

rickyma avatar Apr 22 '24 11:04 rickyma

Thanks for adding detailed logs; I understand your point now. The root cause is that the per-partition shuffle read handler is cached. After the in-memory data is flushed to a local file, the cached read handler doesn't recognize this case and ignores it.

But from my side, I don't think the code running in your production env is the same as the current upstream master code. In the current Uniffle master code, if the index file is not found, it throws an exception, so the log may not be the same.

Based on the above assumption, I think we just need to handle map.computeIfAbsent(partitionKey, key -> newReadHandler(request)); carefully, and you can throw an exception if there is no index file.

After thinking about it, I think throwing an exception may cause the getLocalShuffleData call to fail. @zuston

dingshun3016 avatar Apr 23 '24 06:04 dingshun3016

ping @zuston @dingshun3016

rickyma avatar Jun 19 '24 06:06 rickyma