
[Bug] Spark executor task error when reading shuffle data using OpenJDK 11

ChenRussell opened this issue 1 year ago · 10 comments

Code of Conduct

Search before asking

  • [X] I have searched in the issues and found no similar issues.

Describe the bug

I use OpenJDK 11 in my Spark image, and I get errors when Spark tasks read shuffle data from the Uniffle server. The executor task error log was attached as a screenshot; the stack trace is included under Uniffle Engine Log Output below.

Affects Version(s)

0.9.0

Uniffle Server Log Output

no error message

Uniffle Engine Log Output

java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.uniffle.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
    at org.apache.uniffle.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
    at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:414)
    at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:314)
    at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:216)
    at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:115)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:307)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator.isEmpty(Iterator.scala:387)
    at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
    at scala.collection.TraversableOnce.nonEmpty(TraversableOnce.scala:143)
    at scala.collection.TraversableOnce.nonEmpty$(TraversableOnce.scala:143)
    at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1431)
    at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1559)
    at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2$adapted(RDD.scala:1558)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    Suppressed: org.apache.spark.util.TaskCompletionListenerException: null

Previous exception in task: null
    java.base/jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
    java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    java.base/java.lang.reflect.Method.invoke(Unknown Source)
    org.apache.uniffle.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
    org.apache.uniffle.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
    org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:414)
    org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:314)
    org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:216)
    org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:115)
    org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:307)
    org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithKeys_0$(Unknown Source)
    org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    scala.collection.Iterator.isEmpty(Iterator.scala:387)
    scala.collection.Iterator.isEmpty$(Iterator.scala:387)
    scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
    scala.collection.TraversableOnce.nonEmpty(TraversableOnce.scala:143)
    scala.collection.TraversableOnce.nonEmpty$(TraversableOnce.scala:143)
    scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1431)
    org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1559)
    org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2$adapted(RDD.scala:1558)
    org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910)
    org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    org.apache.spark.scheduler.Task.run(Task.scala:141)
    org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    java.base/java.lang.Thread.run(Unknown Source)
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
        at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:177)
        ... 9 more
        Suppressed: java.lang.reflect.InvocationTargetException
            at java.base/jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
            at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
            at java.base/java.lang.reflect.Method.invoke(Unknown Source)
            at org.apache.uniffle.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
            at org.apache.uniffle.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
            at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:414)
            at org.apache.uniffle.client.impl.ShuffleReadClientImpl.close(ShuffleReadClientImpl.java:335)
            at org.apache.spark.shuffle.reader.RssShuffleDataIterator.cleanup(RssShuffleDataIterator.java:217)
            at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$0(RssShuffleReader.java:284)
            at scala.Function0.apply$mcV$sp(Function0.scala:39)
            at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
            at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$1(RssShuffleReader.java:296)
            at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:144)
            at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:144)
            at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:199)
            ... 12 more
        Caused by: java.lang.IllegalArgumentException: duplicate or slice
            at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unknown Source)
            ... 27 more
Caused by: java.lang.IllegalArgumentException: duplicate or slice
    at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unknown Source)
    ... 42 more

Uniffle Server Configurations

rss.rpc.server.type GRPC_NETTY
...

Uniffle Engine Configurations

spark.rss.client.type=GRPC_NETTY
spark.rss.client.netty.io.mode=EPOLL
spark.rss.storage.type=MEMORY_LOCALFILE
...

Additional context

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

ChenRussell · Aug 22 '24 12:08

@rickyma @jerqi Does the client side support jdk11?

ChenRussell · Sep 04 '24 12:09

@rickyma @jerqi Does the client side support jdk11?

We didn't test it. From the error, it seems that we don't use the proper method to release memory on JDK 11.
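
For reference, a minimal standalone JDK 11 sketch (not Uniffle code) that reproduces the same IllegalArgumentException via the path Netty's CleanerJava9 uses:

    import java.lang.reflect.Field;
    import java.nio.ByteBuffer;

    public class InvokeCleanerRepro {
      public static void main(String[] args) throws Exception {
        // Obtain sun.misc.Unsafe reflectively, the same way Netty's CleanerJava9 does.
        Field f = Class.forName("sun.misc.Unsafe").getDeclaredField("theUnsafe");
        f.setAccessible(true);
        sun.misc.Unsafe unsafe = (sun.misc.Unsafe) f.get(null);

        ByteBuffer base = ByteBuffer.allocateDirect(16);
        // Freeing the original buffer is fine, but freeing a duplicate or slice throws
        // java.lang.IllegalArgumentException: duplicate or slice
        unsafe.invokeCleaner(base.duplicate());
      }
    }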

jerqi · Sep 05 '24 06:09

@rickyma @jerqi Does the client side support jdk11?

We didn't test it. From the error, it seems that we don't use the proper method to release memory on JDK 11.

How should I fix this error on JDK 11?

ChenRussell · Sep 05 '24 07:09

@rickyma Do you have any suggestions?

jerqi · Sep 05 '24 09:09

I've no idea. Could you please test this case using JDK 11? @maobaolong

rickyma · Sep 05 '24 16:09

@rickyma We use JDK 8 for all RSS clusters and clients, so we did not encounter this issue in our production environment.

But I just did a test on JDK 11, and this issue reproduced.

➜  spark.mbl bin/spark-shell  --master  spark://localhost:7077  --deploy-mode client  --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager --conf spark.rss.coordinator.quorum=localhost:19999  --conf spark.rss.storage.type=LOCALFILE --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.rss.test.mode.enable=true --conf spark.rss.client.type=GRPC_NETTY --conf spark.sql.shuffle.partitions=1  -i test.scala  
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/softwares/spark.mbl/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/softwares/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
24/09/06 11:41:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/06 11:41:46 WARN RssSparkShuffleUtils: Empty conf items
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = spark://localhost:7077, app id = app-20240906114147-0002).
Spark session available as 'spark'.
24/09/06 11:41:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2) (localhost executor 0): java.lang.reflect.InvocationTargetException
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.uniffle.shaded.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
        at org.apache.uniffle.shaded.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
        at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:422)
        at org.apache.uniffle.client.impl.ShuffleReadClientImpl.close(ShuffleReadClientImpl.java:335)
        at org.apache.spark.shuffle.reader.RssShuffleDataIterator.cleanup(RssShuffleDataIterator.java:218)
        at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$0(RssShuffleReader.java:293)
        at org.apache.spark.shuffle.FunctionUtils$1.apply(FunctionUtils.java:33)
        at scala.Function0.apply$mcV$sp(Function0.scala:39)
        at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
        at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:316)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
        at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
        at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:186)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1548)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: duplicate or slice
        at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unsafe.java:1238)
        ... 29 more

24/09/06 11:41:58 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8) (9.134.219.180 executor 0): java.lang.reflect.InvocationTargetException
        at java.base/jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.uniffle.shaded.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
        at org.apache.uniffle.shaded.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
        at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:422)
        at org.apache.uniffle.client.impl.ShuffleReadClientImpl.close(ShuffleReadClientImpl.java:335)
        at org.apache.spark.shuffle.reader.RssShuffleDataIterator.cleanup(RssShuffleDataIterator.java:218)
        at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$0(RssShuffleReader.java:293)
        at org.apache.spark.shuffle.FunctionUtils$1.apply(FunctionUtils.java:33)
        at scala.Function0.apply$mcV$sp(Function0.scala:39)
        at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
        at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:316)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
        at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
        at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:186)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1548)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: duplicate or slice
        at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unsafe.java:1238)
        ... 28 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2687)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2623)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2622)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2622)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2880)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2822)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2811)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2349)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2370)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2389)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
  ... 65 elided
Caused by: java.lang.reflect.InvocationTargetException: java.lang.IllegalArgumentException: duplicate or slice
  at java.base/jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at org.apache.uniffle.shaded.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
  at org.apache.uniffle.shaded.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
  at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:422)
  at org.apache.uniffle.client.impl.ShuffleReadClientImpl.close(ShuffleReadClientImpl.java:335)
  at org.apache.spark.shuffle.reader.RssShuffleDataIterator.cleanup(RssShuffleDataIterator.java:218)
  at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$0(RssShuffleReader.java:293)
  at org.apache.spark.shuffle.FunctionUtils$1.apply(FunctionUtils.java:33)
  at scala.Function0.apply$mcV$sp(Function0.scala:39)
  at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
  at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:316)
  at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
  at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
  at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:186)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:136)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1548)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: duplicate or slice
  at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unsafe.java:1238)
  ... 28 more
test.scala:
// Minimal reproduction: reduceByKey forces a shuffle, and reading its output triggers the bug.
val data = sc.parallelize(Seq(("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B", 5), ("A", 6), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7)))
val result = data.reduceByKey(_ + _)
result.collect().foreach(println)
System.exit(0)

maobaolong · Sep 06 '24 03:09

Could you help solve this? It will block people from using JDK 11. @maobaolong

rickyma · Sep 06 '24 04:09

@rickyma @ChenRussell JDK 11 cannot work with the Uniffle client for now.

maobaolong · Sep 11 '24 05:09

@rickyma @ChenRussell JDK8 cannot work with the Uniffle client for now.

Do you mean JDK11?

ChenRussell · Sep 11 '24 06:09

Do you mean JDK11?

Sorry for the mistake. Yeah, I meant JDK 11; I got this conclusion from the community meeting.

maobaolong · Sep 11 '24 08:09

@advancedxy @LuciferYang Do you have any suggestion about this issue?

jerqi · Nov 07 '24 02:11

Based on the stack trace, it seems that RSS is releasing a slice or a duplicate, which might indicate a bigger problem and needs further investigation.

advancedxy · Nov 07 '24 11:11

I've been running into the same issue. After searching around online I found this. And while it seems like a dirty hack, that project is widespread enough to hopefully catch this sort of thing. I replaced RssUtils.java:426 with the following, and it works:

try {
  PlatformDependent.freeDirectBuffer(byteBuffer);
} catch (Throwable e) {
  Throwable cause = e.getCause();

  if (!(cause instanceof IllegalArgumentException)) {
    throw e;
  }
  // The buffer is a duplicate or slice: it has no cleaner of its own,
  // so nothing would have been freed anyway.
}

Not sure how to check for memory leaks right now though.

liorregev · Nov 07 '24 12:11

@ChenRussell Have you tried --add-opens java.base/jdk.internal.ref=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true
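
(For completeness, assuming a typical setup, the flags would need to reach both the driver and the executors, e.g. via Spark conf:)

    spark.driver.extraJavaOptions=--add-opens java.base/jdk.internal.ref=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true
    spark.executor.extraJavaOptions=--add-opens java.base/jdk.internal.ref=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true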

LuciferYang · Nov 07 '24 16:11

@ChenRussell Have you tried --add-opens java.base/jdk.internal.ref=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true

Yes, I have tried that, but it's not working. It's not an accessibility problem; the reflection method can be accessed.

ChenRussell · Nov 08 '24 02:11

@ChenRussell Got it ~ Is there an existing unit test (UT) that can reproduce this issue? Java 8 uses CleanerJava6, while Java 9+ uses CleanerJava9, and there are some differences between their implementations. It would be easier to identify the root cause if there were a UT that could reproduce it.

LuciferYang · Nov 08 '24 02:11

@ChenRussell Got it ~ Is there an existing unit test (UT) that can reproduce this issue? Java 8 uses CleanerJava6, while Java 9+ uses CleanerJava9, and there are some differences between their implementations. It would be easier to identify the root cause if there were a UT that could reproduce it.

You can use this https://github.com/apache/incubator-uniffle/issues/2082#issuecomment-2333178521 to reproduce this issue.

ChenRussell · Nov 08 '24 02:11

Based on the stack trace, it seems that RSS is releasing a slice or a duplicate, which might indicate a bigger problem and needs further investigation.

Agree. The root cause is that ByteBuffer#attachment is unexpectedly non-null: the buffer being released is a duplicate or slice.

xumanbu · Dec 09 '24 09:12

Based on the stack trace, it seems that RSS is releasing a slice or a duplicate, which might indicate a bigger problem and needs further investigation.

Agree. The root cause is that ByteBuffer#attachment is unexpectedly non-null: the buffer being released is a duplicate or slice.

Can we check whether ByteBuffer#attachment is non-null before releasing memory, as a temporary fix?
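
A hypothetical sketch of that guard (the class and method names are illustrative, not Uniffle code; the import assumes the shaded Netty relocation seen in the stack traces, and on JDK 9+ the reflective call below needs --add-exports java.base/sun.nio.ch=ALL-UNNAMED):

    import java.nio.ByteBuffer;

    import org.apache.uniffle.shaded.io.netty.util.internal.PlatformDependent;

    public final class ByteBufferReleases {
      // Only hand the buffer to Netty's cleaner when it owns its memory, i.e. when
      // DirectBuffer#attachment() is null. Duplicates and slices carry a reference
      // to their parent in attachment() and have no cleaner of their own.
      static void releaseIfOwned(ByteBuffer buffer) {
        if (buffer == null || !buffer.isDirect()) {
          return;
        }
        try {
          Object attachment =
              Class.forName("sun.nio.ch.DirectBuffer").getMethod("attachment").invoke(buffer);
          if (attachment == null) {
            PlatformDependent.freeDirectBuffer(buffer);
          }
          // attachment != null: a duplicate or slice; the parent owns the memory.
        } catch (Throwable t) {
          // Could not inspect the buffer; skip the explicit free and let GC reclaim it.
        }
      }
    }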

xumanbu · Dec 09 '24 09:12

Similar problems have come up elsewhere, such as ARTEMIS-2131, 2378, and WFLY-11026; we can refer to those.

dingshun3016 · Dec 10 '24 03:12

It seems to be caused by this code: the resulting byteBuf will be a sliced ByteBuf, but a sliced buffer obtained via byteBuffer = byteBuf.nioBuffer() does not support release. @advancedxy @ChenRussell https://github.com/apache/incubator-uniffle/blob/b7c9ca59b1413fa480fa0cfec666e9d15d77dcf9/common/src/main/java/org/apache/uniffle/common/netty/TransportFrameDecoder.java#L179
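
A standalone sketch of that relationship (plain, unshaded Netty APIs, assumed for illustration): the NIO view cannot be freed through the cleaner; the memory is reclaimed via reference counting on the ByteBuf.

    import java.nio.ByteBuffer;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.PooledByteBufAllocator;

    public class NioViewDemo {
      public static void main(String[] args) {
        ByteBuf frame = PooledByteBufAllocator.DEFAULT.directBuffer(64);
        // nioBuffer(...) returns a duplicate/slice view over the ByteBuf's memory:
        // its cleaner is null and its attachment points at the parent, so
        // Unsafe.invokeCleaner rejects it with "duplicate or slice".
        ByteBuffer view = frame.nioBuffer(0, 64);
        // The memory is reclaimed through reference counting on the ByteBuf itself:
        frame.release(); // returns the memory to the pool; 'view' must not be used afterwards
      }
    }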

xumanbu · Dec 11 '24 10:12

Maybe we could remove this:

    if (readBuffer != null) {
      RssUtils.releaseByteBuffer(readBuffer);
    }

because we have already released the ByteBuf:

    if (sdr != null) {
      sdr.release();
      // We set sdr to null here to prevent IllegalReferenceCountException that could occur
      // if sdr.release() is called multiple times in the close() method,
      // when an exception is thrown by clientReadHandler.readShuffleData().
      sdr = null;
    }


cc @rickyma @maobaolong @leixm WDYT?

jerqi · Dec 13 '24 07:12

I also hit the same issue. In my case, readBuffer is an instance of DirectByteBuffer, its cleaner is null, and its attachment is not null. Since the cleaner is null, PlatformDependent.freeDirectBuffer (which eventually invokes the cleaner) wouldn't free anything regardless; besides, it errors out on the attachment check before ever reaching the cleaner. The readBuffer we get comes from DirectByteBuffer.duplicate, which sets cleaner to null intentionally: readBuffer is just a view over the base buffer, which will be cleaned up anyway. So it seems we can simply not release readBuffer in ShuffleReadClientImpl. I tested my application with that change; it ran successfully, and I didn't see any memory issues either.
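
For reference, this matches the guard inside sun.misc.Unsafe#invokeCleaner on JDK 9+ (paraphrased from the JDK, lightly annotated; not verbatim source):

    // Paraphrase of jdk.unsupported's Unsafe#invokeCleaner (JDK 9+):
    public void invokeCleaner(java.nio.ByteBuffer directBuffer) {
        if (!directBuffer.isDirect())
            throw new IllegalArgumentException("buffer is non-direct");
        DirectBuffer db = (DirectBuffer) directBuffer;   // sun.nio.ch.DirectBuffer
        if (db.attachment() != null)                     // duplicates/slices keep their parent here
            throw new IllegalArgumentException("duplicate or slice");
        Cleaner cleaner = db.cleaner();                  // null for a duplicate(), as described above
        if (cleaner != null)
            cleaner.clean();
    }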

daijy · Feb 11 '25 18:02

I also hit the same issue. In my case, readBuffer is an instance of DirectByteBuffer, its cleaner is null, and its attachment is not null. Since the cleaner is null, PlatformDependent.freeDirectBuffer (which eventually invokes the cleaner) wouldn't free anything regardless; besides, it errors out on the attachment check before ever reaching the cleaner. The readBuffer we get comes from DirectByteBuffer.duplicate, which sets cleaner to null intentionally: readBuffer is just a view over the base buffer, which will be cleaned up anyway. So it seems we can simply not release readBuffer in ShuffleReadClientImpl. I tested my application with that change; it ran successfully, and I didn't see any memory issues either.

I think it's OK to remove the logic that releases the read buffer.

Have you tried -Dio.netty.leakDetectionLevel= to make sure the client doesn't have memory leaks?
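
(For reference, Netty's leak detection levels are disabled, simple, advanced, and paranoid. An illustrative way to enable the most thorough level for a test run, assuming Spark conf is the delivery mechanism:)

    spark.executor.extraJavaOptions=-Dio.netty.leakDetectionLevel=paranoid

Note that the detector only tracks Netty ByteBufs (so it would catch a missed sdr.release()), not raw NIO direct buffers.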

jerqi · Feb 13 '25 07:02