
Spark execution hangs in Ammonite script but not in Ammonite REPL

Open · schlichtanders opened this issue Mar 09 '18 · 4 comments

I slightly adapted the Ammonite Spark example (https://github.com/lihaoyi/Ammonite/blob/717eac568f854da76ac4162ae451c4fa9da328a3/integration/src/test/resources/ammonite/integration/basic/Spark2.sc) to build the Spark recommender demo from https://spark.apache.org/docs/2.2.1/ml-collaborative-filtering.html (the data can be found in the Spark repo):

import $ivy.{
  `org.apache.spark::spark-core:2.2.1`,
  `org.apache.spark::spark-sql:2.2.1`
}
import ammonite.ops._
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.master("local[*]").appName("bxm").getOrCreate
import spark.implicits._

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val df = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()
println("-------------START---------------")
println(df.take(2))
println("-------------END---------------")

Executed in the Ammonite REPL, everything works. Run as an Ammonite script, however, it hangs right at df.take(2), with the following output from START onward:

-------------START---------------
18/03/09 09:44:08 INFO FileSourceStrategy: Pruning directories with: 
18/03/09 09:44:08 INFO FileSourceStrategy: Post-Scan Filters: 
18/03/09 09:44:08 INFO FileSourceStrategy: Output Data Schema: struct<value: string>
18/03/09 09:44:08 INFO FileSourceScanExec: Pushed Filters: 
18/03/09 09:44:08 INFO CodeGenerator: Code generated in 180.348556 ms
18/03/09 09:44:09 INFO CodeGenerator: Code generated in 47.862283 ms
18/03/09 09:44:09 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 219.6 KB, free 119.8 MB)
18/03/09 09:44:09 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.6 KB, free 119.8 MB)
18/03/09 09:44:09 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.0.2.15:43493 (size: 20.6 KB, free: 120.0 MB)
18/03/09 09:44:09 INFO SparkContext: Created broadcast 0 from take at tmp3.sc:18
18/03/09 09:44:09 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
18/03/09 09:44:10 INFO SparkContext: Starting job: take at tmp3.sc:18
18/03/09 09:44:10 INFO DAGScheduler: Got job 0 (take at tmp3.sc:18) with 1 output partitions
18/03/09 09:44:10 INFO DAGScheduler: Final stage: ResultStage 0 (take at tmp3.sc:18)
18/03/09 09:44:10 INFO DAGScheduler: Parents of final stage: List()
18/03/09 09:44:10 INFO DAGScheduler: Missing parents: List()
18/03/09 09:44:10 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at take at tmp3.sc:18), which has no missing parents
18/03/09 09:44:10 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 11.7 KB, free 119.8 MB)
18/03/09 09:44:10 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.1 KB, free 119.7 MB)
18/03/09 09:44:10 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.0.2.15:43493 (size: 5.1 KB, free: 120.0 MB)
18/03/09 09:44:10 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
18/03/09 09:44:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at take at tmp3.sc:18) (first 15 tasks are for partitions Vector(0))
18/03/09 09:44:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/03/09 09:44:10 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5288 bytes)
18/03/09 09:44:10 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/03/09 09:44:10 INFO FileScanRDD: Reading File path: file:///home/mine/data/sample_movielens_ratings.txt, range: 0-32363, partition values: [empty row]
18/03/09 09:44:10 INFO CodeGenerator: Code generated in 7.488672 ms

Pressing CTRL-C to kill the hanging process yields the following additional stack trace:

^C18/03/09 09:49:11 INFO SparkContext: Invoking stop() from shutdown hook
18/03/09 09:49:11 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4041
18/03/09 09:49:11 INFO DAGScheduler: ResultStage 0 (take at tmp3.sc:18) failed in 301.594 s due to Stage cancelled because SparkContext was shut down
18/03/09 09:49:11 INFO DAGScheduler: Job 0 failed: take at tmp3.sc:18, took 301.840087 s
org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
  org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:820)
  org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:818)
  scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
  org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:818)
  org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1750)
  org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
  org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1669)
  org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1928)
  org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
  org.apache.spark.SparkContext.stop(SparkContext.scala:1927)
  org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:581)
  org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
  org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
  org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
  org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
  org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1948)
  org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
  org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
  org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
  scala.util.Try$.apply(Try.scala:192)
  org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
  org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
  org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
  org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
  org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
  org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
  org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
  org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
  org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
  org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
  org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
  org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
  org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
  org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
  ammonite.$file.tmp3$.<init>(tmp3.sc:18)
  ammonite.$file.tmp3$.<clinit>(tmp3.sc)
18/03/09 09:49:11 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/03/09 09:49:11 INFO MemoryStore: MemoryStore cleared
18/03/09 09:49:11 INFO BlockManager: BlockManager stopped
18/03/09 09:49:12 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoClassDefFoundError: Could not initialize class ammonite.$file.tmp3$
	at ammonite.$file.tmp3$$anonfun$1.apply(tmp3.sc:15)
	at ammonite.$file.tmp3$$anonfun$1.apply(tmp3.sc:15)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
18/03/09 09:49:12 INFO Executor: Not reporting error to driver during JVM shutdown.
18/03/09 09:49:12 INFO BlockManagerMaster: BlockManagerMaster stopped
18/03/09 09:49:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/03/09 09:49:12 INFO SparkContext: Successfully stopped SparkContext
18/03/09 09:49:12 INFO ShutdownHookManager: Shutdown hook called
18/03/09 09:49:12 INFO ShutdownHookManager: Deleting directory /tmp/spark-8187d9ad-15b5-4d12-86d1-69b24a25fea6

Versions: CentOS 7, Ammonite REPL 1.0.5 (Scala 2.11.12, Java 1.8.0_161). The same error occurs with Spark 2.1.0 and 2.3.0, and probably all other versions as well...

schlichtanders · Mar 09 '18
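
The traces above point to a class-initialization deadlock: Ammonite compiles the script body into the static initializer of an object (ammonite.$file.tmp3$.<clinit> in the driver trace), and the executor thread fails with "Could not initialize class ammonite.$file.tmp3$" because it cannot initialize that class while the driver is still blocked inside that same initializer waiting for take to finish. Below is a sketch of one possible workaround, not taken from this thread, assuming Ammonite's @main entry points behave as documented: keep the Spark action out of the object initializer so it runs only after the script class is fully initialized.

// recommender.sc -- hedged workaround sketch, not from the original thread:
// running the Spark action from an @main method instead of the top level
// keeps it out of the script object's static initializer, which is what
// the driver and executor threads appear to deadlock on.
import $ivy.{
  `org.apache.spark::spark-core:2.2.1`,
  `org.apache.spark::spark-sql:2.2.1`
}
import org.apache.spark.sql.SparkSession

// Top-level case class so Spark's encoder derivation can see it
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

@main
def run(): Unit = {
  val spark = SparkSession.builder.master("local[*]").appName("bxm").getOrCreate
  import spark.implicits._

  def parseRating(str: String): Rating = {
    val fields = str.split("::")
    assert(fields.size == 4)
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
  }

  val df = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
    .map(parseRating)
    .toDF()
  println(df.take(2).mkString("\n"))
  spark.stop()
}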

@schlichtanders Ammonite isn't really ready for Spark yet. Spark needs to be passed a bunch of classpath info from Ammonite to work properly. For that to work, Ammonite itself has to handle its classpath more carefully (not adding different versions of the same lib, etc.). I intend to submit a PR to do that soon…

alexarchambault · Mar 09 '18
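
To make the classpath issue concrete, here is a rough sketch, assuming Ammonite's ReplAPI exposes the session classpath via repl.sess and its frames (the exact accessor names may differ between Ammonite versions): collect the session's jars and hand them to Spark via spark.jars so that executors on a real cluster can load classes the session defines. This is only one piece of what a proper integration has to do.

// Rough sketch only, assuming the Ammonite ReplAPI exposes the session
// classpath as repl.sess.frames(...).classpath (accessor names may vary
// by Ammonite version); this shows the kind of classpath plumbing meant
// above, not a complete or supported integration.
import org.apache.spark.sql.SparkSession

val sessionJars = repl.sess.frames        // classpath frames of this session
  .flatMap(_.classpath)                   // java.io.File entries per frame
  .map(_.getAbsolutePath)
  .filter(_.endsWith(".jar"))

val spark = SparkSession.builder
  .master("local[*]")
  .config("spark.jars", sessionJars.mkString(","))  // ship jars to executors
  .getOrCreate()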

Thanks for the information; at least now I know.

schlichtanders · Mar 09 '18

Any update on this? Thanks.

eptx · May 10 '18

ammonite-spark is now out, and lets you run Spark calculations from Ammonite.

alexarchambault · Jul 31 '18
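
For readers landing here later, a minimal ammonite-spark session looks roughly like the sketch below. The dependency coordinates and versions are illustrative; check the ammonite-spark README for the ones matching your Ammonite and Spark setup.

// Illustrative ammonite-spark usage (coordinates/versions are examples;
// consult the ammonite-spark README for current ones).
import $ivy.`sh.almond::ammonite-spark:0.13.0`
import $ivy.`org.apache.spark::spark-sql:2.4.0`
import org.apache.spark.sql._

// AmmoniteSparkSession.builder() wires the Ammonite session classpath
// into the SparkSession it creates, so executors can load classes
// defined in the REPL or script.
val spark = AmmoniteSparkSession.builder()
  .master("local[*]")
  .getOrCreate()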