Issues with Apache Spark Worker
Hi!
I am trying to set up an Apache Spark Master/Worker cluster locally, but I am having some issues with the Worker node. It throws the following error message when I try to submit the SparkPi example code:
20/12/22 15:51:16 ERROR ExecutorRunner: Error running executor
java.io.IOException: Cannot run program "/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java" (in directory "./work/app-20201222155116-0013/0"): error=38, Function not implemented
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
at org.apache.spark.deploy.worker.ExecutorRunner.org$apache$spark$deploy$worker$ExecutorRunner$$fetchAndRunExecutor(ExecutorRunner.scala:181)
at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:79)
Caused by: java.io.IOException: error=38, Function not implemented
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
at java.lang.ProcessImpl.start(ProcessImpl.java:134)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
... 2 more
To replicate my current application setup, follow these steps:
- Download the existing spark_3.0.0 package with:
ops pkg get spark_3.0.0
- Extract the downloaded spark_3.0.0.tar.gz file from ~/.ops/packages/ to ~/.ops/local_packages/
- Duplicate the extracted directory so that you have two versions of the spark_3.0.0 directory within ~/.ops/local_packages/, one named sparkmaster_3.0.0 and the second named sparkworker_3.0.0
- Update the package.manifest file within the sparkworker_3.0.0 directory to:
{
  "Program": "/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java",
  "Version": "3.0.0",
  "Args": ["java", "-Dos.name=Linux", "-cp", "/conf/:/jars/*", "-Xmx1g", "org.apache.spark.deploy.worker.Worker", "spark://0.0.0.0:7077"],
  "RunConfig": {
    "Memory": "2G"
  },
  "Env": {
    "SPARK_USER": "root",
    "SPARK_SCALA_VERSION": "2.12",
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64/jre"
  }
}
- Launch the Spark Master with:
ops load -l sparkmaster_3.0.0 -p 8080 -p 7077 -i master
- Launch the Spark Worker with:
ops load -l sparkworker_3.0.0 -i worker
- Start the SparkPi example (from within the bin directory of Apache Spark):
./spark-submit --class org.apache.spark.examples.SparkPi --master spark://0.0.0.0:7077 --executor-memory 1G --total-executor-cores 1 ../examples/jars/spark-examples_2.12-3.0.0.jar 1000
I then did some further experiments with the source code of the SparkPi example (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala). Commenting out lines 34-38 made it possible to run the application without problems. The edited SparkPi example looks like this:
package com.testing

import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("spark://0.0.0.0:7077")
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    /*val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y <= 1) 1 else 0
    }.reduce(_ + _)*/
    println(s"Pi is roughly ${4.0 * n / (n - 1)}")
    spark.stop()
  }
}
However, this means the application no longer does what it is designed to do, namely map and reduce.
It also seems that the "Cannot run program" part of the error message is linked to the JAVA_HOME environment variable. Do you have any recommendations or suggestions for how to make the Worker node run without issues?
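For extra context, my reading of the trace (just a rough sketch with made-up names, not Spark's actual code) is that the Worker resolves the executor binary as ${JAVA_HOME}/bin/java and then launches it as a separate JVM process; the ProcessBuilder.start() call is the one that fails with error=38 (which on Linux is ENOSYS, "Function not implemented"):
import java.io.{File, IOException}

// illustration only: made-up names, not Spark's code
object ExecutorLaunchSketch {
  def main(args: Array[String]): Unit = {
    // the java binary is resolved from JAVA_HOME, which is why the error
    // message names /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
    val javaHome = sys.env.getOrElse("JAVA_HOME", System.getProperty("java.home"))
    val javaBin = new File(new File(javaHome, "bin"), "java").getAbsolutePath

    // starting the executor means launching a brand-new JVM process
    val builder = new ProcessBuilder(javaBin, "-version")
    try {
      builder.start().waitFor() // UNIXProcess.forkAndExec happens inside start()
    } catch {
      // under the unikernel this is where
      // "java.io.IOException: error=38, Function not implemented" shows up
      case e: IOException => println(s"could not launch executor JVM: ${e.getMessage}")
    }
  }
}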
i'd need to dive into this more because I'm sure this can be configured differently, but looking at the trace we explicitly don't support fork/exec:
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
Hello @eyberg!
You are absolutely right that the issue must be related to the java.lang.UNIXProcess.forkAndExec function, which gets called because running a Spark job spawns multiple tasks that are run in parallel in response to Spark actions (e.g. reduce, collect). This functionality goes against the definition of unikernels as stated: "Unikernels are specialised single process operating systems".
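As an illustration of the single-process point (not a fix for the standalone Worker setup), local mode runs the driver and all tasks as threads inside a single JVM, so the forkAndExec path from the trace is never taken. The class name below is mine, but the code is just the usual SparkPi with a local master:
import org.apache.spark.sql.SparkSession
import scala.math.random

// illustration only: SparkPi with a local master, everything inside one JVM
object LocalSparkPi {
  def main(args: Array[String]): Unit = {
    // local[2] = the driver plus two task threads in this process,
    // so no Worker, no ProcessBuilder, no forkAndExec
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("Spark Pi (local)")
      .getOrCreate()

    val slices = 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)

    println(s"Pi is roughly ${4.0 * count / (n - 1)}")
    spark.stop()
  }
}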
A possible solution could be to spawn dedicated Job/Task unikernels from the original process? However, I am not sure whether this is a wanted feature, since unikernels are designed to be "single process". Maybe the best option would be to have it all built into the Spark Worker unikernel itself, so that everything stays within its own environment.
Reference: https://spark.apache.org/docs/latest/cluster-overview.html
i used spark maybe 2.5-3yrs ago non-unikernelized and we ran multiple workers on the same instance - i think there are 2 options but i don't know the effort/work involved:
- make each worker spawn as a unikernel
- convert the underlying code to use threads instead of processes (rough sketch below)
I took a look at the ExecutorRunner and it's pretty clearly process-based: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
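To make option 2 a bit more concrete, something like the following (a purely hypothetical sketch with made-up names, not working Spark code; the real ExecutorRunner also handles classpaths, resource limits and exit-status reporting) would replace the ProcessBuilder.start() call with an in-process thread:
import java.util.concurrent.{ExecutorService, Executors}

// hypothetical sketch, not a Spark class: the real ExecutorRunner builds a
// java command line and calls ProcessBuilder.start(), which needs fork/exec.
// here the executor entry point runs on a thread inside the Worker JVM instead.
class ThreadBasedExecutorRunner(appId: String, execId: Int, pool: ExecutorService) {

  def start(executorMain: () => Unit): Unit = {
    pool.submit(new Runnable {
      override def run(): Unit = {
        println(s"running executor $execId for $appId in-process")
        executorMain() // would be the executor's task loop in a real port
      }
    })
  }
}

object ThreadBasedExecutorRunner {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newCachedThreadPool()
    val runner = new ThreadBasedExecutorRunner("app-20201222155116-0013", 0, pool)
    runner.start(() => println("executor task loop would go here"))
    pool.shutdown()
  }
}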