
[Bug] ShuffleReadClientImpl throws StackOverflowError.

Open leixm opened this issue 1 year ago • 17 comments

Code of Conduct

Search before asking

  • [X] I have searched in the issues and found no similar issues.

Describe the bug

The same task failed with the following exception in all four of its attempts. The probability of this problem recurring is relatively low: roughly one app out of thousands hits this type of exception, causing the app attempt to retry, and each occurrence happens in a different app.

org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:321)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:209)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:208)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.StackOverflowError
	at io.grpc.internal.RetriableStream.cancel(RetriableStream.java:493)
	at io.grpc.internal.ClientCallImpl.sendMessageInternal(ClientCallImpl.java:528)
	at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:506)
	at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:317)
	at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:227)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:154)
	at org.apache.uniffle.proto.ShuffleServerGrpc$ShuffleServerBlockingStub.getMemoryShuffleData(ShuffleServerGrpc.java:768)
	at org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient.getInMemoryShuffleData(ShuffleServerGrpcClient.java:627)
	at org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler.readShuffleData(MemoryClientReadHandler.java:94)
	at org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler.readShuffleData(ComposedClientReadHandler.java:93)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:224)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:160)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
	at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:214)
        ......
        ......
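The repeated frames at ShuffleReadClientImpl.readShuffleBlockData:214 show the method re-entering itself until the stack is exhausted. The sketch below is a hypothetical simplification of that pattern, not the actual Uniffle code (all names besides readShuffleBlockData are made up): if every fetched block has to be skipped, for example because its blockId was already processed, each skip triggers another recursive call, so an endless stream of duplicate blocks deepens the stack until StackOverflowError is thrown.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical simplification of the recursion seen in the stack trace above;
// this is NOT the real ShuffleReadClientImpl code.
public class RecursiveReadSketch {

    private final Deque<Long> pendingBlockIds = new ArrayDeque<>();
    private final Set<Long> processedBlockIds = new HashSet<>();

    // Stand-in for the getMemoryShuffleData RPC: here the "server" keeps
    // handing back a block whose blockId was already processed.
    private void fetchMoreBlocksFromServer() {
        pendingBlockIds.add(3L);
    }

    // Mirrors the recursive shape of readShuffleBlockData(): a block that
    // cannot be used triggers another recursive call instead of a loop
    // iteration, so an endless stream of duplicates grows the call stack.
    public Long readShuffleBlockData() {
        if (pendingBlockIds.isEmpty()) {
            fetchMoreBlocksFromServer();
            if (pendingBlockIds.isEmpty()) {
                return null; // nothing left to read anywhere
            }
        }
        long blockId = pendingBlockIds.poll();
        if (processedBlockIds.contains(blockId)) {
            return readShuffleBlockData(); // the frame repeated at line 214
        }
        processedBlockIds.add(blockId);
        return blockId;
    }

    public static void main(String[] args) {
        RecursiveReadSketch sketch = new RecursiveReadSketch();
        sketch.processedBlockIds.add(3L); // blockId 3 was already read once
        sketch.readShuffleBlockData();    // recurses until StackOverflowError
    }
}
```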

Affects Version(s)

0.6.0

Uniffle Server Log Output

The corresponding ShuffleServer has been repeatedly printing the following log:

`[INFO] 2023-06-04 20:42:40,023 Grpc-839 ShuffleServerGrpcService getMemoryShuffleData - Successfully getInMemoryShuffleData cost 0 ms with 318133 bytes shuffle data for appId[xxxxx], shuffleId[0], partitionId[12]`

Uniffle Engine Log Output

Before the exception is thrown, the following log line keeps appearing, indicating that the client has been repeatedly calling the GetInMemoryShuffleData interface of the ShuffleServer:

`23/06/04 20:42:43 INFO grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from xxxxx:19999 for appId[xxxxx], shuffleId[0], partitionId[12] cost 2 ms`

Uniffle Server Configurations

rss.rpc.server.port 19999
rss.jetty.http.port 19998
rss.storage.basePath /aa/bb/cc,/aa1/bb/cc
rss.storage.type MEMORY_LOCALFILE_HDFS
rss.coordinator.quorum xxxxxxx
rss.server.buffer.capacity 40gb
rss.server.read.buffer.capacity 20gb
rss.server.flush.thread.alive 50
rss.server.flush.threadPool.size 4
rss.server.disk.capacity 1t
rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
rss.server.commit.timeout 600000
rss.server.app.expired.withoutHeartbeat 300000
rss.server.buffer.flush.enabled true
rss.server.tags high_priority
rss.server.health.check.enable true
rss.server.multistorage.fallback.strategy.class org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy
rss.server.flush.cold.storage.threshold.size 50gb
rss.server.buffer.flush.threshold 64m
rss.server.max.concurrency.of.single.partition.writer 20
rss.server.huge-partition.size.threshold 50g
rss.server.huge-partition.memory.limit.ratio 0.2

Uniffle Engine Configurations

--conf spark.rss.client.shuffle.data.distribution.type LOCAL_ORDER

Additional context

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

leixm avatar Jun 05 '23 03:06 leixm

@jerqi @zuston Has this problem occurred in your production environment?

leixm avatar Jun 05 '23 03:06 leixm

@jerqi @zuston Has this problem occurred in your production environment?

I haven't seen this error

zuston avatar Jun 05 '23 03:06 zuston

I haven't seen this error either.

jerqi avatar Jun 05 '23 07:06 jerqi

I think this exception occurs when duplicate blocks are sent. Suppose there are two blocks with equal blockIds in a ShuffleBuffer. Before a flush happens, getMemoryShuffleData keeps being called and never ends. As you can see from the log below, the first read returns 5050999 bytes, all subsequent reads return 4625271 bytes, and the reading never finishes. There should be duplicate blockIds.

23/10/10 03:25:44 INFO grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from xxx:19999 for appId[application_1695193790407_9195629_1696874216324], shuffleId[9], partitionId[252] cost 1114 ms, get 5050999 bytes
23/10/10 03:25:44 INFO grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from xxx:19999 for appId[application_1695193790407_9195629_1696874216324], shuffleId[9], partitionId[252] cost 236 ms, get 4625271 bytes
23/10/10 03:25:45 INFO grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from xxx:19999 for appId[application_1695193790407_9195629_1696874216324], shuffleId[9], partitionId[252] cost 642 ms, get 4625271 bytes
23/10/10 03:25:46 INFO grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from xxx:19999 for appId[application_1695193790407_9195629_1696874216324], shuffleId[9], partitionId[252] cost 815 ms, get 4625271 bytes
23/10/10 03:25:47 INFO grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from xx:19999 for appId[application_1695193790407_9195629_1696874216324], shuffleId[9], partitionId[252] cost 868 ms, get 4625271 bytes
......

leixm avatar Oct 10 '23 07:10 leixm

There are two blocks with blockId=3 in the picture. The first getMemoryShuffleData call reads all blocks, and the second call reads from blockId=4 to the last block, whose blockId is again 3, so this back-and-forth never ends. [ShuffleBuffer image]
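A self-contained toy model of that loop (hypothetical class and method names, not the real ShuffleServer logic): the buffer holds block ids 1, 2, 3, 4, 5, 3; the first request returns everything, and every later request resolves the reader's last block id (3) to its earlier duplicate, so the same [4, 5, 3] suffix is returned forever.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of paging memory shuffle data by the last returned blockId;
// names and logic are illustrative, not the actual ShuffleServer code.
public class DuplicateBlockPagingSketch {

    // Block ids in arrival order inside one ShuffleBuffer; note the
    // duplicate blockId 3 at the end.
    static final List<Long> BUFFER = Arrays.asList(1L, 2L, 3L, 4L, 5L, 3L);

    // Returns every block after the first occurrence of lastBlockId,
    // or the whole buffer when lastBlockId is -1 (the initial request).
    static List<Long> getMemoryShuffleData(long lastBlockId) {
        if (lastBlockId == -1L) {
            return BUFFER;
        }
        int idx = BUFFER.indexOf(lastBlockId); // first occurrence wins
        return BUFFER.subList(idx + 1, BUFFER.size());
    }

    public static void main(String[] args) {
        long lastBlockId = -1L;
        for (int request = 0; request < 5; request++) {
            List<Long> page = getMemoryShuffleData(lastBlockId);
            System.out.println("request " + request + " -> " + page);
            if (page.isEmpty()) {
                break; // the reader would stop here, but that never happens
            }
            lastBlockId = page.get(page.size() - 1);
        }
        // Every request after the first prints [4, 5, 3]: the last returned
        // blockId (3) always resolves to its earlier duplicate, so the
        // cursor never advances and the read never terminates.
    }
}
```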

leixm avatar Oct 10 '23 07:10 leixm

@jerqi @zuston What do you think?

leixm avatar Oct 10 '23 07:10 leixm

Sounds right.

zuston avatar Oct 10 '23 07:10 zuston

I think you are right.

jerqi avatar Oct 10 '23 07:10 jerqi

Do you have any optimization suggestions for this situation? @jerqi @zuston

leixm avatar Oct 11 '23 03:10 leixm

Deduplicating blocks on the ShuffleServer side may have a considerable impact on performance.

leixm avatar Oct 11 '23 03:10 leixm

Maybe we could add a new id like flushId to solve this problem.

jerqi avatar Oct 11 '23 06:10 jerqi

How about using a composite block id, like (last-1_block_id, last_block_id)?

zuston avatar Oct 11 '23 06:10 zuston

How about using skip list to store blocks in ShuffleBuffer?

xianjingfeng avatar Oct 12 '23 02:10 xianjingfeng

How about using skip list to store blocks in ShuffleBuffer?

Maybe we can't; we should guarantee that the order is immutable.

jerqi avatar Oct 12 '23 03:10 jerqi

How about using skip list to store blocks in ShuffleBuffer?

Maybe we can't; we should guarantee that the order is immutable.

Why should we guarantee that the order is immutable?

xianjingfeng avatar Oct 12 '23 03:10 xianjingfeng

@leixm @zuston I have discussed this issue with @jerqi offline. We think we can use a skipList to store blocks in the ShuffleBuffer, but the key of the skipList should be a self-incrementing id. Another reason to use a skipList is that if we assign a lot of memory (1TB) to the shuffle server, getting memory data will be slow with the current implementation.
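A minimal sketch of that idea, assuming hypothetical class and method names (this is not a committed design): every appended block gets a monotonically increasing sequence id as the skipList key, and readers page with tailMap on the last sequence id they saw, so duplicate blockIds can no longer stall the cursor; the logarithmic tailMap lookup also avoids a full scan when the buffer is very large.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a ShuffleBuffer keyed by a self-incrementing id,
// illustrating the proposal above; not the actual Uniffle implementation.
public class SeqIdShuffleBufferSketch {

    private final AtomicLong nextSeqId = new AtomicLong(0);
    // seqId -> blockId (a real buffer would also store the block payload)
    private final ConcurrentSkipListMap<Long, Long> blocks = new ConcurrentSkipListMap<>();

    // Arrival order is preserved by the monotonically increasing key,
    // even when two blocks share the same blockId.
    public void append(long blockId) {
        blocks.put(nextSeqId.getAndIncrement(), blockId);
    }

    // Readers page by the last *sequence* id they saw, not the last blockId,
    // so duplicates can never make the cursor jump backwards.
    public ConcurrentNavigableMap<Long, Long> readAfter(long lastSeqId) {
        return blocks.tailMap(lastSeqId, false);
    }

    public static void main(String[] args) {
        SeqIdShuffleBufferSketch buffer = new SeqIdShuffleBufferSketch();
        for (long blockId : new long[] {1L, 2L, 3L, 4L, 5L, 3L}) {
            buffer.append(blockId);
        }
        long cursor = -1L;
        List<Long> read = new ArrayList<>();
        while (true) {
            ConcurrentNavigableMap<Long, Long> page = buffer.readAfter(cursor);
            if (page.isEmpty()) {
                break;                 // pagination now terminates
            }
            read.addAll(page.values());
            cursor = page.lastKey();   // the cursor always advances
        }
        System.out.println(read);      // [1, 2, 3, 4, 5, 3]
    }
}
```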

xianjingfeng avatar Nov 02 '23 07:11 xianjingfeng

@jerqi @zuston Has this problem occurred in your production environment?

We saw this exception on prod before. Later on, we never saw it on prod again.

lifeSo avatar Nov 02 '23 08:11 lifeSo