
Re-launching the same Spark operator fails with error "sparkapplications.sparkoperator.k8s.io \"smoke-test\" already exists"

xs2tarunkukreja opened this issue 2 years ago • 1 comment

Apache Airflow version

2.7.2

What happened

I am using the following code to launch a PySpark job:

t1 = SparkKubernetesOperator(
    task_id='env-check',
    namespace="spark-operator",
    application_file='pipeline.yaml',
    do_xcom_push=True,
    dag=dag,
)

t2 = SparkKubernetesSensor(
    task_id='env_app_monitor',
    namespace="spark-operator",
    attach_log=True,
    application_name="{{ task_instance.xcom_pull(task_ids='env-check')['metadata']['name'] }}",
    dag=dag,
)
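For context, the sensor's `application_name` template works because `do_xcom_push=True` makes the operator push the created SparkApplication manifest to XCom. A rough plain-Python illustration of what the Jinja expression resolves to (the payload shape shown is an assumption for illustration):

```python
# Hypothetical XCom payload pushed by the 'env-check' task: the created
# SparkApplication manifest, so metadata.name is the application name.
xcom_payload = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {"name": "smoke-test", "namespace": "spark-operator"},
}

# What the Jinja expression
# {{ task_instance.xcom_pull(task_ids='env-check')['metadata']['name'] }}
# evaluates to: the name of the application the operator just created.
app_name = xcom_payload["metadata"]["name"]
print(app_name)  # smoke-test
```

Because the sensor watches whatever name the operator created, it keeps working even if the name changes between runs.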

The first run works fine, but every subsequent run fails with the error above.

What you think should happen instead

The PySpark job should run successfully on every subsequent run as well.

How to reproduce

Run the same Spark application twice.

Operating System

K8S

Versions of Apache Airflow Providers

2.7.2

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

xs2tarunkukreja avatar Nov 03 '23 04:11 xs2tarunkukreja

When submitting a task, the Airflow SparkKubernetesOperator does not check whether an old Spark application with the same name already exists; if one is present, an error is thrown. You can avoid this by giving each run a unique name in the application file's YAML configuration, e.g.:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: "smoke-test-{{ execution_date.strftime('%Y%m%d') }}-{{ task_instance.try_number }}"
spec:
  xxxxx
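For illustration, here is a minimal plain-Python sketch of what that templated `metadata.name` renders to; the helper function and its arguments are stand-ins for Airflow's real template context (`execution_date`, `task_instance.try_number`):

```python
from datetime import datetime

# Mimic the Jinja template
#   smoke-test-{{ execution_date.strftime('%Y%m%d') }}-{{ task_instance.try_number }}
# execution_date and try_number here are stand-ins for Airflow's template context.
def render_app_name(execution_date: datetime, try_number: int) -> str:
    return f"smoke-test-{execution_date.strftime('%Y%m%d')}-{try_number}"

# Different DAG runs (different dates) or retries (different try numbers)
# produce distinct names, so the create call never hits "already exists".
print(render_app_name(datetime(2023, 11, 3), 1))  # smoke-test-20231103-1
print(render_app_name(datetime(2023, 11, 4), 1))  # smoke-test-20231104-1
```

Note that Kubernetes object names must be valid lowercase DNS-1123 subdomains, so keep the rendered name lowercase and reasonably short.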

kuantian-zhang avatar Nov 15 '23 11:11 kuantian-zhang