kubernetes connection defined via env variable does not work with deferrable=True
Apache Airflow version
2.10.0
If "Other Airflow 2 version" selected, which one?
No response
What happened?
I am trying to run a KubernetesPodOperator (provider v8.4.0) with deferrable=True (and do_xcom_push=True, which was also an issue in earlier versions but is not now). I define my Kubernetes cluster connection in code using airflow.models.connection.Connection and supply it via an environment variable.
After all containers (init container, sidecar and main container) have completed successfully, I get:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1792, in resume_execution
return execute_callable(context)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 773, in trigger_reentry
raise AirflowException(message)
airflow.exceptions.AirflowException: Traceback (most recent call last):
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/triggers/pod.py", line 162, in run
state = await self._wait_for_pod_start()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/triggers/pod.py", line 223, in _wait_for_pod_start
pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 747, in get_pod
async with self.get_conn() as connection:
File "/usr/local/lib/python3.12/contextlib.py", line 210, in __aenter__
return await anext(self.gen)
^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 734, in get_conn
kube_client = await self._load_config() or async_client.ApiClient()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 664, in _load_config
in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 724, in _get_field
extras = await self.get_conn_extras()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 712, in get_conn_extras
connection = await sync_to_async(self.get_connection)(self.conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/asgiref/sync.py", line 468, in __call__
ret = await asyncio.shield(exec_coro)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/asgiref/sync.py", line 522, in thread_handler
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 169, in get_connection
return super().get_connection(conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/hooks/base.py", line 83, in get_connection
conn = Connection.get_connection_from_secrets(conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/***/.local/lib/python3.12/site-packages/***/models/connection.py", line 537, in get_connection_from_secrets
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
***.exceptions.AirflowNotFoundException: The conn_id `k8s-conn-id` isn't defined
Now if I:
- set deferrable=False, it works with the environment variable connection
- make an explicit k8s cluster connection in the UI, then deferrable=True works
What you think should happen instead?
Supplying a connection via an environment variable, as described in the docs, should work exactly like a connection created via the UI.
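For reference, the environment-variable lookup described in the docs comes down to a variable name derived from the connection ID. The sketch below is plain Python with no Airflow import; `conn_env_var_name` and `lookup_conn_value` are hypothetical helpers written for illustration, assuming the documented `AIRFLOW_CONN_<CONN_ID in upper case>` convention. It is not the provider's code.

```python
import os
from typing import Optional

# Prefix Airflow documents for connections supplied via environment variables.
CONN_ENV_PREFIX = "AIRFLOW_CONN_"


def conn_env_var_name(conn_id: str) -> str:
    """Hypothetical helper: name of the env var consulted for `conn_id`."""
    return CONN_ENV_PREFIX + conn_id.upper()


def lookup_conn_value(conn_id: str) -> Optional[str]:
    """Return the raw value (URI or JSON) visible to *this* process, if any."""
    return os.environ.get(conn_env_var_name(conn_id))


if __name__ == "__main__":
    # Shows which variable name the conn_id from this report maps to,
    # and whether the current process can actually see it.
    print(conn_env_var_name("k8s-conn-id"))
    print(lookup_conn_value("k8s-conn-id"))
```

Running this inside each Airflow container is a quick way to see which processes have the variable set.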
How to reproduce
Make a Kubernetes cluster connection:
# create connection
import os

from airflow import DAG
from airflow.models.connection import Connection
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# replace the <...> placeholders with your own values
extra = {
    "kube_config": <kube_config to your cluster>,
    "namespace": <your namespace>,
    "in_cluster": False,
}
conn = Connection(conn_id="k8s-conn-id", conn_type="kubernetes", description="k8s connection", extra=extra)
# expose the connection as an AIRFLOW_CONN_* environment variable
os.environ[f"AIRFLOW_CONN_{conn.conn_id.upper()}"] = conn.get_uri()

# define minimal DAG with deferrable=True and do_xcom_push=True
with DAG(dag_id="k8s-example") as dag:
    k = KubernetesPodOperator(
        kubernetes_conn_id="k8s-conn-id",
        name="k8s-pod",
        cmds=["bash", "-cx"],
        arguments=["mkdir -p /airflow/xcom/;cat /tmp/mnt/hello.txt > /airflow/xcom/return.json"],
        task_id="k8s-pod",
        startup_timeout_seconds=1000,
        do_xcom_push=True,
        pod_template_file="k8s_template.yaml",
        on_finish_action="keep_pod",
        deferrable=True,
    )
    # just testing XCOM
    b = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('k8s-pod')[0] }}\"",
        task_id="pod_task_xcom_result",
    )
    k >> b
and the manifest:
apiVersion: v1
kind: Pod
spec:
  initContainers:
    - name: init-container
      image: "ubuntu:latest"
      command: ["bash", "-cx"]
      args: ["echo '[1,2,3,4]' > /tmp/mnt/hello.txt"]
      resources:
        limits:
          cpu: 500m
          memory: 1Gi
        requests:
          cpu: 100m
          memory: 1Gi
      volumeMounts:
        - name: shared-volume
          mountPath: "/tmp/mnt"
  containers:
    - name: base
      image: "ubuntu:latest"
      imagePullPolicy: IfNotPresent
      ports: []
      resources:
        limits:
          cpu: 500m
          memory: 1Gi
        requests:
          cpu: 100m
          memory: 1Gi
      volumeMounts:
        - name: shared-volume
          mountPath: "/tmp/mnt"
  restartPolicy: Never
  volumes:
    - name: shared-volume
      emptyDir: {}
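One thing that may be worth checking when debugging this (an assumption, not something confirmed in this thread): assigning to `os.environ` inside a DAG file only changes the environment of the process that executes that file, plus any children it starts afterwards. A separate process that never imports the DAG file would not see the variable. The sketch below demonstrates that scoping with plain Python and `subprocess`; the variable name matches the reproduction, the value is a placeholder, and `child_sets_variable` and `visible_in_this_process` are hypothetical helpers.

```python
import os
import subprocess
import sys

# Same variable name the reproduction script produces for conn_id "k8s-conn-id".
VAR = "AIRFLOW_CONN_K8S-CONN-ID"


def child_sets_variable() -> str:
    """Start a child interpreter that sets VAR in its own environment and echoes it."""
    code = (
        "import os\n"
        f"os.environ[{VAR!r}] = 'kubernetes://placeholder'\n"
        f"print(os.environ[{VAR!r}])\n"
    )
    result = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


def visible_in_this_process() -> bool:
    """True if the current process has VAR in its environment."""
    return VAR in os.environ


if __name__ == "__main__":
    print("child sees:", child_sets_variable())
    print("parent sees it:", visible_in_this_process())
```

If the same pattern applies here, setting the variable in the container environment (for example in the docker-compose file) rather than in the DAG file would make it visible to every Airflow component.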
Operating System
docker
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==8.4.0
apache-airflow-providers-microsoft-azure==10.3.0
Deployment
Docker-Compose
Deployment details
I am running Airflow locally in Docker with the official docker-compose file and deploying pods to Azure Kubernetes Service.
Anything else?
No response
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
The connection ID uses a hyphen (k8s-conn-id) in the screenshot. Should it be k8s_conn_id?
@tirkarthi good catch :) but unfortunately just a typo when transferring my code to this example.
The issue still stands. With the Kubernetes cluster connection supplied via an environment variable, the KPO with deferrable=True fails even when the pod runs successfully.
Hi @RNHTTR, politely nudging to hear if there is anything I can do to advance the issue? I am very willing to contribute but will need some guidance :)
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
Hi @RNHTTR, the bot message seems to imply that I should somehow respond, but I am unsure what I am supposed to do. You added a pending-response tag; what does it imply?
Thank you in advance.
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
@mayorblock sorry about that! I removed the stale/pending-response labels.
Hi @RNHTTR, politely nudging to hear if there is anything I can do to advance the issue? I am very willing to contribute but will need some guidance :)
- Does this Connection work when deferrable=False?
- Does this error occur on the latest version of Airflow / latest version of the Kubernetes provider (8.4.2)?
If so, I recommend analyzing the traceback for clues. FWIW, I think the precedence for determining Connections is:
- Secrets backend (if applicable)
- Env vars (if applicable -- they are in this case)
- Airflow DB (that is, Connections created via the UI)
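The precedence listed above can be pictured as a first-match search over an ordered list of sources. The sketch below is a simplified model in plain Python, not Airflow's implementation: `resolve_connection`, the dictionary-backed sources, and the placeholder URIs are all hypothetical and exist only to illustrate the ordering.

```python
from typing import Callable, Optional, Sequence, Tuple

# A source maps a conn_id to a raw connection value, or None if it has no entry.
Source = Callable[[str], Optional[str]]


def resolve_connection(conn_id: str, sources: Sequence[Tuple[str, Source]]) -> Tuple[str, str]:
    """Return (source name, value) from the first source that knows `conn_id`."""
    for name, source in sources:
        value = source(conn_id)
        if value is not None:
            return name, value
    raise LookupError(f"The conn_id `{conn_id}` isn't defined")


# Hypothetical stand-ins for the three sources, in the order given above.
secrets_backend = {}.get
env_vars = {"k8s-conn-id": "kubernetes://from-env"}.get
airflow_db = {"k8s-conn-id": "kubernetes://from-db"}.get

ORDER = [
    ("secrets backend", secrets_backend),
    ("env vars", env_vars),
    ("airflow db", airflow_db),
]

if __name__ == "__main__":
    # The env var entry is found before the DB entry is ever consulted.
    print(resolve_connection("k8s-conn-id", ORDER))
```

In this model, a process where the env var source is empty falls through to the DB, and raises the "isn't defined" error only when no source has the ID.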
Hi @RNHTTR, no problem :)
- Yes, it does work with deferrable=False (or with a connection from the Airflow DB)
- Yes, the issue occurs with the latest Airflow and Kubernetes provider versions
I will step through and try to see where the chain falls off. I'll be back (sometime)