
Yay, it's cool. Now how do I use it with YARN?

robbieh opened this issue 5 years ago · 3 comments

So how do I use this with YARN? The best I can get out of it is "SparkException: Unable to load YARN support".

robbieh · Mar 03 '19 16:03

Hi, sorry for taking so long, had some other business to run.

I'm not sure where the problem is; I was running it on YARN too, so I know for sure it's possible. It sounds like either a missing library or a version clash to me. You should check your classpath.
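As a quick classpath check, here is a hedged sketch (not from the thread): as far as I can tell, Spark 2.x raises "Unable to load YARN support" when it cannot load its internal YARN helper class, so probing for that class from a REPL on the same classpath as your job tells you whether the spark-yarn jar is visible at all:

```clojure
;; Sketch: probe for Spark's YARN support classes on the current
;; classpath. The class name below is what Spark 2.x tries to load
;; internally before raising "Unable to load YARN support".
(try
  (Class/forName "org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
  (println "YARN support classes found")
  (catch ClassNotFoundException _
    (println "spark-yarn classes missing from the classpath")))
```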

chrisbetz · Mar 06 '19 13:03

Sorry, I'm obviously not very quick lately myself. Part of the problem is that I can't find much description of how to set it up. Is there something more I need to do than have my environment set up to work with my cluster?

robbieh · Mar 23 '19 22:03

Hi! I don't know if this is still relevant, but it's easy:

  1. Use the Spark dependencies as :provided (the needed jars are already on the classpath of the Spark nodes). This is what I use (a combined project.clj sketch follows the list):

```clojure
:provided {:dependencies [[org.apache.spark/spark-core_2.11 "2.2.0.cloudera1"
                           :exclusions [org.xerial.snappy/snappy-java]]
                          [org.apache.spark/spark-sql_2.11 "2.2.0.cloudera1"]]}
```

  2. Use AOT compilation for your core namespace plus sparkling.serialization and sparkling.destructuring:

```clojure
:aot [myspark.core sparkling.serialization sparkling.destructuring]
```

  3. Build the uberjar.

  4. Run the jar with spark-submit.
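Putting steps 1 through 3 together, here is a hedged sketch of what the relevant parts of project.clj could look like (the project name, sparkling version, and the myspark.core namespace are illustrative, taken from this thread, not canonical):

```clojure
;; Sketch of a project.clj combining the steps above. Spark itself is
;; :provided because the cluster already ships those jars; sparkling is
;; a normal dependency and gets baked into the uberjar.
(defproject myspark "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [gorillalabs/sparkling "2.1.3"]]   ; version illustrative
  :aot [myspark.core
        sparkling.serialization
        sparkling.destructuring]
  :main myspark.core
  :profiles {:provided
             {:dependencies
              [[org.apache.spark/spark-core_2.11 "2.2.0.cloudera1"
                :exclusions [org.xerial.snappy/snappy-java]]
               [org.apache.spark/spark-sql_2.11 "2.2.0.cloudera1"]]}})
```

Then `lein uberjar` produces the standalone jar for step 4.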

I do it from Airflow like this:

```python
# Imports added for completeness (Airflow 1.x-era paths).
from datetime import datetime
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

jar_path = "hdfs://jarpath/myspark.jar"

base_conf = {
    "spark.executor.extraJavaOptions": "-XX:+UseNUMA -XX:+UseG1GC -XX:+UseCompressedOops -XX:+PrintFlagsFinal",
    "spark.driver.maxResultSize": "3G",
    "spark.shuffle.service.enabled": "true",
    "spark.kryoserializer.buffer.max": "2042m",
    "spark.memory.storageFraction": "0.05",
    "spark.network.timeout": "999s",
    "spark.scheduler.mode": "FAIR",
    "spark.sql.parquet.enableVectorizedReader": "true",
    "spark.driver.cores": "4",
    "spark.driver.extraJavaOptions": "-XX:+UseNUMA -XX:+UseG1GC -XX:+UseCompressedOops -Djdk.http.auth.proxying.disabledSchemes='' -Djdk.http.auth.tunneling.disabledSchemes=''",
    "spark.submit.deployMode": "cluster",
    "spark.master": "yarn",
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.maxExecutors": "100",
    "spark.executor.cores": "1",
    "spark.memory.fraction": "0.65",
}
other_conf = {
    "spark.driver.memory": "32G",
    "spark.executor.memory": "8G",
    "spark.yarn.executor.memoryOverhead": "5G",
    "spark.sql.shuffle.partitions": "200",
}
base_conf.update(other_conf)
print(base_conf)  # debug: log the merged configuration

# dag is assumed to be defined earlier in the DAG file.
spark_operator = SparkSubmitOperator(
    application=jar_path,
    conf=base_conf,
    java_class="myspark.core",
    name="myspark.core - {}".format(datetime.now()),
    task_id="sparkJob",
    trigger_rule="one_success",
    dag=dag,
)
```
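And for completeness, a hedged sketch of what the myspark.core entry point that java_class points at might look like (the job body and app name are placeholders; with-context and parallelize are used as in the Sparkling README):

```clojure
;; Sketch of the AOT-compiled entry point that spark-submit runs.
;; :gen-class is required so the namespace compiles to the
;; myspark.core class named in java_class above.
(ns myspark.core
  (:require [sparkling.conf :as conf]
            [sparkling.core :as spark])
  (:gen-class))

(defn -main [& args]
  ;; No master is set here: spark-submit supplies spark.master=yarn
  ;; from the configuration above.
  (let [c (-> (conf/spark-conf)
              (conf/app-name "myspark"))]
    (spark/with-context sc c
      ;; Placeholder job: sum the numbers 0..999 on the cluster.
      (->> (spark/parallelize sc (range 1000))
           (spark/map inc)
           (spark/reduce +)
           println))))
```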

evilsneer · Apr 14 '20 17:04