incubator-uniffle
[Bug] ShuffleReadClientImpl occurs StackOverflowError.
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Search before asking
- [X] I have searched in the issues and found no similar issues.
Describe the bug
The following exception occurred in four attempts of the same task. The probability of this problem recurring is relatively low: roughly one in thousands of apps hits this type of exception, causing the app attempt to retry, and each occurrence is in a different app.
org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:321)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:209)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:208)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.StackOverflowError
at io.grpc.internal.RetriableStream.cancel(RetriableStream.java:493)
at io.grpc.internal.ClientCallImpl.sendMessageInternal(ClientCallImpl.java:528)
at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:506)
at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:317)
at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:227)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:154)
at org.apache.uniffle.proto.ShuffleServerGrpc$ShuffleServerBlockingStub.getMemoryShuffleData(ShuffleServerGrpc.java:768)
at org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient.getInMemoryShuffleData(ShuffleServerGrpcClient.java:627)
at org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler.readShuffleData(MemoryClientReadHandler.java:94)
at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:93)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:224)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:160)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
......
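The repeated frames at ShuffleReadClientImpl.java:214 suggest that readShuffleBlockData calls itself once per block it has to skip, so a long run of skippable (e.g. duplicate or already-processed) blocks exhausts the stack. The following is a simplified, hypothetical sketch of that pattern and an iterative rewrite, not the actual Uniffle code; the method and variable names are illustrative only:

```java
import java.util.Iterator;
import java.util.Set;

public class Main {
    // Recursive pattern (as implied by the stack trace):
    // one stack frame per skipped block.
    static Integer readRecursive(Iterator<Integer> blocks, Set<Integer> processed) {
        if (!blocks.hasNext()) {
            return null; // no more data
        }
        int blockId = blocks.next();
        if (processed.contains(blockId)) {
            // Duplicate or already-processed block: recurse to try the next one.
            // Many skips in a row -> StackOverflowError.
            return readRecursive(blocks, processed);
        }
        processed.add(blockId);
        return blockId;
    }

    // Iterative rewrite: constant stack depth no matter how many blocks are skipped.
    static Integer readIterative(Iterator<Integer> blocks, Set<Integer> processed) {
        while (blocks.hasNext()) {
            int blockId = blocks.next();
            if (processed.contains(blockId)) {
                continue; // skip without growing the stack
            }
            processed.add(blockId);
            return blockId;
        }
        return null;
    }
}
```

Converting the recursion to a loop only removes the StackOverflowError symptom; the duplicate blocks that cause the endless skipping are a separate problem, discussed below.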
Affects Version(s)
0.6.0
Uniffle Server Log Output
The corresponding ShuffleServer has been repeatedly printing the following log:
`[INFO] 2023-06-04 20:42:40,023 Grpc-839 ShuffleServerGrpcService getMemoryShuffleData - Successfully getInMemoryShuffleData cost 0 ms with 318133 bytes shuffle data for appId[xxxxx], shuffleId[0], partitionId[12]`
Uniffle Engine Log Output
Before the exception is thrown, the following log lines appear repeatedly, indicating that the client keeps calling the GetInMemoryShuffleData interface of the ShuffleServer.
`23/06/04 20:42:43 INFO grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from xxxxx:19999 for appId[xxxxx], shuffleId[0], partitionId[12] cost 2 ms`
Uniffle Server Configurations
rss.rpc.server.port 19999
rss.jetty.http.port 19998
rss.storage.basePath /aa/bb/cc,/aa1/bb/cc
rss.storage.type MEMORY_LOCALFILE_HDFS
rss.coordinator.quorum xxxxxxx
rss.server.buffer.capacity 40gb
rss.server.read.buffer.capacity 20gb
rss.server.flush.thread.alive 50
rss.server.flush.threadPool.size 4
rss.server.disk.capacity 1t
rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
rss.server.commit.timeout 600000
rss.server.app.expired.withoutHeartbeat 300000
rss.server.buffer.flush.enabled true
rss.server.tags high_priority
rss.server.health.check.enable true
rss.server.multistorage.fallback.strategy.class org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy
rss.server.flush.cold.storage.threshold.size 50gb
rss.server.buffer.flush.threshold 64m
rss.server.max.concurrency.of.single.partition.writer 20
rss.server.huge-partition.size.threshold 50g
rss.server.huge-partition.memory.limit.ratio 0.2
Uniffle Engine Configurations
--conf spark.rss.client.shuffle.data.distribution.type LOCAL_ORDER
Additional context
No response
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
@jerqi @zuston Has this problem occurred in your production environment?
I haven't seen this error
I haven't seen this error either.
I think this exception will occur when duplicate blocks are sent. Assume there are two blocks with equal blockIds in a ShuffleBuffer: before a flush occurs, getMemoryShuffleData will be called over and over and never end. As you can see from the log below, the first read returns 5050999 bytes and every subsequent read returns 4625271 bytes; the reading never ends, so there must be duplicate blockIds.
23/10/10 03:25:44 INFO grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from xxx:19999 for appId[application_1695193790407_9195629_1696874216324], shuffleId[9], partitionId[252] cost 1114 ms, get 5050999 bytes
23/10/10 03:25:44 INFO grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from xxx:19999 for appId[application_1695193790407_9195629_1696874216324], shuffleId[9], partitionId[252] cost 236 ms, get 4625271 bytes
23/10/10 03:25:45 INFO grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from xxx:19999 for appId[application_1695193790407_9195629_1696874216324], shuffleId[9], partitionId[252] cost 642 ms, get 4625271 bytes
23/10/10 03:25:46 INFO grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from xxx:19999 for appId[application_1695193790407_9195629_1696874216324], shuffleId[9], partitionId[252] cost 815 ms, get 4625271 bytes
23/10/10 03:25:47 INFO grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from xx:19999 for appId[application_1695193790407_9195629_1696874216324], shuffleId[9], partitionId[252] cost 868 ms, get 4625271 bytes
......
There are two blocks with blockId=3 in the picture. The first call to getMemoryShuffleData reads all blocks; the second reads from blockId=4 to the last block, and this back-and-forth never ends. [ShuffleBuffer diagram]
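The loop described above can be sketched as follows. This is a simplified model, not the real server code: a hypothetical getMemoryShuffleData returns every block after the first occurrence of the client's lastBlockId. When the last block in the buffer duplicates an earlier blockId, the cursor snaps back to the first occurrence and the same tail is returned forever:

```java
import java.util.ArrayList;
import java.util.List;

public class Main {
    // Hypothetical model of the server: return all blocks strictly after
    // the FIRST occurrence of lastBlockId (-1 means "from the beginning").
    static List<Integer> getMemoryShuffleData(List<Integer> buffer, long lastBlockId) {
        if (lastBlockId == -1) {
            return new ArrayList<>(buffer);
        }
        for (int i = 0; i < buffer.size(); i++) {
            if (buffer.get(i) == lastBlockId) {
                return new ArrayList<>(buffer.subList(i + 1, buffer.size()));
            }
        }
        return List.of();
    }

    // Count client requests until the server returns no data. With a duplicate
    // blockId the tail never shrinks to empty, so we cap the loop to show the
    // read does not terminate naturally.
    static int requestsUntilEmpty(List<Integer> buffer, int cap) {
        long lastBlockId = -1;
        int requests = 0;
        while (requests < cap) {
            List<Integer> data = getMemoryShuffleData(buffer, lastBlockId);
            requests++;
            if (data.isEmpty()) {
                return requests;
            }
            lastBlockId = data.get(data.size() - 1); // advance the cursor
        }
        return requests; // hit the cap: the read never ended
    }
}
```

With a buffer like [1, 2, 3, 4, 5, 3] the cursor ends each round on blockId 3, the server matches the first 3, and [4, 5, 3] is returned on every subsequent request, matching the repeated 4625271-byte reads in the log.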
@jerqi @zuston What do you think?
Sounds right.
I think you are right.
Do you have any optimization suggestions for this situation? @jerqi @zuston
Block deduplication on the ShuffleServer side may have a greater impact on performance.
Maybe we could add a new id like flushId to solve this problem.
What about using a composite block id, like (last-1_block_id, last_block_id)?
How about using a skip list to store blocks in ShuffleBuffer?
Maybe we can't; we should guarantee that the order is immutable.
Why should we guarantee that the order is immutable?
@leixm @zuston
I have discussed this issue with @jerqi offline. We think we can use a skipList to store blocks in ShuffleBuffer, but the key of the skipList should be a self-incrementing id.
Another reason to use a skipList is that if we assign a lot of memory (1TB) to the shuffle server, getting memory data will be slow with the current implementation.
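A minimal sketch of that idea, assuming a hypothetical ShuffleBuffer (not the real Uniffle class) that keys a ConcurrentSkipListMap by a self-incrementing sequence id instead of the possibly-duplicated blockId. The reader resumes from the last sequence id it saw, so duplicate blockIds can no longer stall the cursor, and tailMap gives O(log n) seeks into a large buffer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

public class Main {
    // Hypothetical ShuffleBuffer: blocks are ordered by an internal sequence id.
    static class ShuffleBuffer {
        private final ConcurrentSkipListMap<Long, Long> blocks = new ConcurrentSkipListMap<>();
        private final AtomicLong nextSeq = new AtomicLong(0);

        // Insertion order is preserved; duplicate blockIds get distinct keys.
        long append(long blockId) {
            long seq = nextSeq.getAndIncrement();
            blocks.put(seq, blockId);
            return seq;
        }

        // Return entries strictly after lastSeq; the caller remembers the
        // highest sequence id it has consumed, not the last blockId.
        List<Map.Entry<Long, Long>> readAfter(long lastSeq) {
            return new ArrayList<>(blocks.tailMap(lastSeq, false).entrySet());
        }
    }
}
```

Because the sequence id is unique even when blockIds repeat, a second read after consuming the whole buffer returns nothing, instead of re-reading the tail forever.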
@jerqi @zuston Has this problem occurred in your production environment?
We saw this exception on prod before; later we never saw it on prod again.