
kubernetes connection defined via env variable does not work with deferrable=True

Open • mayorblock opened this issue 1 year ago • 9 comments

Apache Airflow version

2.10.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I am trying to run a KubernetesPodOperator (cncf.kubernetes provider v8.4.0) with deferrable=True and do_xcom_push=True. The latter caused problems in earlier versions but works now. I define my Kubernetes cluster connection in code using airflow.models.connection.Connection and supply it via an environment variable.

After all containers (init container, sidecar, and main container) have completed successfully, I get:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1792, in resume_execution
    return execute_callable(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 773, in trigger_reentry
    raise AirflowException(message)
airflow.exceptions.AirflowException: Traceback (most recent call last):
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/triggers/pod.py", line 162, in run
    state = await self._wait_for_pod_start()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/triggers/pod.py", line 223, in _wait_for_pod_start
    pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 747, in get_pod
    async with self.get_conn() as connection:
  File "/usr/local/lib/python3.12/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 734, in get_conn
    kube_client = await self._load_config() or async_client.ApiClient()
                  ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 664, in _load_config
    in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
                                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 724, in _get_field
    extras = await self.get_conn_extras()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 712, in get_conn_extras
    connection = await sync_to_async(self.get_connection)(self.conn_id)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/asgiref/sync.py", line 468, in __call__
    ret = await asyncio.shield(exec_coro)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/asgiref/sync.py", line 522, in thread_handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 169, in get_connection
    return super().get_connection(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/hooks/base.py", line 83, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/models/connection.py", line 537, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
***.exceptions.AirflowNotFoundException: The conn_id `k8s-conn-id` isn't defined

Now if I:

  • set deferrable=False, it works with the environment-variable connection
  • create the k8s cluster connection explicitly in the UI, deferrable=True works [screenshot]
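One hypothesis fits both observations, though it is not confirmed in this thread. The reproduction creates the connection by setting os.environ inside the DAG file, and that only changes the environment of the process that executes the file. With deferrable=False, the worker parses the DAG and so sees the variable. With deferrable=True, the traceback shows the lookup failing inside the trigger (triggers/pod.py), which runs in the triggerer process. As far as I understand, the triggerer does not execute DAG files, so its environment would lack the variable. A UI connection lives in the metadata DB, which every component can read.

The plain-Python sketch below illustrates the process-isolation effect only. It uses two sibling interpreters as stand-ins for the two processes and does not touch Airflow.

```python
import os
import subprocess
import sys

# Variable name built as in the reproduction: "AIRFLOW_CONN_" + conn_id.upper()
VAR = "AIRFLOW_CONN_K8S-CONN-ID"


def run_python(code: str) -> str:
    """Run code in a fresh Python interpreter (a stand-in for a separate Airflow process)."""
    result = subprocess.run(
        [sys.executable, "-c", code], capture_output=True, text=True, check=True
    )
    return result.stdout.strip()


# Stand-in for the process that parses the DAG file: it sets the variable itself.
setter_sees = run_python(
    f"import os; os.environ[{VAR!r}] = 'kubernetes://'; print({VAR!r} in os.environ)"
)
# Stand-in for a sibling process that never executes the DAG file.
sibling_sees = run_python(f"import os; print({VAR!r} in os.environ)")

print(setter_sees, sibling_sees)  # True False (assuming VAR is not already set in the parent)
```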

What you think should happen instead?

Supplying a connection via an environment variable, as described in the docs, should work exactly like a connection created in the UI.
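For reference, Airflow's connection documentation describes two forms for an AIRFLOW_CONN_{CONN_ID} value. One is a URI, which is what conn.get_uri() produces in the reproduction. The other, available since Airflow 2.3, is a JSON object. If the variable is defined in the deployment's environment (for example, the shared environment section of docker-compose) instead of from DAG code, every Airflow component inherits it.

The minimal sketch below builds the JSON form. It assumes a hypothetical underscore connection ID k8s_conn_id, placeholder values, and the extra keys used in the reproduction.

```python
import json

# Hypothetical connection ID; underscores keep the variable name usable from a shell.
conn_id = "k8s_conn_id"

# Placeholder values: substitute your own kubeconfig and namespace.
payload = {
    "conn_type": "kubernetes",
    "extra": {
        "kube_config": "<kube_config for your cluster>",
        "namespace": "<your namespace>",
        "in_cluster": False,
    },
}

env_name = f"AIRFLOW_CONN_{conn_id.upper()}"
env_value = json.dumps(payload)

# Name/value pair to place in the deployment's environment configuration.
print(env_name)
print(env_value)
```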

How to reproduce

Create a Kubernetes cluster connection:

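# create connection
import os
from datetime import datetime

from airflow import DAG
from airflow.models.connection import Connection
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# Placeholder values: substitute your own kubeconfig and namespace.
extra = {
    "kube_config": "<kube_config for your cluster>",
    "namespace": "<your namespace>",
    "in_cluster": False,
}
conn = Connection(conn_id="k8s-conn-id", conn_type="kubernetes", description="k8s connection", extra=extra)
# Register the connection as AIRFLOW_CONN_K8S-CONN-ID. Changes to os.environ only
# affect the process that executes this file (and child processes it starts afterwards).
os.environ[f"AIRFLOW_CONN_{conn.conn_id.upper()}"] = conn.get_uri()

# define minimal DAG with deferrable=True and do_xcom_push=True
# (start_date and schedule added so the example DAG parses on its own)
with DAG(dag_id="k8s-example", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    k = KubernetesPodOperator(
        kubernetes_conn_id="k8s-conn-id",
        name="k8s-pod",
        cmds=["bash", "-cx"],
        # write the init container's output to /airflow/xcom/return.json so it is pushed as XCom
        arguments=["mkdir -p /airflow/xcom/;cat /tmp/mnt/hello.txt > /airflow/xcom/return.json"],
        task_id="k8s-pod",
        startup_timeout_seconds=1000,
        do_xcom_push=True,
        pod_template_file="k8s_tempalte.yaml",  # the pod template manifest shown below
        on_finish_action="keep_pod",
        deferrable=True,
    )

    # just testing XCom: pull the first element of the JSON list pushed by the pod
    b = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('k8s-pod')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    k >> b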

and the pod template manifest:

apiVersion: v1
kind: Pod
spec:
  initContainers:
    - name: init-container
      image: "ubuntu:latest"
      command: ["bash", "-cx"]
      args: ["echo '[1,2,3,4]' > /tmp/mnt/hello.txt"]
      resources:
        limits:
          cpu: 500m
          memory: 1Gi
        requests:
          cpu: 100m
          memory: 1Gi
      volumeMounts:
        - name: shared-volume
          mountPath: "/tmp/mnt"
  containers:
    - name: base
      image: "ubuntu:latest"
      imagePullPolicy: IfNotPresent
      ports: []
      resources:
        limits:
          cpu: 500m
          memory: 1Gi
        requests:
          cpu: 100m
          memory: 1Gi
      volumeMounts:
        - name: shared-volume
          mountPath: "/tmp/mnt"
  restartPolicy: Never
  volumes:
  - name: shared-volume
    emptyDir: {}

Operating System

docker

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==8.4.0
apache-airflow-providers-microsoft-azure==10.3.0

Deployment

Docker-Compose

Deployment details

I run Airflow locally with the official docker-compose setup and deploy pods to Azure Kubernetes Service (AKS).

Anything else?

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

mayorblock, Aug 23 '24 21:08

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot], Aug 23 '24 21:08

The connection ID in the screenshot uses hyphens (k8s-conn-id). Should it be k8s_conn_id?

tirkarthi, Aug 23 '24 22:08
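On the hyphen question: Python's os.environ accepts hyphens, which is how the reproduction sets AIRFLOW_CONN_K8S-CONN-ID. A POSIX shell, however, cannot export such a variable, because shell variable names may contain only letters, digits, and underscores and must not start with a digit. This matters only if the variable is defined from a shell or deployment config rather than from Python.

The small check below assumes the "AIRFLOW_CONN_" + conn_id.upper() naming used in the reproduction. The helper names are hypothetical.

```python
import re

# POSIX shell variable names: letters, digits, underscores; must not start with a digit.
_SHELL_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def conn_env_var(conn_id: str) -> str:
    """Environment variable name for a connection, built as in the reproduction."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


def is_shell_exportable(name: str) -> bool:
    """True if a POSIX shell could `export` a variable with this name."""
    return _SHELL_NAME.fullmatch(name) is not None


for conn_id in ("k8s-conn-id", "k8s_conn_id"):
    name = conn_env_var(conn_id)
    print(name, is_shell_exportable(name))
```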

@tirkarthi Good catch :) Unfortunately, that was just a typo when transferring my code to this example.

The issue still stands. With the Kubernetes cluster connection supplied via an environment variable, the KPO with deferrable=True fails even though the pod runs successfully.

mayorblock, Aug 26 '24 06:08

Hi @RNHTTR, politely nudging to hear if there is anything I can do to advance the issue? I am very willing to contribute but will need some guidance :)

mayorblock, Aug 29 '24 13:08

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot], Sep 13 '24 00:09

Hi @RNHTTR, the bot message seems to imply that I should respond somehow, but I am unsure what I am supposed to do. You added a pending-response label; what does that imply?

Thank you in advance.

mayorblock, Sep 14 '24 18:09

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions[bot], Sep 29 '24 00:09

@mayorblock sorry about that! I removed the stale/pending-response labels.

> Hi @RNHTTR, politely nudging to hear if there is anything I can do to advance the issue? I am very willing to contribute but will need some guidance :)

  • Does this Connection work when deferrable=False?
  • Does this error occur on the latest version of Airflow / latest version of the Kubernetes provider (8.4.2)?

If so, I recommend analyzing the traceback for clues. FWIW, I think the precedence for determining Connections is:

  1. Secrets backend (if applicable)
  2. Env vars (if applicable -- they are in this case)
  3. Airflow DB (that is, Connections created via the UI)

RNHTTR, Oct 03 '24 22:10
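The lookup order described above can be modelled in a few lines of plain Python. This is a simplified sketch, not Airflow's implementation. The backend callables and ConnNotFound are hypothetical stand-ins, although the error text mirrors the AirflowNotFoundException message in the traceback. The model shows that the first backend that knows the conn_id wins. It also shows that a process whose environment lacks the variable, and whose metastore has no matching row, ends up raising.

```python
from typing import Callable, Optional

# A backend maps a conn_id to a connection value, or None if it does not know it.
Backend = Callable[[str], Optional[str]]


class ConnNotFound(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowNotFoundException."""


def make_env_backend(environ: dict[str, str]) -> Backend:
    # Same naming convention as the reproduction: AIRFLOW_CONN_ + conn_id.upper()
    return lambda conn_id: environ.get(f"AIRFLOW_CONN_{conn_id.upper()}")


def make_dict_backend(store: dict[str, str]) -> Backend:
    # Stand-in for a secrets backend or the metadata DB.
    return lambda conn_id: store.get(conn_id)


def get_connection(conn_id: str, backends: list[Backend]) -> str:
    for backend in backends:  # searched in order: secrets backend, env vars, metastore
        value = backend(conn_id)
        if value is not None:
            return value  # first backend that knows the conn_id wins
    raise ConnNotFound(f"The conn_id `{conn_id}` isn't defined")


uri = "kubernetes://?in_cluster=False"  # placeholder connection value
no_secrets = make_dict_backend({})
empty_metastore = make_dict_backend({})

# Environment contains the variable: found via the env-var backend.
found = get_connection(
    "k8s-conn-id",
    [no_secrets, make_env_backend({"AIRFLOW_CONN_K8S-CONN-ID": uri}), empty_metastore],
)

# A secrets backend that knows the conn_id takes precedence over the environment.
preferred = get_connection(
    "k8s-conn-id",
    [make_dict_backend({"k8s-conn-id": "from-secrets"}),
     make_env_backend({"AIRFLOW_CONN_K8S-CONN-ID": uri}), empty_metastore],
)

# Environment without the variable and an empty metastore: the lookup raises.
try:
    get_connection("k8s-conn-id", [no_secrets, make_env_backend({}), empty_metastore])
    missing_message = ""
except ConnNotFound as exc:
    missing_message = str(exc)
```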

Hi @RNHTTR , no problem :)

  • yes, it does work with deferrable=False (or with connection from airflow DB)
  • yes, the issue occurs with the latest Airflow and Kubernetes provider versions

I will step through the code and try to see where the chain breaks. I'll be back (sometime).

mayorblock, Oct 20 '24 07:10
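For the step-through, a quick check is to list which AIRFLOW_CONN_* variables a given process can actually see. Running the helper below inside each Airflow component (for example, the worker and triggerer containers of the docker-compose deployment) would show whether their environments differ. It is a minimal sketch using only the standard library, and the helper name is hypothetical.

```python
import os
from typing import Mapping


def airflow_conn_env_vars(environ: Mapping[str, str] = os.environ) -> list[str]:
    """Return the names of AIRFLOW_CONN_* variables visible to this process, sorted."""
    return sorted(name for name in environ if name.startswith("AIRFLOW_CONN_"))


if __name__ == "__main__":
    print(airflow_conn_env_vars())
```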