spark-rabbitmq
java.lang.OutOfMemoryError: GC overhead limit exceeded
I use Spark 1.6.0 with spark-rabbitmq 0.4.0.
My program ran for 24 hours before the error was reported. Please help me.
My error log: `17/09/13 17:15:43 WARN spark.HeartbeatReceiver: Removing executor 6 with no recent heartbeats: 178850 ms exceeds timeout 120000 ms 17/09/13 17:15:43 ERROR cluster.YarnScheduler: Lost executor 6 on spark1: Executor heartbeat timed out after 178850 ms 17/09/13 17:15:43 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 7.0 (TID 75, spark1): ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 178850 ms 17/09/13 17:15:43 INFO scheduler.DAGScheduler: Executor lost: 6 (epoch 544) 17/09/13 17:15:43 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 6 from BlockManagerMaster. 17/09/13 17:15:45 INFO cluster.YarnClientSchedulerBackend: Requesting to kill executor(s) 6 17/09/13 17:15:52 WARN server.TransportChannelHandler: Exception in connection from spark2/192.168.155.3:50701 java.lang.OutOfMemoryError: GC overhead limit exceeded 17/09/13 17:15:52 WARN server.TransportChannelHandler: Exception in connection from spark3/192.168.155.4:37321 java.lang.OutOfMemoryError: GC overhead limit exceeded 17/09/13 17:15:52 WARN server.TransportChannelHandler: Exception in connection from spark3/192.168.155.4:37317 java.lang.OutOfMemoryError: GC overhead limit exceeded 17/09/13 17:15:52 WARN server.TransportChannelHandler: Exception in connection from spark2/192.168.155.3:57252 java.lang.OutOfMemoryError: GC overhead limit exceeded 17/09/13 17:15:52 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(6, spark1, 54732) 17/09/13 17:15:52 INFO storage.BlockManagerMaster: Removed 6 successfully in removeExecutor 17/09/13 17:15:52 INFO storage.BlockManagerInfo: Added input-2-1505294133600 in memory on spark2:53361 (size: 9.1 KB, free: 1047.9 MB) 17/09/13 17:15:52 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=6104341188587013298, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=353 cap=353]}} to spark3/192.168.155.4:37317; 
closing connection java.nio.channels.ClosedChannelException 17/09/13 17:15:52 INFO storage.BlockManagerInfo: Added input-3-1505294135000 in memory on spark3:34916 (size: 1568.0 B, free: 819.1 MB) 17/09/13 17:15:52 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=7579983330511639701, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=47]}} to spark2/192.168.155.3:57252; closing connection java.nio.channels.ClosedChannelException 17/09/13 17:15:52 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=5632111837789001813, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=47]}} to spark3/192.168.155.4:37321; closing connection java.nio.channels.ClosedChannelException 17/09/13 17:15:56 WARN server.TransportChannelHandler: Exception in connection from spark1/192.168.155.2:41920 java.lang.OutOfMemoryError: GC overhead limit exceeded 17/09/13 17:15:56 ERROR cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED! 17/09/13 17:15:56 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.155.2:4040 17/09/13 17:16:00 WARN server.TransportChannelHandler: Exception in connection from spark2/192.168.155.3:57212 java.lang.OutOfMemoryError: GC overhead limit exceeded 17/09/13 17:16:00 ERROR scheduler.ReceiverTracker: Receiver has been stopped. Try to restart it. 
org.apache.spark.SparkException: Job 4 cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581) at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1751) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230) at org.apache.spark.SparkContext.stop(SparkContext.scala:1750) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:147) 17/09/13 17:16:00 INFO scheduler.DAGScheduler: ResultStage 1508 (start at SparkStremingReadRabbitMQ.scala:77) failed in 6207.798 s 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@1ab59f28) 17/09/13 17:16:00 INFO scheduler.DAGScheduler: ResultStage 5 (start at SparkStremingReadRabbitMQ.scala:77) failed in 83400.231 s 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@772c8704) 17/09/13 17:16:00 INFO scheduler.ReceiverTracker: Restarting Receiver 3 17/09/13 17:16:00 INFO scheduler.DAGScheduler: ResultStage 2 (start at SparkStremingReadRabbitMQ.scala:77) failed in 83400.642 s 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! 
Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@4b86d4b2) 17/09/13 17:16:00 INFO scheduler.DAGScheduler: ShuffleMapStage 1599 (text at SparkStremingReadRabbitMQ.scala:72) failed in 1592.898 s 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@1ae3ac4) 17/09/13 17:16:00 INFO scheduler.DAGScheduler: ResultStage 6 (start at SparkStremingReadRabbitMQ.scala:77) failed in 83400.065 s 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@37d4a906) 17/09/13 17:16:00 INFO scheduler.DAGScheduler: ResultStage 7 (start at SparkStremingReadRabbitMQ.scala:77) failed in 83399.925 s 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@24531c5f) 17/09/13 17:16:00 INFO scheduler.DAGScheduler: ResultStage 4 (start at SparkStremingReadRabbitMQ.scala:77) failed in 83400.364 s 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@590d6f23) 17/09/13 17:16:00 INFO scheduler.DAGScheduler: ResultStage 1601 (start at SparkStremingReadRabbitMQ.scala:77) failed in 698.533 s 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@290516dc) 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(4,1505294160255,JobFailed(org.apache.spark.SparkException: Job 4 cancelled because SparkContext was shut down)) 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! 
Dropping event SparkListenerJobEnd(6,1505294160255,JobFailed(org.apache.spark.SparkException: Job 6 cancelled because SparkContext was shut down)) 17/09/13 17:16:00 ERROR scheduler.ReceiverTracker: Receiver has been stopped. Try to restart it. org.apache.spark.SparkException: Job 6 cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581) at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1751) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230) at org.apache.spark.SparkContext.stop(SparkContext.scala:1750) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:147) 17/09/13 17:16:00 INFO scheduler.ReceiverTracker: Restarting Receiver 5 17/09/13 17:16:00 ERROR scheduler.ReceiverTracker: Receiver has been stopped. Try to restart it. 
org.apache.spark.SparkException: Job 995 cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581) at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1751) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230) at org.apache.spark.SparkContext.stop(SparkContext.scala:1750) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:147) 17/09/13 17:16:00 INFO scheduler.ReceiverTracker: Restarting Receiver 6 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(995,1505294160257,JobFailed(org.apache.spark.SparkException: Job 995 cancelled because SparkContext was shut down)) 17/09/13 17:16:00 ERROR netty.Inbox: Ignoring error java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext. This stopped SparkContext was created at:
org.apache.spark.SparkContext.
The currently active SparkContext was created at:
org.apache.spark.SparkContext.
at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:107)
at org.apache.spark.SparkContext$$anonfun$makeRDD$2.apply(SparkContext.scala:830)
at org.apache.spark.SparkContext$$anonfun$makeRDD$2.apply(SparkContext.scala:829)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:725)
at org.apache.spark.SparkContext.makeRDD(SparkContext.scala:829)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.org$apache$spark$streaming$scheduler$ReceiverTracker$ReceiverTrackerEndpoint$$startReceiver(ReceiverTracker.scala:588)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receive$1.applyOrElse(ReceiverTracker.scala:477)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/09/13 17:16:00 ERROR scheduler.ReceiverTracker: Receiver has been stopped. Try to restart it. org.apache.spark.SparkException: Job 3 cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581) at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1751) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230) at org.apache.spark.SparkContext.stop(SparkContext.scala:1750) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:147) 17/09/13 17:16:00 INFO scheduler.ReceiverTracker: Restarting Receiver 2 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(3,1505294160258,JobFailed(org.apache.spark.SparkException: Job 3 cancelled because SparkContext was shut down)) 17/09/13 17:16:00 ERROR scheduler.ReceiverTracker: Receiver has been stopped. Try to restart it. 
org.apache.spark.SparkException: Job 5 cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581) at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1751) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230) at org.apache.spark.SparkContext.stop(SparkContext.scala:1750) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:147) 17/09/13 17:16:00 INFO scheduler.ReceiverTracker: Restarting Receiver 4 17/09/13 17:16:00 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(5,1505294160261,JobFailed(org.apache.spark.SparkException: Job 5 cancelled because SparkContext was shut down)) 17/09/13 17:16:00 ERROR netty.Inbox: Ignoring error java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext. This stopped SparkContext was created at:
org.apache.spark.SparkContext.
The currently active SparkContext was created at:
org.apache.spark.SparkContext.
at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:107)
at org.apache.spark.SparkContext$$anonfun$makeRDD$2.apply(SparkContext.scala:830)
at org.apache.spark.SparkContext$$anonfun$makeRDD$2.apply(SparkContext.scala:829)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:725)
at org.apache.spark.SparkContext.makeRDD(SparkContext.scala:829)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.org$apache$spark$streaming$scheduler$ReceiverTracker$ReceiverTrackerEndpoint$$startReceiver(ReceiverTracker.scala:588)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receive$1.applyOrElse(ReceiverTracker.scala:477)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/09/13 17:16:00 ERROR netty.Inbox: Ignoring error java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext. This stopped SparkContext was created at:
org.apache.spark.SparkContext.
The currently active SparkContext was created at:
org.apache.spark.SparkContext.
at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:107)
at org.apache.spark.SparkContext$$anonfun$makeRDD$2.apply(SparkContext.scala:830)
at org.apache.spark.SparkContext$$anonfun$makeRDD$2.apply(SparkContext.scala:829)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:725)
at org.apache.spark.SparkContext.makeRDD(SparkContext.scala:829)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.org$apache$spark$streaming$scheduler$ReceiverTracker$ReceiverTrackerEndpoint$$startReceiver(ReceiverTracker.scala:588)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receive$1.applyOrElse(ReceiverTracker.scala:477)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/09/13 17:16:00 ERROR netty.Inbox: Ignoring error java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext. This stopped SparkContext was created at:
org.apache.spark.SparkContext.
The currently active SparkContext was created at:
org.apache.spark.SparkContext.
at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:107)
at org.apache.spark.SparkContext$$anonfun$makeRDD$2.apply(SparkContext.scala:830)
at org.apache.spark.SparkContext$$anonfun$makeRDD$2.apply(SparkContext.scala:829)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:725)
at org.apache.spark.SparkContext.makeRDD(SparkContext.scala:829)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.org$apache$spark$streaming$scheduler$ReceiverTracker$ReceiverTrackerEndpoint$$startReceiver(ReceiverTracker.scala:588)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receive$1.applyOrElse(ReceiverTracker.scala:477)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/09/13 17:16:00 INFO scheduler.DAGScheduler: Job 1055 failed: text at SparkStremingReadRabbitMQ.scala:72, took 1597.324995 s 17/09/13 17:16:00 ERROR netty.Inbox: Ignoring error java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext. This stopped SparkContext was created at:
org.apache.spark.SparkContext.
Try to increase the parallelism (more partitions)
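For example (a sketch only, not tested against this setup; `stream` stands for one of the receiver streams and 16 is a placeholder partition count that should be tuned to the cluster):

```scala
// Sketch: spread received blocks over more partitions before heavy work.
// `stream` is a placeholder for a DStream returned by RabbitMQUtils.createStream.
val repartitioned = stream.repartition(16)
repartitioned.foreachRDD { rdd =>
  // downstream processing now runs across 16 partitions
  // instead of the handful of receiver blocks
  println(s"records in batch: ${rdd.count()}")
}
```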
@nelsou Thank you for your help. I tried increasing the parallelism, but it had no effect. My Spark code is:

```scala
val conf = new SparkConf().setAppName("SparkStremingReadRabbitMQ")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ssc = new StreamingContext(conf, Seconds(60 * 10))
import org.apache.spark.sql.functions._

val mapdi = getconfig("di")
val direceiverStream = RabbitMQUtils.createStream[String](ssc, mapdi)
direceiverStream.print()
//direceiverStream.saveAsTextFiles("hdfs://master:8020/test/di/di", "txt")

val mappowermeter = getconfig("powermeter")
val mappowermeterdireceiverStream = RabbitMQUtils.createStream[String](ssc, mappowermeter)
mappowermeterdireceiverStream.print()

val mapai = getconfig("ai")
val aidireceiverStream = RabbitMQUtils.createStream[String](ssc, mapai)
aidireceiverStream.print()

val mappmbuspower_ac = getconfig("pmbuspower_ac")
val pmbuspower_acreceiverStream = RabbitMQUtils.createStream[String](ssc, mappmbuspower_ac)
pmbuspower_acreceiverStream.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val data = rdd.toDF("jsonData")
  data.repartition(1).write.mode(SaveMode.Append).text("hdfs://master:8020/test/pmbuspower_ac_" + getday + ".txt")
}

val mappmbuspower_dc = getconfig("pmbuspower_dc")
val pmbuspower_dcreceiverStream = RabbitMQUtils.createStream[String](ssc, mappmbuspower_dc)
pmbuspower_dcreceiverStream.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val data = rdd.toDF("jsonData")
  data.repartition(1).write.mode(SaveMode.Append).text("hdfs://master:8020/test/pmbuspower_dc_" + getday + ".txt")
}

val mappmbuspower_rc = getconfig("pmbuspower_rc")
val pmbuspower_rcreceiverStream = RabbitMQUtils.createStream[String](ssc, mappmbuspower_rc)
pmbuspower_rcreceiverStream.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val data = rdd.toDF("jsonData")
  data.repartition(1).write.mode(SaveMode.Append).text("hdfs://master:8020/test/pmbuspower_rc_" + getday + ".txt")
}

val mapbattery = getconfig("battery")
val batteryreceiverStream = RabbitMQUtils.createStream[String](ssc, mapbattery)
batteryreceiverStream.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val data = rdd.toDF("jsonData")
  data.repartition(1).write.mode(SaveMode.Append).text("hdfs://master:8020/test/battery_" + getday + ".txt")
}

ssc.start()
ssc.awaitTermination()
```
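As a restructuring sketch only (untested; it reuses `getday` and the stream names from the code above, and assumes `org.apache.spark.streaming.dstream.DStream` is imported), the four near-identical `foreachRDD` writers could be collapsed into one helper:

```scala
// Sketch: factor the repeated foreachRDD write logic into one helper.
// `getday` and the HDFS base path are taken from the code above.
def writeAsText(stream: DStream[String], prefix: String): Unit =
  stream.foreachRDD { rdd =>
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._
    rdd.toDF("jsonData")
      .repartition(1)
      .write.mode(SaveMode.Append)
      .text("hdfs://master:8020/test/" + prefix + "_" + getday + ".txt")
  }

writeAsText(pmbuspower_acreceiverStream, "pmbuspower_ac")
writeAsText(pmbuspower_dcreceiverStream, "pmbuspower_dc")
writeAsText(pmbuspower_rcreceiverStream, "pmbuspower_rc")
writeAsText(batteryreceiverStream, "battery")
```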
I set `--num-executors 8 --executor-memory 2g`.
It still reported the error after the Spark Streaming job had run for 24 hours (maybe less). The important logs are: `17/09/13 15:31:53 WARN server.TransportChannelHandler: Exception in connection from spark2/192.168.155.3:50700 java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.reflect.Method.copy(Method.java:153) at java.lang.reflect.ReflectAccess.copyMethod(ReflectAccess.java:140) at sun.reflect.ReflectionFactory.copyMethod(ReflectionFactory.java:302) at java.lang.Class.copyMethods(Class.java:3124) at java.lang.Class.getDeclaredMethods(Class.java:1975) at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1737) at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72) at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:253) at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:251) at java.security.AccessController.doPrivileged(Native Method) at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:250) at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:611) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109) at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:261) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:313) 17/09/13 15:31:55 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=7074671327750330138, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=47]}} to spark2/192.168.155.3:50700; closing connection java.nio.channels.ClosedChannelException 17/09/13 15:31:58 INFO scheduler.TaskSetManager: Starting task 1629.0 in stage 1506.0 (TID 1067854, spark3, partition 1629,RACK_LOCAL, 2777 bytes) 17/09/13 15:32:01 INFO storage.BlockManagerInfo: Added input-3-1505287766800 in memory on spark2:53213 (size: 128.4 KB, free: 964.7 MB) 17/09/13 15:32:01 INFO scheduler.TaskSetManager: Finished task 1628.0 in stage 1506.0 (TID 1067853) in 31678 ms on spark3 (1872/2200) 17/09/13 15:32:01 INFO storage.BlockManagerInfo: Added input-6-1505287765200 in memory on spark3:34916 (size: 273.2 KB, free: 926.1 MB) 17/09/13 15:32:01 INFO storage.BlockManagerInfo: Added input-2-1505287765800 in memory on spark1:54732 (size: 246.2 KB, free: 954.4 MB) 17/09/13 15:32:01 INFO storage.BlockManagerInfo: Added input-5-1505287767200 in memory on spark1:54732 (size: 179.4 KB, free: 954.3 MB) 17/09/13 15:32:01 INFO cluster.YarnClientSchedulerBackend: Disabling executor 1. 17/09/13 15:32:03 INFO scheduler.DAGScheduler: Executor lost: 1 (epoch 512) 17/09/13 15:32:03 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster. 
17/09/13 15:32:12 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container marked as failed: container_1505121941097_0011_01_000002 on host: spark2. Exit status: 1. Diagnostics: Exception from container-launch. Container id: container_1505121941097_0011_01_000002 Exit code: 1 Stack trace: ExitCodeException exitCode=1: at org.apache.hadoop.util.Shell.runCommand(Shell.java:601) at org.apache.hadoop.util.Shell.run(Shell.java:504) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:786) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit code 1
17/09/13 15:32:12 WARN servlet.ServletHandler: Error for /storage/ java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Integer.toString(Integer.java:401) at java.lang.Integer.toString(Integer.java:935) at java.lang.String.valueOf(String.java:2994) at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197) at org.apache.spark.storage.StreamBlockId.name(BlockId.scala:85) at org.apache.spark.storage.BlockId.toString(BlockId.scala:43) at org.apache.spark.ui.storage.StoragePage.org$apache$spark$ui$storage$StoragePage$$streamBlockTableSubrow(StoragePage.scala:177) at org.apache.spark.ui.storage.StoragePage.org$apache$spark$ui$storage$StoragePage$$streamBlockTableRow(StoragePage.scala:164) at org.apache.spark.ui.storage.StoragePage$$anonfun$7.apply(StoragePage.scala:142) at org.apache.spark.ui.storage.StoragePage$$anonfun$7.apply(StoragePage.scala:142) at org.apache.spark.ui.UIUtils$$anonfun$listingTable$2.apply(UIUtils.scala:311) at org.apache.spark.ui.UIUtils$$anonfun$listingTable$2.apply(UIUtils.scala:311) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.ui.UIUtils$.listingTable(UIUtils.scala:311) at org.apache.spark.ui.storage.StoragePage.streamBlockTable(StoragePage.scala:140) at org.apache.spark.ui.storage.StoragePage.receiverBlockTables(StoragePage.scala:90) at org.apache.spark.ui.storage.StoragePage.render(StoragePage.scala:34) at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:80) at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:80) at 
org.apache.spark.ui.JettyUtils$$anon$2.doGet(JettyUtils.scala:82) at javax.servlet.http.HttpServlet.service(HttpServlet.java:735) at javax.servlet.http.HttpServlet.service(HttpServlet.java:848) at org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:684) at org.spark-project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1496) at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:162) at org.spark-project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1467) at org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:499) 17/09/13 15:32:12 ERROR cluster.YarnScheduler: Lost executor 1 on spark2: Container marked as failed: container_1505121941097_0011_01_000002 on host: spark2. Exit status: 1. Diagnostics: Exception from container-launch. Container id: container_1505121941097_0011_01_000002 Exit code: 1 Stack trace: ExitCodeException exitCode=1: at org.apache.hadoop.util.Shell.runCommand(Shell.java:601) at org.apache.hadoop.util.Shell.run(Shell.java:504) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:786) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit code 1
17/09/13 15:32:12 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, spark2, 38456) 17/09/13 15:32:12 INFO storage.BlockManagerInfo: Added input-0-1505287765200 in memory on spark3:40237 (size: 372.8 KB, free: 928.1 MB) 17/09/13 15:32:12 INFO storage.BlockManagerInfo: Added input-3-1505287766800 in memory on spark1:54732 (size: 128.4 KB, free: 954.1 MB) 17/09/13 15:32:12 INFO storage.BlockManagerInfo: Added input-5-1505287767200 in memory on spark3:40237 (size: 179.4 KB, free: 927.9 MB) 17/09/13 15:32:12 INFO storage.BlockManagerInfo: Added input-1-1505287765600 in memory on spark2:33626 (size: 764.4 KB, free: 726.8 MB) 17/09/13 15:32:12 INFO storage.BlockManagerMaster: Removed 1 successfully in removeExecutor 17/09/13 15:32:12 INFO storage.BlockManagerInfo: Added input-4-1505287767400 in memory on spark2:33626 (size: 85.4 KB, free: 726.7 MB) 17/09/13 15:32:12 INFO storage.BlockManagerInfo: Added input-5-1505287767400 in memory on spark1:54732 (size: 148.6 KB, free: 954.0 MB) 17/09/13 15:32:12 INFO storage.BlockManagerInfo: Added input-2-1505287766000 in memory on spark3:34916 (size: 294.8 KB, free: 925.8 MB) 17/09/13 15:32:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 8.0 (TID 76, spark2): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container marked as failed: container_1505121941097_0011_01_000002 on host: spark2. Exit status: 1. Diagnostics: Exception from container-launch. 
Container id: container_1505121941097_0011_01_000002 Exit code: 1 Stack trace: ExitCodeException exitCode=1: at org.apache.hadoop.util.Shell.runCommand(Shell.java:601) at org.apache.hadoop.util.Shell.run(Shell.java:504) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:786) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit code 1`
Any help would be appreciated.
Because of `data.repartition(1)` you are removing all the partitions ... You should avoid such things if you don't control the size of the data you compute.
@nelsou Thank you for your help, but I can control the size of the data.
You can look at the picture; the speed is stable.
@nelsou Any suggestions ?
The `repartition(1)` is useless. Try removing it.
@nelsou Spark will generate many files if I remove it. I would like it to generate one file every time.
After the `repartition(1)` you are doing `.mode(SaveMode.Append)`. Doesn't that work?
@nelsou It works, but you can look at this picture: Spark Streaming generates many files every batch if I remove `repartition(1)`. I just want it to generate one file every batch.
In my opinion it is not the role of Spark Streaming to control the number of output files.
There are some tricky solutions (`saveAsHadoopFile` is one), but you should probably avoid such things.
@nelsou I just want to save a text file, and it would be better to generate one file every Spark Streaming batch. Do you have some examples?
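A minimal sketch of one way to do that (untested; the HDFS path is illustrative and `batteryreceiverStream` is one of the streams from the code above): write each batch from inside `foreachRDD`, coalescing to a single partition. `coalesce(1)` avoids the full shuffle that `repartition(1)` triggers.

```scala
// Sketch: one output per batch, written to a batch-timestamped directory.
// The path is illustrative, not a tested recipe.
batteryreceiverStream.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty())
    rdd.coalesce(1).saveAsTextFile(
      "hdfs://master:8020/test/battery/" + time.milliseconds)
}
```

Note this still produces one directory per batch containing a single part file; merging everything into a literal single file would need a post-processing step such as Hadoop's `FileUtil.copyMerge`.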