[Bug] Under high-pressure situations, reading data may encounter the issue of "Blocks read inconsistent"
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Search before asking
- [X] I have searched in the issues and found no similar issues.
Describe the bug
[19:30:48:677] [Executor task launch worker for task 15111.2 in stage 4.0 (TID 83474)] ERROR org.apache.spark.executor.Executor.logError:98 - Exception in task 15111.2 in stage 4.0 (TID 83474)
org.apache.uniffle.common.exception.RssException: Blocks read inconsistent: expected 23357 blocks, actual 19597 blocks
at org.apache.uniffle.common.util.RssUtils.checkProcessedBlockIds(RssUtils.java:382) ~[rss-client-spark3-shaded-0.9.0-SNAPSHOT.jar:?]
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.checkProcessedBlockIds(ShuffleReadClientImpl.java:321) ~[rss-client-spark3-shaded-0.9.0-SNAPSHOT.jar:?]
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:131) ~[rss-client-spark3-shaded-0.9.0-SNAPSHOT.jar:?]
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) ~[spark-core_2.12-3.3.1.jar:3.3.1]
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:317) ~[rss-client-spark3-shaded-0.9.0-SNAPSHOT.jar:?]
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) ~[spark-core_2.12-3.3.1.jar:3.3.1]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.hashAgg_doAggregateWithoutKey_0$(Unknown Source) ~[?:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source) ~[?:?]
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.shuffle.writer.RssShuffleWriter.writeImpl(RssShuffleWriter.java:266) ~[rss-client-spark3-shaded-0.9.0-SNAPSHOT.jar:?]
at org.apache.spark.shuffle.writer.RssShuffleWriter.write(RssShuffleWriter.java:247) ~[rss-client-spark3-shaded-0.9.0-SNAPSHOT.jar:?]
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1.jar:3.3.1]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1.jar:3.3.1]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1.jar:3.3.1]
at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.1.jar:3.3.1]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1.jar:3.3.1]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1556) ~[spark-core_2.12-3.3.1.jar:3.3.1]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1.jar:3.3.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_392]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_392]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392]
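For reference, the exception is raised by the block-id consistency check in RssUtils.checkProcessedBlockIds (the top frames above). A minimal sketch of that kind of check follows; the Set<Long>-based signature is an assumption for illustration only, not the actual implementation in RssUtils.java:

```java
import java.util.Set;

import org.apache.uniffle.common.exception.RssException;

public class BlockConsistencyCheckSketch {

  // Throws when fewer blocks were processed than the writers reported.
  // The Set<Long>-based signature is an assumption for illustration;
  // the real check lives in RssUtils.checkProcessedBlockIds.
  public static void checkProcessedBlockIds(
      Set<Long> expectedBlockIds, Set<Long> processedBlockIds) {
    if (!processedBlockIds.containsAll(expectedBlockIds)) {
      throw new RssException(
          "Blocks read inconsistent: expected " + expectedBlockIds.size()
              + " blocks, actual " + processedBlockIds.size() + " blocks");
    }
  }
}
```

Note that this check only runs after the reader has consumed all available data, which is why the failure surfaces so late in the task.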
Affects Version(s)
master
Uniffle Server Log Output
No response
Uniffle Engine Log Output
No response
Uniffle Server Configurations
No response
Uniffle Engine Configurations
No response
Additional context
No response
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
The root cause of this issue is https://github.com/apache/incubator-uniffle/issues/1626, which will be fixed by https://github.com/apache/incubator-uniffle/pull/1627.
And I think we can also improve this: when flush events are dropped (with one scenario to distinguish: drops caused by speculative write tasks are allowed), we can fail fast when reading data, instead of lagging until a "Blocks read inconsistent" exception is thrown. See the sketch below.
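A minimal sketch of what that fail-fast path could look like on the client side follows. All names here (ReadResponseStub, hasDroppedFlushEvent, writtenBySpeculativeTask) are hypothetical, not existing Uniffle APIs; only RssException is taken from the stack trace above:

```java
import org.apache.uniffle.common.exception.RssException;

public class FastFailReadSketch {

  // Minimal stub standing in for the real per-fetch read response type;
  // both methods are hypothetical, not existing Uniffle APIs.
  interface ReadResponseStub {
    boolean hasDroppedFlushEvent();
    boolean writtenBySpeculativeTask();
  }

  // Called for every fetched chunk: surface dropped flush events immediately,
  // sparing the speculative-write case, instead of reading on until the
  // final block-id consistency check fails.
  public void onDataReceived(ReadResponseStub response) {
    if (response.hasDroppedFlushEvent() && !response.writtenBySpeculativeTask()) {
      throw new RssException(
          "Flush events were dropped on the server side; failing the read fast "
              + "instead of lagging until a 'Blocks read inconsistent' exception");
    }
  }
}
```

The design point is simply to surface the loss at the first fetch that observes it, so the task fails within seconds rather than after reading thousands of blocks.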