almond
Question: does almond support Spark 3.0?
Sorry for the newbie question. I searched around but don't see a definitive statement on it.
almond uses ammonite-spark for its Spark integration, which supports Spark 3.0 in recent versions.
So if you use almond >= v0.10.1, Spark 3.0 should work. Let's keep this open until we've updated the docs accordingly.
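For reference, a minimal notebook cell for a Spark 3.0 session on almond 0.10.x looks roughly like this sketch; the exact artifact versions are illustrative, and it assumes the almond-spark helper is pulled in via `$ivy` as the almond docs suggest:

```scala
// Notebook cell sketch (versions are illustrative; match them to your setup).
import $ivy.`org.apache.spark::spark-sql:3.0.0`  // Spark 3.0 builds on Scala 2.12
import $ivy.`sh.almond::almond-spark:0.10.9`     // provides NotebookSparkSession
import org.apache.spark.sql._

val spark = NotebookSparkSession.builder()
  .master("local[*]")
  .getOrCreate()
```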
Thanks for adding this. I was just about to come in here and post a problem, but maybe we should start with questions...
I was doing testing with spark 3.0.0 and almond-spark. My testing scenario is very unsophisticated - I pull in spark 3.0.0 along with a recent version of hadoop for azure ("org.apache.hadoop" % "hadoop-azure" % "3.2.1")
It was very easy to get "local mode" working (.master("local[*]")). I think I have everything working 100% - knock-on-wood.
However, I ran into numerous issues when connecting to a stand-alone cluster on my workstation. Some could be fixed by tinkering with the order of the classpath (`interp.load.ivy` and `spark.sparkContext.addJar`); others took me many, many hours, and my approach to troubleshooting may be all wrong. Here are the two issues that I spent the most time on:
- Unable to use the Hadoop file system abstractions for abfss (finally fixed by loading hadoop 3.2.1 BEFORE spark-sql 3.0.0)
- Unable to serialize lambdas from my Jupyter notebooks over to the cluster (this is still unresolved, and is a big showstopper since Spark solutions rely heavily on lambdas)
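A sketch of the load-order fix from the first bullet, as I understand it: pull the Hadoop 3.2.1 artifacts into the session before Spark SQL, so they take precedence over Spark's transitive Hadoop dependency (coordinates mirror the ones mentioned above):

```scala
// Notebook cell sketch: the order of these imports matters.
import $ivy.`org.apache.hadoop:hadoop-azure:3.2.1`  // hadoop 3.2.1 first...
import $ivy.`org.apache.spark::spark-sql:3.0.0`     // ...then spark-sql 3.0.0
```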
Keep in mind that I'm new to everything - Java, Scala, Jupyter, and Spark. My problems start with trying to get a consistent set of modules loaded in the Jupyter kernel, so that they match the ones in the stand-alone Spark cluster. This is an incredibly hard, unintuitive, and finicky process, especially if I compare the experience in Jupyter to the straightforward experience with sbt in IntelliJ. I think the difference is related to the way an interpreted environment works in Jupyter, as compared to the compile-time services offered by sbt to make sense of your modules and trim the cruft out of the classpath. To make a long story short, the Jupyter kernel environment pulls in a massive number of strange and unnecessary modules (and also wrong or duplicate versions of many modules) that seem to create interop problems when interacting with the stand-alone Spark cluster.
At a bare minimum I would have expected to be able to serialize a simple lambda (`.map(p => p.getAs[String]("value") + "_func")`) and get a result back. But I get strange errors within the cluster about Spark's inability to cast my lambda type to another similar one (an RDD function or something).
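For what it's worth, closure shipping boils down to plain Java serialization, so a quick probe (my own sketch, not part of almond's or Spark's API) can tell you whether a value would survive the trip before any job is submitted:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Spark must Java-serialize every closure it ships to executors; running a
// value through ObjectOutputStream locally surfaces the same failures early.
def isJavaSerializable(value: AnyRef): Boolean =
  try {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(value)
    out.close()
    true
  } catch {
    case _: NotSerializableException => false
  }

// A class without the Serializable marker cannot be shipped.
class NoMarker

val okValue  = "value_func"   // Strings serialize fine
val badValue = new NoMarker   // this one will be rejected
```

Applying `isJavaSerializable` to the function you are about to pass to `.map` (or to any object it captures) can narrow down which capture breaks the job.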
Given that I'm new to this stuff, I suspect it would not be wise for me to be on the front lines, so I will stick to "local mode" for now. But I'd rather interact with a stand-alone cluster at some point soon, if someone can confirm that it is possible to get it working. And if anyone wants me to post the details of the problems I encountered, I'm happy to do that. I think it should be a distinct issue since it will probably get fairly technical. I can provide a full repro if it helps.
I mentioned that "local mode" was working, but I should give a caveat. There are a large number of errors that are printed when creating a spark session via NotebookSparkSession.builder, and I'm ignoring them all. The errors don't seem to prevent a spark session from being created, and I've never noticed any related impact within spark environment.
Here is an example of the type of error that I see on Windows.
ERROR SparkContext: Failed to add file:/C:/Program%20Files/AdoptOpenJDK/jdk-8.0.252.09-hotspot/jre/lib/ext/access-bridge-64.jar to Spark environment
java.io.FileNotFoundException: Jar C:\Program%20Files\AdoptOpenJDK\jdk-8.0.252.09-hotspot\jre\lib\ext\access-bridge-64.jar not found
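As an aside (my own reading of that error, not a confirmed diagnosis): the path in the message still contains the percent-encoded space from the classpath URL, and no file literally named `Program%20Files` exists on disk, which a quick decode makes visible:

```scala
import java.net.URLDecoder

// The reported jar path keeps the URL-encoded space from the classpath entry.
val reported = "C:/Program%20Files/AdoptOpenJDK/jdk-8.0.252.09-hotspot/jre/lib/ext/access-bridge-64.jar"

// Decoding yields the real on-disk path, with "Program Files" spelled out.
val decoded = URLDecoder.decode(reported, "UTF-8")
```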
I'm running almond 0.10.5 within jupyter lab, and the code for creating the "local mode" session is like so:
```scala
val p_SparkSession = NotebookSparkSession.builder()
  .appName("APP_" + java.util.UUID.randomUUID().toString)
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "C:\\Temp\\Hive")
  .config("spark.executor.instances", "4")
  .config("spark.executor.memory", "4g")
  .enableHiveSupport()
  .getOrCreate()
```
Hope this is clear. Let me know if not.
It does not seem to be your problem. Maybe you can try spark-shell; it should work.
It also took me tens of hours of tweaking to get Spark working in my SageMaker notebook. Standalone mode seemed to have a lot of issues; here is a summary of what I ran into while testing:
- Scala version mismatch: you have to make sure almond (default Scala 2.12.12) and Spark (default Scala 2.12.10) use the same Scala version, or you will get weird java.io errors about a serialization mismatch.
- https://github.com/almond-sh/almond/issues/695 is my current issue: the HTTP file system is not found.
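A quick check for the first bullet (a sketch; the val names are just illustrative): print the Scala version the kernel process itself runs on, and compare its binary version against the one your Spark build was compiled for:

```scala
// Report the Scala version the current process runs on.
val kernelScalaVersion = scala.util.Properties.versionNumberString

// The binary version ("2.12", "2.13", ...) is what has to match Spark's build;
// patch versions can differ, but binary versions cannot.
val binaryVersion = kernelScalaVersion.split('.').take(2).mkString(".")

println(s"kernel Scala $kernelScalaVersion (binary version $binaryVersion)")
```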
@dbeavon since you got it working in standalone mode, have you seen a similar HTTP issue?
I have the following configuration:
applications:
scala: "2.12.11"
spark: "3.0.1"
hadoop: "3.2.1"
hadoop_major: "3.2"
jupyterlab: "2.2.9"
sbt: "1.4.5"
almond: "0.10.9"
In JupyterLab the following error is produced:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 172.17.0.4, executor 0): UnknownReason
Driver stacktrace:
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
scala.Option.foreach(Option.scala:407)
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2194)
org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1157)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
org.apache.spark.rdd.RDD.fold(RDD.scala:1151)
org.apache.spark.rdd.DoubleRDDFunctions.$anonfun$sum$1(DoubleRDDFunctions.scala:35)
scala.runtime.java8.JFunction0$mcD$sp.apply(JFunction0$mcD$sp.java:23)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:35)
ammonite.$sess.cmd8$Helper.<init>(cmd8.sc:5)
ammonite.$sess.cmd8$.<init>(cmd8.sc:7)
based on the following code:
```scala
import $ivy.`org.apache.spark::spark-sql:3.0.1`
import org.apache.spark.sql._

val spark = {
  NotebookSparkSession.builder()
    .master("spark://ip-172-31-89-180.ec2.internal:7077")
    .config("spark.executor.instances", "1")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
}

def sc = spark.sparkContext

val rdd = sc.parallelize(1 to 100000000, 100)
val n = rdd.map(_ + 1).sum()

spark.stop()
```
Any ideas?