
Implement Async for native Spark Operators

Open · phanikumv opened this issue · 10 comments

Implement async versions of the following operators (in descending order of priority):

  • SparkSubmitOperator
  • SparkSqlOperator
  • SparkJDBCOperator
  • SparkKubernetesOperator

Acceptance Criteria:

  • [ ] Unit test coverage in the PR (90% code coverage; we will need to add CodeCov separately to measure it), with all tests passing
  • [ ] Example DAG using the async operator that can be used to run integration tests parametrized via environment variables (see the sketch after this list). Example - https://github.com/apache/airflow/blob/8a03a505e1df0f9de276038c5509135ac569a667/airflow/providers/google/cloud/example_dags/example_bigquery_to_gcs.py#L33-L35
  • [ ] Add proper docstrings for each of the methods and functions, including an example DAG showing how it should be used
  • [ ] Exception handling in case of errors
  • [ ] Improve the OSS docs to make sure they cover the following:
    • [ ] An example DAG for the sync version
    • [ ] How to add a connection via environment variable, explaining each of the fields. Example - https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/connections/postgres.html
    • [ ] A how-to guide for the operator. Example - https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html
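
As a reference for the parametrized-example-DAG item above, here is a minimal sketch; the DAG ID, variable names, and defaults are assumptions, not the final implementation:

```python
import os
from datetime import datetime, timedelta

from airflow import DAG

# Assumed knobs: integration tests would re-point the DAG at different
# environments purely via environment variables, as in the linked example.
SPARK_CONN_ID = os.environ.get("SPARK_CONN_ID", "spark_default")
EXECUTION_TIMEOUT_HOURS = int(os.environ.get("EXECUTION_TIMEOUT_IN_HOURS", "6"))

with DAG(
    dag_id="example_spark_submit_async",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
    default_args={"execution_timeout": timedelta(hours=EXECUTION_TIMEOUT_HOURS)},
    tags=["example", "async"],
) as dag:
    # Tasks would read SPARK_CONN_ID here, so the same DAG can run against a
    # different Spark cluster with no code change.
    ...
```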

phanikumv · Mar 14 '22

If we have too many issues with it, we can park it for now, or use LivyOperator as an alternative way to submit Spark jobs.
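
For context, submitting via Livy would look roughly like this; the file path and connection ID below are placeholders, not a tested setup:

```python
from airflow.providers.apache.livy.operators.livy import LivyOperator

# Sketch only: Livy submits the job over REST, so the Airflow worker does not
# need a local Spark installation. All values below are placeholders.
livy_submit = LivyOperator(
    task_id="livy_spark_job",
    file="local:///opt/spark/examples/src/main/python/pi.py",
    livy_conn_id="livy_default",
    polling_interval=30,  # seconds between status polls of the Livy batch
)
```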

kaxil · Mar 21 '22

We got the local connection working for SparkSubmitOperator yesterday after further analysis. I think this story is unblocked for now. @sunank200, @bharanidharan14, please comment.

phanikumv · Mar 22 '22

Yes, spark-submit works with deploy mode local, so the SparkSubmitOperator example DAG works with a local Spark setup. For SparkSqlOperator, I am setting up Hive alongside Spark to create a table to test the example DAG.
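
For reference, the sync usage being tested looks roughly like this (the application path is a placeholder, and the connection's master would be something like local[*]):

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Minimal sketch of the sync operator against a local Spark setup.
spark_submit = SparkSubmitOperator(
    task_id="spark_submit_job",
    application="/usr/local/airflow/include/pi.py",  # placeholder path
    conn_id="spark_default",
    verbose=True,
)
```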

sunank200 · Mar 22 '22

On 07-03-2022 and 21-03-2022: Successfully installed Spark in the Airflow worker to run the spark-submit job. Created the Spark cluster container and tried running the sample Spark job via the Airflow worker, but ran into container resource allocation issues.

On 22-03-2022: Tried creating a separate Docker image, created the Spark master and worker cluster containers, and submitted the job via the Airflow worker. It was able to pick up the job, but there is a Python version mismatch.
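
If the mismatch is the usual driver-versus-executor one, a common fix is to pin the same interpreter on both sides; a hedged sketch (the interpreter version and application path are assumptions):

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Assumption: the mismatch is between the driver's and the executors' Python.
# Spark honours PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON, so pinning both to the
# same interpreter in the worker and Spark images usually resolves it.
spark_submit = SparkSubmitOperator(
    task_id="spark_submit_job",
    application="/usr/local/airflow/include/pi.py",  # placeholder path
    conn_id="spark_default",
    env_vars={
        "PYSPARK_PYTHON": "python3.9",
        "PYSPARK_DRIVER_PYTHON": "python3.9",
    },
)
```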

bharanidharan14 · Mar 23 '22

Working on implementing the async SparkSubmitOperator.

bharanidharan14 · Mar 23 '22

SparkSqlOperator requires the spark-sql binary, which has to be installed alongside Spark in the Docker image. Right now, with deploy mode local, SparkSqlOperator fails because spark-sql errors out as follows:

```
Caused by: ERROR XBM0H: Directory /opt/spark-3.2.1-bin-hadoop3.2/metastore_db cannot be created.
    at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
    at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
    at org.apache.derby.impl.services.monitor.StorageFactoryService$10.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.apache.derby.impl.services.monitor.StorageFactoryService.createServiceRoot(Unknown Source)
    at org.apache.derby.impl.services.monitor.BaseMonitor.bootService(Unknown Source)
    at org.apache.derby.impl.services.monitor.BaseMonitor.createPersistentService(Unknown Source)
    at org.apache.derby.impl.services.monitor.FileMonitor.createPersistentService(Unknown Source)
    at org.apache.derby.iapi.services.monitor.Monitor.createPersistentService(Unknown Source)
    at org.apache.derby.impl.jdbc.EmbedConnection$5.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.apache.derby.impl.jdbc.EmbedConnection.createPersistentService(Unknown Source)
    ... 104 more
22/03/24 05:12:55 ERROR Utils: Uncaught exception in thread shutdown-hook-0
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
    at org.sparkproject.jetty.server.AbstractConnector.doStop(AbstractConnector.java:371)
    at org.sparkproject.jetty.server.AbstractNetworkConnector.doStop(AbstractNetworkConnector.java:88)
    at org.sparkproject.jetty.server.ServerConnector.doStop(ServerConnector.java:246)
    at org.sparkproject.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:94)
    at org.sparkproject.jetty.server.Server.doStop(Server.java:459)
    at org.sparkproject.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:94)
    at org.apache.spark.ui.ServerInfo.stop(JettyUtils.scala:525)
    at org.apache.spark.ui.WebUI.$anonfun$stop$2(WebUI.scala:174)
    at org.apache.spark.ui.WebUI.$anonfun$stop$2$adapted(WebUI.scala:174)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.ui.WebUI.stop(WebUI.scala:174)
    at org.apache.spark.ui.SparkUI.stop(SparkUI.scala:101)
    at org.apache.spark.SparkContext.$anonfun$stop$6(SparkContext.scala:2071)
    at org.apache.spark.SparkContext.$anonfun$stop$6$adapted(SparkContext.scala:2071)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.SparkContext.$anonfun$stop$5(SparkContext.scala:2071)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1442)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:2071)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:77)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.$anonfun$main$2(SparkSQLCLIDriver.scala:143)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
    at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
    at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```

I am trying to debug this issue right now and am blocked on it. The spark-submit job runs fine for me, though.
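
The XBM0H error indicates the process cannot create metastore_db under the Spark install directory. One possible workaround (an assumption, not a verified fix for this setup) is to point the embedded Derby metastore at a writable directory:

```python
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator

# Sketch: XBM0H usually means the embedded Derby metastore cannot write to
# the current working directory, so redirect it somewhere writable.
spark_sql_job = SparkSqlOperator(
    task_id="spark_sql_job",
    sql="SELECT 1",  # placeholder statement
    master="local",
    conf="spark.driver.extraJavaOptions=-Dderby.system.home=/tmp/derby",
)
```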

sunank200 · Mar 24 '22

@sunank200 let's stop further effort on SparkSqlOperator; you can start on LivyOperator instead.

phanikumv · Mar 24 '22

SparkSubmitOperator

  • Completed the async operator, trigger, and hook
  • Completed test cases, mypy checks, and docstrings

bharanidharan14 · Mar 25 '22

Added the changes to submit the spark-submit job in the execute method and to get the job status from the trigger.
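
That matches Airflow's deferrable-operator pattern. A minimal sketch of the shape being described follows; the class names, module path, and status check are illustrative, not the actual astronomer-providers code:

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Tuple

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class SparkSubmitTrigger(BaseTrigger):
    """Illustrative trigger: polls job status on the triggerer, not the worker."""

    def __init__(self, job_id: str, poll_interval: float = 5.0) -> None:
        super().__init__()
        self.job_id = job_id
        self.poll_interval = poll_interval

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # The classpath must be importable by the triggerer process.
        return (
            "example_dags.spark_triggers.SparkSubmitTrigger",
            {"job_id": self.job_id, "poll_interval": self.poll_interval},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        while True:
            status = await self._get_status()
            if status in ("FINISHED", "FAILED"):
                yield TriggerEvent({"status": status, "job_id": self.job_id})
                return
            await asyncio.sleep(self.poll_interval)

    async def _get_status(self) -> str:
        # Stub: a real trigger would query the Spark master / driver here.
        return "FINISHED"


class SparkSubmitOperatorAsync(BaseOperator):
    def execute(self, context):
        job_id = "app-123"  # in reality, captured from the spark-submit output
        # Hand control to the trigger and free up the worker slot.
        self.defer(
            trigger=SparkSubmitTrigger(job_id=job_id),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event):
        if event["status"] == "FAILED":
            raise AirflowException(f"Spark job {event['job_id']} failed")
        self.log.info("Spark job %s finished", event["job_id"])
```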

bharanidharan14 · Mar 30 '22

The operators below are on hold due to a connectivity issue between the Airflow worker container and EMR Spark:

  • SparkSubmitOperator
  • SparkSqlOperator
  • SparkJDBCOperator
  • SparkKubernetesOperator

phanikumv · Apr 11 '22