Executor Config Lost on Task Retry with Kubernetes Executor in Airflow 2.7.3
Apache Airflow version
2.7.3
What happened
I have encountered a bug where the executor_config for a task is lost when the task is retried. This issue occurs when using the Kubernetes Executor with Airflow version 2.7.3 deployed via the official Airflow Helm chart version 8.8.0.
The executor_config is lost during task retries and appears as an empty dictionary in the Airflow REST API. The config only reappears and is applied correctly if the task instance details are accessed in the UI.
What you think should happen instead
The executor_config should persist and be applied consistently across all retries of a task without requiring intervention via the UI.
How to reproduce
- Configure a DAG with a task that includes specific settings in executor_config.
- Trigger the DAG and let the task fail to initiate a retry.
- Observe that on retry, the executor_config is an empty dictionary when checked via the Airflow REST API (see the sketch after this list).
- However, if I navigate to the task instance details in the Airflow UI and click on the task, the executor_config reappears and is correctly applied to the retried task.
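For reference, a minimal sketch of one way to check this over the stable REST API; the host, credentials and DAG/task ids below are placeholders, not the real values from my deployment:

import requests

# Placeholder connection details for a basic-auth-enabled webserver; adjust to your deployment.
BASE_URL = "http://localhost:8080/api/v1"
AUTH = ("admin", "admin")

# List task instances of the DAG across all runs ("~" is the wildcard run id).
resp = requests.get(
    f"{BASE_URL}/dags/example_k8s_executor_dag/dagRuns/~/taskInstances",
    auth=AUTH,
)
resp.raise_for_status()
for ti in resp.json()["task_instances"]:
    # executor_config is returned as a string; on the affected retries it is just "{}".
    print(ti["task_id"], ti["try_number"], ti["executor_config"])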
Operating System
apache/airflow:2.7.3-python3.10 docker image
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
Apache Airflow Version: 2.7.3
Helm Chart Version: 8.8.0 (official Airflow Helm chart)
Executor: Kubernetes Executor
GKE
Anything else
It appears that the executor_config is being lost from the metadata database after the Kubernetes Executor terminates the pod. This results in the absence of the executor_config for tasks that are retried. Interestingly, I discovered a workaround to get the executor_config back: accessing the task instance details in the Airflow UI. This behavior, however, is inconsistent and somewhat perplexing.
This issue suggests that while the executor_config is initially stored correctly, it somehow gets dissociated or removed from the task's metadata upon pod termination by the Kubernetes Executor. The ability to recover this configuration via the UI indicates that the data may still exist but is not being correctly relayed or preserved for retries in the automated workflow.
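For anyone trying to pin down where the value disappears, here is a hedged sketch of checking what the metadata database actually holds, using Airflow's ORM session (run it inside a scheduler or webserver pod; the DAG and task ids are placeholders):

from airflow.models import TaskInstance
from airflow.utils.session import create_session

# Print the stored executor_config for every recorded instance of the task (placeholder ids).
with create_session() as session:
    tis = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "example_k8s_executor_dag",
            TaskInstance.task_id == "print_greeting",
        )
        .all()
    )
    for ti in tis:
        print(ti.run_id, ti.try_number, ti.executor_config)

If the column is already empty here after the pod is terminated, the scheduler-side code path would be the place to look; if it still holds the pod_override, the REST API serialization would be the more likely suspect.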
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Can I be assigned this issue (#36136) to work on, please?
@MissTipo feel free to raise a PR to solve this issue
Hi @buu-nguyen, I've tried to reproduce your issue but didn't manage to get the same results you are experiencing. I used the following DAG:
from datetime import datetime, timedelta
from airflow import DAG
from kubernetes import client as k8s
from airflow.operators.python import PythonOperator

# Define default_args dictionary to specify the default parameters of the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}


def print_greetings(ti, **kwargs):
    import os

    greetings = os.getenv('GREETING', "No greeting")
    print(f"kwargs: {kwargs}")
    print(f"ti: {ti}")
    print(f"try number: {ti.try_number}")
    if ti.try_number > 1:
        print(f"Greeting: {greetings}")
    else:
        raise Exception("Error!")


# Define the DAG object with the specified parameters
dag = DAG(
    dag_id='example_k8s_executor_dag',
    default_args=default_args,
    schedule_interval=None,  # This DAG is triggered manually
)

# Define the PythonOperator task that prints the greeting to stdout
print_greeting_task = PythonOperator(
    task_id='print_greeting',
    python_callable=print_greetings,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        env=[
                            k8s.V1EnvVar(
                                name="GREETING",
                                value="Hello World!"
                            ),
                        ],
                    ),
                ]
            )
        )
    },
    dag=dag,
)
And after the first try (which failed), I managed to get the expected executor_config (with the custom environment variable I set) via the Airflow REST API (I used the Swagger UI).
I've tested both the /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances and /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} endpoints.
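A rough sketch of that kind of query against the single-task-instance endpoint, with placeholder host, credentials and run id:

import requests

BASE_URL = "http://localhost:8080/api/v1"  # placeholder webserver URL
AUTH = ("admin", "admin")  # placeholder basic-auth credentials
RUN_ID = "manual__2024-01-01T00:00:00+00:00"  # placeholder run id

# Fetch a single task instance; the response includes its executor_config field.
resp = requests.get(
    f"{BASE_URL}/dags/example_k8s_executor_dag/dagRuns/{RUN_ID}/taskInstances/print_greeting",
    auth=AUTH,
)
resp.raise_for_status()
print(resp.json()["executor_config"])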
Can you please provide more insights about your issue? (your DAG, which endpoint you used, etc.)
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
I'm still working on this, please keep it open.
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
This issue has been closed because it has not received response from the issue author.