spark-rabbitmq
spark-rabbitmq-0.5.1.jar works in Spark 2.2.0, but not in Spark 2.3.0
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.AbstractMethodError
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.initializeLogIfNecessary(Consumer.scala:176)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.log(Consumer.scala:176)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$$anonfun$setUserPassword$1.apply(Consumer.scala:240)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$$anonfun$setUserPassword$1.apply(Consumer.scala:238)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.setUserPassword(Consumer.scala:238)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.apply(Consumer.scala:205)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$3.apply(RabbitMQInputDStream.scala:60)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$3.apply(RabbitMQInputDStream.scala:59)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver.onStart(RabbitMQInputDStream.scala:59)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:600)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:590)
    at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185)
    at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1609)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1597)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1596)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1596)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1830)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1779)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1768)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.lang.AbstractMethodError
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.initializeLogIfNecessary(Consumer.scala:176)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.log(Consumer.scala:176)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$$anonfun$setUserPassword$1.apply(Consumer.scala:240)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$$anonfun$setUserPassword$1.apply(Consumer.scala:238)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.setUserPassword(Consumer.scala:238)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.apply(Consumer.scala:205)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$3.apply(RabbitMQInputDStream.scala:60)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$3.apply(RabbitMQInputDStream.scala:59)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver.onStart(RabbitMQInputDStream.scala:59)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:600)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:590)
    at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185)
    at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/11/16 18:25:59 INFO scheduler.ReceiverTracker: Restarting Receiver 0
```
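For anyone else hitting this: the `AbstractMethodError` is a binary-compatibility failure, not a configuration problem. `Consumer` mixes in Spark's internal `org.apache.spark.internal.Logging` trait, and that trait changed between Spark 2.2 and 2.3, so a jar compiled against 2.2 no longer matches the trait shipped with 2.3 at runtime. A small reflection probe (a sketch; run it with your cluster's Spark jars on the classpath) shows which overloads your Spark actually provides:

```scala
// Sketch: print the initializeLogIfNecessary overloads declared by the
// Spark version on the classpath. If they differ from the ones
// spark-rabbitmq 0.5.1 was compiled against (Spark 2.2), the
// AbstractMethodError above is the expected symptom.
object LoggingProbe {
  def main(args: Array[String]): Unit = {
    val logging = Class.forName("org.apache.spark.internal.Logging")
    logging.getDeclaredMethods
      .filter(_.getName.startsWith("initializeLog"))
      .foreach(m => println(m.toGenericString))
  }
}
```

The practical fix, as the comments below show, is to rebuild the connector against the Spark version you actually run.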
Same problem in Spark 2.4
Any update on this?
I also faced this issue. However, I managed to get it working by:
- changing the spark.version property in pom.xml from 2.2.0 to 2.3.0 (this property controls which Spark dependencies are pulled in)
- rebuilding the artifact locally
- using my locally built artifact in my project
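For reference, the whole rebuild is only a few commands. A sketch, assuming the Stratio upstream repository (adjust URL, tag, and version to your setup); since the pom resolves the Spark version from the `spark.version` property, you can also override it on the command line instead of editing pom.xml:

```sh
# Rebuild the connector against the Spark version you actually run
# (sketch: the repository URL and versions are assumptions, adjust as needed)
git clone https://github.com/Stratio/spark-rabbitmq.git
cd spark-rabbitmq
git checkout 0.5.1
# Override the spark.version property (equivalent to editing pom.xml)
# and install the result into the local Maven repository
mvn clean install -Dspark.version=2.3.0 -DskipTests
```

If your checkout has `<packaging>pom</packaging>`, switch it to `jar` first (as the later comments note), or the build will not produce a jar.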
Thanks @matuszabojnik for the advice. I checked out the 0.5.1 tag and then changed spark.version from 2.0.2 to 2.3.0. I see that the current 0.6.0-SNAPSHOT only supports spark.version 2.2.0. Any chance of supporting spark.version 2.3.0? Thanks
Our team built it successfully for Spark 2.4.3. Just make the following changes before building the jar (the corresponding pom.xml fragments are sketched after the list):
- Specify `<spark.version>` in pom.xml to your target Spark version, e.g. `2.4.3`.
- Change `<packaging>` from `pom` to `jar` in pom.xml.
- Build the package via `mvn package`.
- Find your jar in the `target` folder.
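Put together, the pom.xml edit is tiny. A sketch of just the two fragments (the rest of the pom stays unchanged):

```xml
<!-- pom.xml fragments: emit a jar instead of a pom-only module, -->
<!-- and point the build at the Spark version you run -->
<packaging>jar</packaging>

<properties>
  <spark.version>2.4.3</spark.version>
</properties>
```

After that, `mvn package` drops the jar into `target/`.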
@matuszabojnik Thanks for the advice.
Our team built it successfully for Spark 3.1.1 and Scala 2.12 with the following changes:
- Check out `branch-0.5`.
- Change `<packaging>` from `pom` to `jar` in pom.xml.
- Update the following versions:
  - `<scala.version>2.12.13</scala.version>`
  - `<scala.binary.version>2.12</scala.binary.version>`
  - `<spark.version>3.1.1</spark.version>`
  - `<akka.version>2.4.12</akka.version>`
  - `<scalatest.version>3.0.0</scalatest.version>`
  - `<scala.check.version>1.11.6</scala.check.version>`
- Update the artifactId from `<artifactId>amqp-client_${scala.binary.version}</artifactId>` to `<artifactId>amqp-client_2.11</artifactId>` (because amqp-client is not available for Scala 2.12).
- Update `src/main/scala/org/apache/spark/streaming/rabbitmq/distributed/RabbitMQRDD.scala` to fix a compile error: change line #153 from `context.addTaskCompletionListener(context => {` to `context.addTaskCompletionListener[Unit](context => {` (see the sketch after this list).
- Build the jar via `mvn package`.
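Some context on the `RabbitMQRDD.scala` change: Spark 2.4 added a second `addTaskCompletionListener` overload that takes a plain function, and under Scala 2.12 a bare lambda can no longer be resolved between that overload and SAM conversion to `TaskCompletionListener`, so the explicit `[Unit]` type argument is needed to pick the function overload. A minimal self-contained illustration (the callback body is hypothetical, not the library's actual cleanup logic):

```scala
import org.apache.spark.TaskContext

object CompletionListenerFix {
  // Registers a completion callback the way the patched RabbitMQRDD does.
  // Without the explicit [Unit], Scala 2.12 cannot choose between
  //   addTaskCompletionListener(listener: TaskCompletionListener)  // SAM
  //   addTaskCompletionListener[U](f: TaskContext => U)
  // and compilation fails on the bare lambda.
  def registerCleanup(context: TaskContext): Unit =
    context.addTaskCompletionListener[Unit] { ctx =>
      println(s"task ${ctx.taskAttemptId()} completed, releasing resources")
    }
}
```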