Implement Async for native Spark Operators
Implement async versions of the following operators (listed in descending order of priority):
- [ ] `SparkSubmitOperator` - @bharanidharan14
- [ ] `SparkSqlOperator` - @sunank200
- [ ] `SparkJDBCOperator`
- [x] `LivyOperator` - @sunank200
Acceptance Criteria:
- [ ] Unit test coverage in the PR (90% code coverage -- we will need to add Codecov separately to measure coverage) with all tests passing
- [ ] Example DAG using the async operator that can be used to run integration tests parametrized via environment variables (see the sketch after this list). Example - https://github.com/apache/airflow/blob/8a03a505e1df0f9de276038c5509135ac569a667/airflow/providers/google/cloud/example_dags/example_bigquery_to_gcs.py#L33-L35
- [ ] Add proper docstrings for each of the methods and functions, including an example DAG showing how it should be used (populate …)
- [ ] Exception Handling in case of errors
- [ ] Improve the OSS docs to make sure they cover the following:
- [ ] Has an example DAG for the sync version
- [ ] How to add a connection via environment variable, with an explanation of each of the fields (also shown in the sketch after this list). Example - https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/connections/postgres.html
- [ ] How to use Guide for the Operator - example: https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html
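As a rough shape for both the env-var-parametrized example DAG and the env-var connection, here is a minimal sketch modeled on the linked BigQuery example. All ids, paths, and the `AIRFLOW_CONN_SPARK_DEFAULT` URI below are illustrative assumptions, not values from this repo:

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Airflow parses AIRFLOW_CONN_<CONN_ID> environment variables as connection URIs;
# the "spark" scheme is the connection type. The value here is an assumption:
#   export AIRFLOW_CONN_SPARK_DEFAULT='spark://spark-master:7077'

# Integration tests can swap these per environment without touching the DAG.
SPARK_APPLICATION = os.environ.get("SPARK_APPLICATION", "/opt/airflow/dags/scripts/pi.py")
SPARK_CONN_ID = os.environ.get("SPARK_CONN_ID", "spark_default")

with DAG(
    dag_id="example_spark_submit",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    submit_job = SparkSubmitOperator(
        task_id="submit_job",
        application=SPARK_APPLICATION,
        conn_id=SPARK_CONN_ID,
    )
```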
If we hit too many issues with it, we can park it for now or use `LivyOperator` as an alternative way to submit Spark jobs (see the sketch below).
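For reference, the Livy alternative would look roughly like this: `LivyOperator` posts the application as a batch to Livy's REST API and polls it, rather than shelling out to `spark-submit`. The file path and connection id are illustrative assumptions:

```python
from airflow.providers.apache.livy.operators.livy import LivyOperator

# Submits the application as a Livy batch instead of invoking spark-submit locally.
livy_submit = LivyOperator(
    task_id="livy_submit",
    file="local:///opt/spark/examples/src/main/python/pi.py",  # hypothetical path
    livy_conn_id="livy_default",
    polling_interval=30,  # check batch status every 30 seconds
)
```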
We got the local connection working for the `SparkSubmitOperator` yesterday after further analysis. I think this story is unblocked for now. @sunank200, @bharanidharan14 - please comment.
Yes, `spark-submit` works for the deploy mode `local`, so the `SparkSubmitOperator` example DAG works with a local Spark setup. For `SparkSqlOperator`, I am setting up Hive alongside Spark to create a table for testing the example DAG.
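For reference, a minimal sync `SparkSqlOperator` task for that Hive-backed test might look like the following; the task id, table name, and SQL are illustrative assumptions:

```python
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator

# Runs spark-sql against the local Spark setup described above.
create_table = SparkSqlOperator(
    task_id="spark_sql_create_table",
    sql="CREATE TABLE IF NOT EXISTS test_table (id INT, name STRING)",
    master="local[*]",
)
```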
On 07-03-2022 and 21-03-2022: Successfully installed Spark in the Airflow worker to run the spark-submit job, created the Spark cluster container, and tried running the sample Spark job via the Airflow worker; I was facing some container resource allocation issues.
On 22-03-2022: Tried creating a separate Docker image, created Spark master and worker cluster containers, and tried submitting the job via the Airflow worker. It was able to pick up the job, but there is a Python version mismatch.
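If the mismatch is between the driver and executor interpreters (an assumption about the cause here), one common fix is pinning both sides to the same Python via Spark's environment variables. A sketch, with a hypothetical interpreter path:

```python
import os

# Spark honors these environment variables when launching PySpark; both must
# point at the same interpreter version on the driver and the workers.
os.environ["PYSPARK_PYTHON"] = "/usr/local/bin/python3.9"         # hypothetical path
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/bin/python3.9"
```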
Working on implementing the async `SparkSubmitOperator`.
`SparkSqlOperator` requires the `spark-sql` binary, which has to be installed alongside Airflow in the Docker image. Right now, for deploy mode `local`, the `SparkSqlOperator` fails because `spark-sql` errors out as follows:
```
Caused by: ERROR XBM0H: Directory /opt/spark-3.2.1-bin-hadoop3.2/metastore_db cannot be created.
	at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
	at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
	at org.apache.derby.impl.services.monitor.StorageFactoryService$10.run(Unknown Source)
	at java.security.AccessController.doPrivileged(Native Method)
	at org.apache.derby.impl.services.monitor.StorageFactoryService.createServiceRoot(Unknown Source)
	at org.apache.derby.impl.services.monitor.BaseMonitor.bootService(Unknown Source)
	at org.apache.derby.impl.services.monitor.BaseMonitor.createPersistentService(Unknown Source)
	at org.apache.derby.impl.services.monitor.FileMonitor.createPersistentService(Unknown Source)
	at org.apache.derby.iapi.services.monitor.Monitor.createPersistentService(Unknown Source)
	at org.apache.derby.impl.jdbc.EmbedConnection$5.run(Unknown Source)
	at java.security.AccessController.doPrivileged(Native Method)
	at org.apache.derby.impl.jdbc.EmbedConnection.createPersistentService(Unknown Source)
	... 104 more
22/03/24 05:12:55 ERROR Utils: Uncaught exception in thread shutdown-hook-0
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
	at org.sparkproject.jetty.server.AbstractConnector.doStop(AbstractConnector.java:371)
	at org.sparkproject.jetty.server.AbstractNetworkConnector.doStop(AbstractNetworkConnector.java:88)
	at org.sparkproject.jetty.server.ServerConnector.doStop(ServerConnector.java:246)
	at org.sparkproject.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:94)
	at org.sparkproject.jetty.server.Server.doStop(Server.java:459)
	at org.sparkproject.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:94)
	at org.apache.spark.ui.ServerInfo.stop(JettyUtils.scala:525)
	at org.apache.spark.ui.WebUI.$anonfun$stop$2(WebUI.scala:174)
	at org.apache.spark.ui.WebUI.$anonfun$stop$2$adapted(WebUI.scala:174)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.ui.WebUI.stop(WebUI.scala:174)
	at org.apache.spark.ui.SparkUI.stop(SparkUI.scala:101)
	at org.apache.spark.SparkContext.$anonfun$stop$6(SparkContext.scala:2071)
	at org.apache.spark.SparkContext.$anonfun$stop$6$adapted(SparkContext.scala:2071)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.SparkContext.$anonfun$stop$5(SparkContext.scala:2071)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1442)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2071)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:77)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.$anonfun$main$2(SparkSQLCLIDriver.scala:143)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
Trying to debug the issue right now and am blocked on this. The spark-submit job runs fine for me, though.
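The `XBM0H` error suggests Derby cannot create its `metastore_db` under the Spark install directory; Derby creates that directory in the working directory by default, so a permissions issue on `/opt/spark-3.2.1-bin-hadoop3.2` is a plausible cause. One unverified workaround sketch is pointing the embedded metastore at a writable path via Spark/Hive conf:

```python
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator

# Assumption: redirecting the embedded Derby metastore to /tmp avoids the
# permission problem. The conf string is passed through to spark-sql as a
# --conf option by the hook.
sql_job = SparkSqlOperator(
    task_id="spark_sql_with_writable_metastore",
    sql="SHOW TABLES",
    master="local[*]",
    conf=(
        "spark.hadoop.javax.jdo.option.ConnectionURL="
        "jdbc:derby:;databaseName=/tmp/metastore_db;create=true"
    ),
)
```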
@sunank200 let's stop further effort on `SparkSqlOperator`, and you can start on `LivyOperator`.
`SparkSubmitOperator`
- Completed the async operator, trigger, and hook
- Completed test cases, mypy checks, and docstrings

Added the changes to submit the spark-submit job in the `execute` method and to get the status from the trigger; a minimal sketch of that pattern is below.
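For context on that execute-then-trigger flow, here is a minimal sketch of Airflow's deferrable-operator pattern. Class and module names are hypothetical; this is the general shape, not the PR's actual code:

```python
from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class ExampleSparkSubmitTrigger(BaseTrigger):
    """Hypothetical trigger: watches the submitted job from the triggerer process."""

    def __init__(self, job_name: str):
        super().__init__()
        self.job_name = job_name

    def serialize(self):
        # (classpath, kwargs) so the triggerer can re-instantiate this trigger.
        return ("example_dags.ExampleSparkSubmitTrigger", {"job_name": self.job_name})

    async def run(self):
        # Real code would poll the job status asynchronously in a loop; here we
        # report success immediately to keep the sketch short.
        yield TriggerEvent({"status": "success", "job_name": self.job_name})


class ExampleSparkSubmitOperatorAsync(BaseOperator):
    """Illustrative shape of the pattern only."""

    def execute(self, context):
        # 1) Submit the Spark job here (omitted), then
        # 2) free the worker slot and let the triggerer watch for completion.
        self.defer(
            trigger=ExampleSparkSubmitTrigger(job_name=self.task_id),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Re-invoked on a worker once the trigger fires with the terminal status.
        if event and event.get("status") == "error":
            raise AirflowException(event.get("message"))
        self.log.info("Spark job finished: %s", event)
```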
The operators below are on hold due to a connectivity issue between the Airflow worker container and EMR Spark:
- `SparkSubmitOperator`
- `SparkSqlOperator`
- `SparkJDBCOperator`
- `SparkKubernetesOperator`