incubator-uniffle
[Problem] Inconsistent blocks when reading shuffle data
I found that some tasks of Spark jobs throw an exception about an inconsistent block count. The stack trace is as follows:
22/09/03 15:29:21 ERROR Executor: Exception in task 330.0 in stage 9.0 (TID 59001)
org.apache.uniffle.common.exception.RssException: Blocks read inconsistent: expected 30000 blocks, actual 15636 blocks
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.checkProcessedBlockIds(ShuffleReadClientImpl.java:215)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:135)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
I didn't find any error/warn log on the shuffle server that stored the corresponding partition data.
We don't set any replica config and directly use the MEMORY_LOCALFILE storageType. Could this exception be caused by a disk error?
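For context on where this exception comes from: the read client appears to compare the set of block IDs it expects for the partition with the set it has actually processed, and fails when they don't match. A minimal illustrative sketch of that kind of check (the class and method names below are hypothetical, not the actual ShuffleReadClientImpl code):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative only: a reader that tracks which block IDs it has processed
// and verifies them against the expected set at the end of the read.
class BlockConsistencyCheckSketch {
  private final Set<Long> expectedBlockIds = new HashSet<>();
  private final Set<Long> processedBlockIds = new HashSet<>();

  void expect(long blockId) {
    expectedBlockIds.add(blockId);
  }

  void markProcessed(long blockId) {
    processedBlockIds.add(blockId);
  }

  // Analogous to the check that produced the exception above: if any expected
  // block was never read (from memory, local file, or HDFS), fail the task.
  void checkProcessedBlockIds() {
    if (!processedBlockIds.containsAll(expectedBlockIds)) {
      throw new IllegalStateException(
          "Blocks read inconsistent: expected " + expectedBlockIds.size()
              + " blocks, actual " + processedBlockIds.size() + " blocks");
    }
  }
}
```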
It may be a bug in our RSS. I once tried to fix similar problems in https://github.com/apache/incubator-uniffle/pull/40 and https://github.com/Tencent/Firestorm/pull/92. Could you reproduce this problem? Are the missing blocks in disk storage or memory storage?
Could you reproduce this problem?
It occurs in our users' jobs, so I may have to write test Spark code to reproduce it.
Are the missing blocks disk storage or memory storage?
I don't know which storage the missing blocks belong to. But I checked the request count on the shuffle server against the request log entries in the tasks, and they are equal.
It may be a bug in our RSS. I once tried to fix similar problems in https://github.com/apache/incubator-uniffle/pull/40 and https://github.com/Tencent/Firestorm/pull/92
It looks like this problem is not the same as the one you mentioned. This problem is caused by missing some of the blocks.
22/09/05 20:58:01 INFO impl.HdfsShuffleReadHandler: Read index files hdfs://ns1/tmp/4/308-308/xxxx-19999_0.index for 2 ms
22/09/05 20:58:01 WARN util.RssUtils: Read index data under flow
java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:506)
at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:412)
at org.apache.uniffle.common.util.RssUtils.transIndexDataToSegments(RssUtils.java:204)
at org.apache.uniffle.common.util.RssUtils.transIndexDataToSegments(RssUtils.java:188)
at org.apache.uniffle.storage.handler.impl.DataSkippableReadHandler.readShuffleData(DataSkippableReadHandler.java:67)
at org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler.readShuffleData(HdfsClientReadHandler.java:135)
at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:112)
at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:140)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:195)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:131)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:100)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:213)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
We seem to have the same problem, but the reason for the inconsistent block count is that an exception occurred when the index file was read, in transIndexDataToSegments. It seems that we read an incomplete index file.
We should fail fast when encountering BufferUnderflowException instead of ignoring it. https://github.com/apache/incubator-uniffle/blob/07f70ed872b87107fc7028577bd9e66d8349fd6c/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java#L224
For MEMORY_HDFS or MEMORY_LOCALFILE, it's normal to read incomplete index data, so we choose to ignore it instead of failing fast.
We should fail fast when encountering BufferUnderflowException instead of ignoring it.
What I am curious about is whether this is caused by the stream's data not being flushed normally or by the stream being closed abnormally.
Maybe the shuffle index is still being written.
For MEMORY_HDFS or MEMORY_LOCALFILE, it's normal to read incomplete index data, so we choose to ignore it instead of failing fast.
Oh, that makes sense.
Firstly, maybe we should reproduce this exception by making a reader and a writer operate on this file at the same time.
Revisit: do we need to fail fast?
As far as I know, the BufferUnderflowException is caused by an incomplete FileBasedShuffleSegment. The local-file reader always reads whole FileBasedShuffleSegment pieces, so it won't throw the exception. However, the HDFS index read may return incomplete data (this can happen while data is being spilled from memory to HDFS), so we should do a similar alignment to the local index file read. After that, we should fail fast in https://github.com/apache/incubator-uniffle/blob/07f70ed872b87107fc7028577bd9e66d8349fd6c/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java#L224
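A rough sketch of that alignment, assuming each index segment is laid out as offset(8) + length(4) + uncompressLength(4) + crc(8) + blockId(8) + taskAttemptId(8); the IndexSegment class below is an illustration, not Uniffle's actual FileBasedShuffleSegment:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch: parse only whole index segments; a trailing partial segment
// (for example from an index file that is still being flushed) is skipped
// instead of triggering BufferUnderflowException.
class IndexAlignmentSketch {

  // Assumed per-segment layout: offset(8) + length(4) + uncompressLength(4)
  // + crc(8) + blockId(8) + taskAttemptId(8). Adjust if the real layout differs.
  static final int SEGMENT_BYTES = 8 + 4 + 4 + 8 + 8 + 8;

  static final class IndexSegment {
    final long offset;
    final int length;
    final int uncompressLength;
    final long crc;
    final long blockId;
    final long taskAttemptId;

    IndexSegment(long offset, int length, int uncompressLength,
                 long crc, long blockId, long taskAttemptId) {
      this.offset = offset;
      this.length = length;
      this.uncompressLength = uncompressLength;
      this.crc = crc;
      this.blockId = blockId;
      this.taskAttemptId = taskAttemptId;
    }
  }

  static List<IndexSegment> parseCompleteSegments(ByteBuffer indexData) {
    List<IndexSegment> segments = new ArrayList<>();
    // Only decode as many segments as fully fit in the buffer.
    int wholeSegments = indexData.remaining() / SEGMENT_BYTES;
    for (int i = 0; i < wholeSegments; i++) {
      long offset = indexData.getLong();
      int length = indexData.getInt();
      int uncompressLength = indexData.getInt();
      long crc = indexData.getLong();
      long blockId = indexData.getLong();
      long taskAttemptId = indexData.getLong();
      segments.add(new IndexSegment(offset, length, uncompressLength, crc, blockId, taskAttemptId));
    }
    return segments;
  }
}
```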
Are the inconsistent blocks caused by this BufferUnderflowException?
Spilling is a common action in Uniffle, and it already filters the blocks. So an inconsistent block count would mean there is a bug in spilling.
Our production environment also hit such an exception once, but so far we have not been able to reproduce it.
I think maybe the shuffle server was still writing this file and the output stream had not been closed.
Can you help me understand #issue_63? @jerqi
When we use the storageType MEMORY_HDFS or MEMORY_LOCALFILE_HDFS, after we read data from memory, the data may be flushed to HDFS. So we could read incomplete index data.
org.apache.uniffle.storage.handler.impl.HdfsFileWriter#writeIndex is a complete operation; it looks like it shouldn't write an incomplete index file.
In our production environment, certain tasks hit this problem during certain periods. I am trying to reproduce this situation and keep the index file and data file.
If you use MEMORY_HDFS or MEMORY_LOCALFILE_HDFS, you read data from memory first and then read data from HDFS, but the in-memory data may be flushed in the meantime, so you can read incomplete data that is being flushed from memory to HDFS.
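In other words, the same block can be visible in more than one storage tier, so the reader must skip blocks it has already processed. A minimal sketch of that kind of deduplication, with hypothetical names (the real handlers track processed block IDs differently, e.g. via bitmaps):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch: when data is read from memory first and may later be flushed to
// HDFS, the same block can be seen in both tiers. Skipping block IDs that
// were already processed keeps the final result consistent.
class TieredReadDedupSketch {
  private final Set<Long> processedBlockIds = new HashSet<>();

  // Returns only the block IDs from the current tier that have not been
  // read from an earlier tier (e.g. memory) yet.
  List<Long> filterUnread(List<Long> blockIdsInThisTier) {
    List<Long> unread = new ArrayList<>();
    for (Long blockId : blockIdsInThisTier) {
      if (processedBlockIds.add(blockId)) {
        unread.add(blockId);
      }
    }
    return unread;
  }
}
```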
After collecting all the failed tasks whose exceptions are related to org.apache.uniffle.common.exception.RssException: Blocks read inconsistent, I found that these exceptions are caused by the DEADLINE_EXCEEDED status of a gRPC remote call, which is caught by Uniffle and not rethrown. This made us confused and led us to think it was a write/read bug.
I think I will raise a PR to fix it.
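If the root cause really is a swallowed gRPC deadline, the read handler could detect the status and surface it rather than returning an empty result. A hedged sketch using the standard io.grpc status API; the surrounding handler and stub below are illustrative, not Uniffle's actual classes:

```java
import io.grpc.Status;
import io.grpc.StatusRuntimeException;

// Sketch: surface a gRPC DEADLINE_EXCEEDED instead of swallowing it, so the
// task fails with the real cause rather than a later "inconsistent blocks"
// check. The surrounding method and stub are illustrative only.
class GrpcDeadlineHandlingSketch {

  byte[] readShuffleData(ShuffleServerStub stub) {
    try {
      return stub.getShuffleData();
    } catch (StatusRuntimeException e) {
      if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
        // Rethrow with a clear message instead of returning an empty result.
        throw new RuntimeException(
            "Shuffle data read timed out (gRPC DEADLINE_EXCEEDED)", e);
      }
      throw e;
    }
  }

  // Hypothetical stand-in for the real gRPC stub call.
  interface ShuffleServerStub {
    byte[] getShuffleData();
  }
}
```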
Following up on this problem: I found that the gRPC client sometimes throws a DEADLINE exception like the following:
org.apache.uniffle.common.exception.RssException: Failed to read shuffle data with ShuffleServerGrpcClient for host[10.67.67.68], port[21000] due to DEADLINE_EXCEEDED: deadline exceeded after 59.999946594s. [closed=[], committed=[remote_addr=10.67.67.68/10.67.67.68:21000]]
at org.apache.uniffle.storage.handler.impl.LocalFileClientRemoteReadHandler.readShuffleData(LocalFileClientRemoteReadHandler.java:88)
at org.apache.uniffle.storage.handler.impl.DataSkippableReadHandler.readShuffleData(DataSkippableReadHandler.java:83)
at org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler.readShuffleData(LocalFileClientReadHandler.java:79)
at org.apache.uniffle.storage.handler.impl.LocalFileQuorumClientReadHandler.readShuffleData(LocalFileQuorumClientReadHandler.java:79)
at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:112)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:195)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:131)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:101)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:238)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage20.sort_addToSorter_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage20.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
at org.apache.spark.sql.execution.joins.SortMergeFullOuterJoinScanner.advancedRight(SortMergeJoinExec.scala:1000)
at org.apache.spark.sql.execution.joins.SortMergeFullOuterJoinScanner.<init>(SortMergeJoinExec.scala:975)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.$anonfun$doExecute$1(SortMergeJoinExec.scala:220)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
22/09/13 10:43:48 ERROR ComposedClientReadHandler: Failed to read shuffle data from WARM handler
But I found that this response had been sent by the shuffle server, yet the client side still threw the exception. What could cause this? Network? GC?
Did you meet similar problems? @jerqi
The response may not have been sent by the shuffle server in time.
Do we have some ways to avoid this problem?
This is a response timeout, which should be caused by high load on the ShuffleServer and its slow response.
We can increase the rpc timeout and find out the reason for the shuffle server's slow response.
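For reference, the per-call deadline that produces DEADLINE_EXCEEDED is set on the gRPC stub (Uniffle configures this via its own client settings; the snippet below only illustrates the underlying gRPC mechanism, not any specific Uniffle option):

```java
import java.util.concurrent.TimeUnit;

import io.grpc.stub.AbstractBlockingStub;

// Sketch: a gRPC blocking stub with a longer per-call deadline. A call that
// does not complete within the deadline fails with DEADLINE_EXCEEDED, which
// is exactly the status seen in the stack trace above.
class RpcDeadlineSketch {
  static <S extends AbstractBlockingStub<S>> S withLongerDeadline(S stub) {
    // 5 minutes instead of the default; tune to the workload.
    return stub.withDeadlineAfter(5, TimeUnit.MINUTES);
  }
}
```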
But I don't find any exception through the gRPC metrics. How did you determine that this problem was caused by a high-pressure shuffle server?
Increasing the rpc timeout is not a fundamental fix. And I found the rpc has been accepted by the shuffle server (the handled log has been shown), so maybe it's stuck on sending.
We had a similar rpc timeout exception before; it appeared in a task processing 10T of data. The investigation found that it was because inflush_memory and used_memory were too high, which caused the client to frequently retry sending data and requesting buffer.
You're right that increasing the rpc timeout is not a fundamental fix; it's not only the service response, but also the network I/O, etc.
Yes, a failure to require memory will also cause this exception, but I didn't find any server-side exception about require-memory failing.
Besides, I have submitted a new PR to fix the problem you mentioned: #218
We also met this error, but in fact it's an rpc timeout. In composed-client mode, the exception from a single read is caught and the client tries to read from another storage; if the result is empty, the exception is gone. At last the client checks the blocks map and reports the inconsistent-blocks issue. So I am thinking there is exception confusion: in some cases it is not an inconsistent-blocks issue, just a read timeout, maybe caused by a highly loaded shuffle server. I suggest changing the code to raise a different exception for this case to avoid the confusion.
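A rough sketch of that separation, with illustrative names (not Uniffle's actual ComposedClientReadHandler API): remember the last per-storage failure and rethrow it as a distinct exception when every storage comes back empty, instead of falling through to the block-count check.

```java
import java.util.List;

// Sketch: distinguish "read failed / timed out" from "blocks genuinely
// missing". If every storage tier failed or returned nothing, surface the
// last read failure instead of falling through to the block-count check.
class ComposedReadSketch {

  // Illustrative exception type for the timeout/retry-exhausted case.
  static class ShuffleReadTimeoutException extends RuntimeException {
    ShuffleReadTimeoutException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  interface TierReader {
    // Returns read bytes, or an empty array if the tier had nothing.
    byte[] read() throws Exception;
  }

  static byte[] readFromTiers(List<TierReader> tiers) {
    Exception lastFailure = null;
    for (TierReader tier : tiers) {
      try {
        byte[] data = tier.read();
        if (data.length > 0) {
          return data;
        }
      } catch (Exception e) {
        lastFailure = e; // remember, then try the next (colder) tier
      }
    }
    if (lastFailure != null) {
      // All tiers exhausted and at least one failed: report the real cause
      // rather than a later "Blocks read inconsistent" error.
      throw new ShuffleReadTimeoutException("All storage tiers failed", lastFailure);
    }
    return new byte[0];
  }
}
```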
I suggest changing the code to raise a different exception for this case to avoid the confusion.
+1.
+1, you can raise a pr if you have time.