
Executor Config Lost on Task Retry with Kubernetes Executor in Airflow 2.7.3

Open buu-nguyen opened this issue 2 years ago • 6 comments

Apache Airflow version

2.7.3

What happened

I have encountered a bug where the executor_config for a task is lost when the task is retried. This occurs when using the Kubernetes Executor with Airflow 2.7.3 deployed via the official Airflow Helm chart version 8.8.0. On retry, the executor_config appears as an empty dictionary in the Airflow REST API. The config only reappears, and is applied correctly, if the task instance details are accessed in the UI.

What you think should happen instead

The executor_config should persist and be applied consistently across all retries of a task without requiring intervention via the UI.

How to reproduce

  1. Configure a DAG with a task that includes specific settings in executor_config.
  2. Trigger the DAG and let the task fail to initiate a retry.
  3. Observe that on retry, the executor_config is an empty dictionary when checked via the Airflow REST API (a sketch of this request follows the list).
  4. Navigate to the task instance details in the Airflow UI and click on the task: the executor_config reappears and is correctly applied to the retried task.
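
For reference, a minimal sketch of step 3, assuming basic-auth access to the stable REST API; the identifiers and credentials below are placeholders, not values from the original report:

import requests

# Hypothetical identifiers and credentials; replace with your deployment's values.
BASE_URL = "http://localhost:8080/api/v1"
DAG_ID = "my_dag"
DAG_RUN_ID = "manual__2023-12-09T00:00:00+00:00"
TASK_ID = "my_task"

# Fetch the task instance; its "executor_config" field should mirror what the DAG
# sets, but after the retry it comes back as an empty dictionary.
resp = requests.get(
    f"{BASE_URL}/dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}/taskInstances/{TASK_ID}",
    auth=("admin", "admin"),
)
resp.raise_for_status()
print(resp.json()["executor_config"])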

Operating System

apache/airflow:2.7.3-python3.10 docker image

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

Apache Airflow Version: 2.7.3
Helm Chart Version: 8.8.0 (official Airflow Helm chart)
Executor: Kubernetes Executor
GKE

Anything else

It appears that the executor_config is lost from the metadata database after the Kubernetes Executor terminates the pod, which leaves retried tasks without their executor_config. Interestingly, I discovered a workaround: accessing the task instance details in the Airflow UI restores the executor_config. This behavior, however, is inconsistent and somewhat perplexing.

This issue suggests that while the executor_config is initially stored correctly, it somehow gets dissociated or removed from the task's metadata upon pod termination by the Kubernetes Executor. The ability to recover this configuration via the UI indicates that the data may still exist but is not being correctly relayed or preserved for retries in the automated workflow.
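
For anyone debugging this, below is a minimal sketch of how the stored value could be checked directly, assuming the script runs in an environment with access to the Airflow metadata database; the dag_id and task_id are placeholders. Airflow keeps executor_config as a pickled column on the task_instance table, so an empty dict here would suggest the value really was removed rather than merely not returned by the API:

from airflow.models import TaskInstance
from airflow.utils.session import create_session

# Hypothetical identifiers; replace with the DAG and task from the reproduction steps.
DAG_ID = "my_dag"
TASK_ID = "my_task"

with create_session() as session:
    task_instances = (
        session.query(TaskInstance)
        .filter(TaskInstance.dag_id == DAG_ID, TaskInstance.task_id == TASK_ID)
        .all()
    )
    for ti in task_instances:
        # executor_config is deserialized from the pickled column on task_instance;
        # an empty dict would point to it being dropped after the pod was terminated.
        print(ti.run_id, ti.try_number, ti.executor_config)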

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

buu-nguyen avatar Dec 09 '23 05:12 buu-nguyen

Can I be assigned issue #36136 to work on, please?

MissTipo avatar Dec 12 '23 12:12 MissTipo

@MissTipo feel free to raise a PR to solve this issue

eladkal avatar Dec 26 '23 19:12 eladkal

Hi @buu-nguyen, I've tried to reproduce your issue but didn't manage to get the same results you are experiencing. I used the following DAG:

from datetime import datetime, timedelta
from airflow import DAG
from kubernetes import client as k8s

from airflow.operators.python import PythonOperator

# Define default_args dictionary to specify the default parameters of the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}


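# Fail on the first try to force a retry; on later tries, print the GREETING
# environment variable injected through the pod_override in executor_config.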
def print_greetings(ti, **kwargs):
    import os
    greetings = os.getenv('GREETING', "No greeting")
    print(f"kwargs: {kwargs}")
    print(f"ti: {ti}")
    print(f"try number: {ti.try_number}")
    if ti.try_number > 1:
        print(f"Greeting: {greetings}")
    else:
        raise Exception("Error!")


# Define the DAG object with the specified parameters
dag = DAG(
    dag_id='example_k8s_executor_dag',
    default_args=default_args,
    schedule_interval=None,  # This DAG is triggered manually
)

# Define the PythonOperator task that prints the greeting to stdout
print_greeting_task = PythonOperator(
    task_id='print_greeting',
    python_callable=print_greetings,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        env=[
                            k8s.V1EnvVar(
                                name="GREETING",
                                value="Hello World!"
                            ),
                        ],
                    ),
                ]
            )
        )
    },
    dag=dag,
)

After the first try (which failed), I managed to get the expected executor_config (with the custom environment variable I set) via the Airflow REST API (using the Swagger UI). I tested both the /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances and /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} endpoints. Can you please provide more details about your issue (your DAG, which endpoint you used, etc.)?

shohamy7 avatar Jan 20 '24 19:01 shohamy7

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar Feb 04 '24 00:02 github-actions[bot]

I'm still working on this please

MissTipo avatar Feb 05 '24 06:02 MissTipo

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot] avatar Feb 21 '24 00:02 github-actions[bot]

This issue has been closed because it has not received response from the issue author.

github-actions[bot] avatar Feb 29 '24 00:02 github-actions[bot]