incubator-uniffle
[Bug] spark executor task error when reading shuffle data when using java open jdk11
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Search before asking
- [X] I have searched in the issues and found no similar issues.
Describe the bug
I use OpenJDK 11 in the Spark image, and I get errors when a Spark task reads shuffle data from the Uniffle server. Here is the executor task error log:
Affects Version(s)
0.9.0
Uniffle Server Log Output
no error message
Uniffle Engine Log Output
java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.uniffle.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
at org.apache.uniffle.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:414)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:314)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:216)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:115)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:307)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator.isEmpty(Iterator.scala:387)
at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
at scala.collection.TraversableOnce.nonEmpty(TraversableOnce.scala:143)
at scala.collection.TraversableOnce.nonEmpty$(TraversableOnce.scala:143)
at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1431)
at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1559)
at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2$adapted(RDD.scala:1558)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: org.apache.spark.util.TaskCompletionListenerException: null
Previous exception in task: null
java.base/jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
java.base/java.lang.reflect.Method.invoke(Unknown Source)
org.apache.uniffle.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
org.apache.uniffle.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:414)
org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:314)
org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:216)
org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:115)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:307)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithKeys_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
scala.collection.Iterator.isEmpty(Iterator.scala:387)
scala.collection.Iterator.isEmpty$(Iterator.scala:387)
scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
scala.collection.TraversableOnce.nonEmpty(TraversableOnce.scala:143)
scala.collection.TraversableOnce.nonEmpty$(TraversableOnce.scala:143)
scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1431)
org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1559)
org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2$adapted(RDD.scala:1558)
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910)
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
org.apache.spark.scheduler.Task.run(Task.scala:141)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.base/java.lang.Thread.run(Unknown Source)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:177)
... 9 more
Suppressed: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.uniffle.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
at org.apache.uniffle.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:414)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.close(ShuffleReadClientImpl.java:335)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.cleanup(RssShuffleDataIterator.java:217)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$0(RssShuffleReader.java:284)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$1(RssShuffleReader.java:296)
at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:199)
... 12 more
Caused by: java.lang.IllegalArgumentException: duplicate or slice
at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unknown Source)
... 27 more
Caused by: java.lang.IllegalArgumentException: duplicate or slice
at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unknown Source)
... 42 more
Uniffle Server Configurations
rss.rpc.server.type GRPC_NETTY
...
Uniffle Engine Configurations
spark.rss.client.type=GRPC_NETTY
spark.rss.client.netty.io.mode=EPOLL
spark.rss.storage.type=MEMORY_LOCALFILE
...
Additional context
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
@rickyma @jerqi Does the client side support jdk11?
@rickyma @jerqi Does the client side support jdk11?
We didn't test it. From the error, it seems that we don't use the proper method to release memory on JDK 11.
@rickyma @jerqi Does the client side support jdk11?
We didn't test it. From the error, it seems that we don't use the proper method to release memory on JDK 11.
How should I fix this error for JDK11?
@rickyma Do you have any suggestion?
I've no idea. Could you please test this case using JDK 11? @maobaolong
@rickyma We use JDK 8 for all RSS clusters and clients, so we did not encounter this issue in our production env.
But I did a test on JDK 11 just now, and this issue reproduced.
➜ spark.mbl bin/spark-shell --master spark://localhost:7077 --deploy-mode client --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager --conf spark.rss.coordinator.quorum=localhost:19999 --conf spark.rss.storage.type=LOCALFILE --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.rss.test.mode.enable=true --conf spark.rss.client.type=GRPC_NETTY --conf spark.sql.shuffle.partitions=1 -i test.scala
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/softwares/spark.mbl/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/softwares/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
24/09/06 11:41:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/06 11:41:46 WARN RssSparkShuffleUtils: Empty conf items
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = spark://localhost:7077, app id = app-20240906114147-0002).
Spark session available as 'spark'.
24/09/06 11:41:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2) (localhost executor 0): java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.uniffle.shaded.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
at org.apache.uniffle.shaded.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:422)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.close(ShuffleReadClientImpl.java:335)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.cleanup(RssShuffleDataIterator.java:218)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$0(RssShuffleReader.java:293)
at org.apache.spark.shuffle.FunctionUtils$1.apply(FunctionUtils.java:33)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:316)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:186)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1548)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: duplicate or slice
at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unsafe.java:1238)
... 29 more
24/09/06 11:41:58 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8) (9.134.219.180 executor 0): java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.uniffle.shaded.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
at org.apache.uniffle.shaded.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:422)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.close(ShuffleReadClientImpl.java:335)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.cleanup(RssShuffleDataIterator.java:218)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$0(RssShuffleReader.java:293)
at org.apache.spark.shuffle.FunctionUtils$1.apply(FunctionUtils.java:33)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:316)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:186)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1548)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: duplicate or slice
at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unsafe.java:1238)
... 28 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2687)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2623)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2622)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2622)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2880)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2822)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2811)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2349)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2370)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2389)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
... 65 elided
Caused by: java.lang.reflect.InvocationTargetException: java.lang.IllegalArgumentException: duplicate or slice
at java.base/jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.uniffle.shaded.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
at org.apache.uniffle.shaded.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:422)
at org.apache.uniffle.client.impl.ShuffleReadClientImpl.close(ShuffleReadClientImpl.java:335)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.cleanup(RssShuffleDataIterator.java:218)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$0(RssShuffleReader.java:293)
at org.apache.spark.shuffle.FunctionUtils$1.apply(FunctionUtils.java:33)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:316)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:186)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1548)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: duplicate or slice
at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unsafe.java:1238)
... 28 more
- test.scala
val data = sc.parallelize(Seq(("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B", 5), ("A", 6), ("A", 7),("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7)));
val result = data.reduceByKey(_ + _);
result.collect().foreach(println);
System.exit(0);
Could you help to solve this? It will block people from using JDK 11. @maobaolong
@rickyma @ChenRussell JDK11 cannot work with the uniffle client for now.
@rickyma @ChenRussell JDK8 cannot work with the uniffle client for now.
Do you mean JDK11?
Do you mean JDK11?
Sorry for the mistake. Yeah, I mean JDK11. I got this conclusion from the community meeting.
@advancedxy @LuciferYang Do you have any suggestion about this issue?
Based on the stack trace, it seems that RSS is releasing a slice or a duplicate, which might indicate a bigger problem and needs some further investigation.
I've been running into the same issue. After searching around online I found this. And while it seems like a dirty hack, that project is widespread enough to hopefully catch this sort of thing. I replaced RssUtils.java:426 with the following, and it works:
try {
  PlatformDependent.freeDirectBuffer(byteBuffer);
} catch (Throwable e) {
  Throwable cause = e.getCause();
  if (!(cause instanceof IllegalArgumentException)) {
    throw e;
  }
  // Buffer is a duplicate or slice; swallow the rejection from Unsafe.invokeCleaner.
}
Not sure how to check for memory leaks right now though.
@ChenRussell Have you tried --add-opens java.base/jdk.internal.ref=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true
@ChenRussell Have you tried
--add-opens java.base/jdk.internal.ref=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true
Yes, I have tried that, but it's not working. It's not an accessibility problem; the reflection method can be accessed.
@ChenRussell Got it ~ Is there an existing unit test (UT) that can reproduce this issue? Java 8 uses CleanerJava6, while Java 9+ uses CleanerJava9, and there are some differences in the code implementation. It would be easier to identify the root cause if there is a UT that can reproduce it.
@ChenRussell Got it ~ Is there an existing unit test (UT) that can reproduce this issue? Java 8 uses
CleanerJava6, while Java 9+ uses CleanerJava9, and there are some differences in the code implementation. It would be easier to identify the root cause if there is a UT that can reproduce it.
You can use this https://github.com/apache/incubator-uniffle/issues/2082#issuecomment-2333178521 to reproduce this issue.
Based on the stack trace, it seems that RSS is releasing a slice or a duplicate, which might indicate a bigger problem and needs some further investigation.
Agree. The root cause is that ByteBuffer#attachment is unexpectedly non-null.
Based on the stack trace, it seems that RSS is releasing a slice or a duplicate, which might indicate a bigger problem and needs some further investigation.
Agree. The root cause is that ByteBuffer#attachment is unexpectedly non-null.
Can we check whether ByteBuffer#attachment is non-null before releasing memory, as a temporary fix?
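A minimal sketch of that temporary guard, assuming deep reflection into java.nio is permitted (e.g. with --add-opens java.base/java.nio=ALL-UNNAMED); the class and method names are hypothetical, and this is not the actual RssUtils code, it only illustrates the attachment check against plain Netty:
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

import io.netty.util.internal.PlatformDependent;

public final class GuardedRelease {

  /** Returns true if the buffer is a duplicate or slice, i.e. its attachment() is non-null. */
  static boolean isDuplicateOrSlice(ByteBuffer buffer) {
    try {
      // DirectByteBuffer exposes a public attachment() method (from sun.nio.ch.DirectBuffer);
      // a non-null attachment means this buffer is only a view of another buffer.
      Method attachment = buffer.getClass().getMethod("attachment");
      attachment.setAccessible(true); // requires --add-opens java.base/java.nio=ALL-UNNAMED
      return attachment.invoke(buffer) != null;
    } catch (Exception e) {
      // If the buffer cannot be inspected, be conservative and treat it as a view.
      return true;
    }
  }

  static void release(ByteBuffer buffer) {
    if (buffer == null || !buffer.isDirect() || isDuplicateOrSlice(buffer)) {
      return; // only the base buffer owns the memory, so views are skipped
    }
    PlatformDependent.freeDirectBuffer(buffer);
  }
}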
Similar problems arise elsewhere, such as ARTEMIS-2131, 2378, and WFLY-11026; we can refer to them.
It seems to be caused by this code: the resulting byteBuf will be a sliced ByteBuf, but a sliced ByteBuffer obtained via byteBuffer = byteBuf.nioBuffer() does not support release. @advancedxy @ChenRussell
https://github.com/apache/incubator-uniffle/blob/b7c9ca59b1413fa480fa0cfec666e9d15d77dcf9/common/src/main/java/org/apache/uniffle/common/netty/TransportFrameDecoder.java#L179
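As a minimal, engine-free illustration of that failure mode (a hedged sketch, not an existing Uniffle test; it assumes Netty's direct-buffer cleaner is active on JDK 9+, as the stack traces above show, and uses the unshaded io.netty package):
import java.nio.ByteBuffer;

import io.netty.util.internal.PlatformDependent;

public final class SliceReleaseRepro {
  public static void main(String[] args) {
    ByteBuffer base = ByteBuffer.allocateDirect(64);
    ByteBuffer slice = base.slice(); // a view; its attachment() points back at 'base'

    // Releasing the base buffer itself works: it has a cleaner and no attachment.
    PlatformDependent.freeDirectBuffer(base);

    // Releasing the slice on JDK 9+ goes through Unsafe.invokeCleaner, which rejects views:
    // java.lang.IllegalArgumentException: duplicate or slice
    PlatformDependent.freeDirectBuffer(slice);
  }
}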
Maybe we could remove:
if (readBuffer != null) {
  RssUtils.releaseByteBuffer(readBuffer);
}
because we have already released the ByteBuf here:
if (sdr != null) {
  sdr.release();
  // We set sdr to null here to prevent IllegalReferenceCountException that could occur
  // if sdr.release() is called multiple times in the close() method,
  // when an exception is thrown by clientReadHandler.readShuffleData().
  sdr = null;
}
cc @rickyma @maobaolong @leixm WDYT?
I also hit the same issue. In my case, readBuffer is an instance of DirectByteBuffer, readBuffer#cleaner is null, and readBuffer#attachment is not null. Since readBuffer#cleaner is null, PlatformDependent.freeDirectBuffer, which eventually invokes the cleaner, wouldn't do anything regardless; plus, before even reaching the cleaner, it already errors out when checking the attachment. The readBuffer we get comes from DirectByteBuffer.duplicate, which sets cleaner to null intentionally. It seems readBuffer is just a pointer to the base buffer, which would be cleaned up anyway. It seems we can simply not release readBuffer in ShuffleReadClientImpl. I tested my application with that change; it seems to run successfully, and I didn't see memory issues either.
I also hit the same issue. In my case, readBuffer is an instance of DirectByteBuffer, readBuffer#cleaner is null, and readBuffer#attachment is not null. Since readBuffer#cleaner is null, PlatformDependent.freeDirectBuffer, which eventually invokes the cleaner, wouldn't do anything regardless; plus, before even reaching the cleaner, it already errors out when checking the attachment. The readBuffer we get comes from DirectByteBuffer.duplicate, which sets cleaner to null intentionally. It seems readBuffer is just a pointer to the base buffer, which would be cleaned up anyway. It seems we can simply not release readBuffer in ShuffleReadClientImpl. I tested my application with that change; it seems to run successfully, and I didn't see memory issues either.
I think it's OK to remove the logic that releases the read buffer.
Did you try using -Dio.netty.leakDetectionLevel= to ensure that the client doesn't have memory issues?