KubernetesPodOperator @task.kubernetes XComs fail with Dynamic Task Mapping `expand`
Apache Airflow version
2.8.1
What happened?
I tried to pass the result of an expanded task to a @task.kubernetes task, and it failed.
This only seems to work when one of the following workarounds is used:
- Use Airflow as the base image and, for the pod of the @task.kubernetes task that receives the LazyXComAccess argument, set AIRFLOW__DATABASE__SQL_ALCHEMY_CONN to the same value used by the Airflow pods (scheduler, worker, etc.)
- Before passing the LazyXComAccess argument to the @task.kubernetes task, use the PythonOperator to eagerly load it (a minimal sketch of this workaround is shown below)
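A minimal sketch of the eager-loading workaround (the eager_load task name and wiring are illustrative, not the exact DAG used further down):

from airflow.decorators import task

# Hypothetical sketch: an Airflow-dependent @task resolves the LazyXComAccess on
# the worker, where the metadata database is reachable, and returns a plain list.
# The downstream @task.kubernetes task then only receives ordinary pickled values
# and needs neither Airflow nor a DB connection inside the pod.
@task
def eager_load(values) -> list[int]:
    # Iterating the lazy proxy here triggers the XCom DB query on the worker
    return list(values)

@task.kubernetes(image="python:3.11-slim-bookworm", do_xcom_push=True)
def sum_it(values) -> None:
    print(f"Total was {sum(values)}")

# Inside a DAG: sum_it(eager_load(add_one.expand(x=[1, 2, 3])))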
What you think should happen instead?
@task.kubernetes tasks should be able to handle LazyXComAccess objects under the following constraints:
- The task pods remain independent of Airflow so that Airflow can act purely as the job orchestrator
- No Airflow-dependent tasks (e.g., PythonOperator) are needed as middlemen to eagerly load LazyXComAccess objects
How to reproduce
- Run Airflow using the image built from the Dockerfile
- Run the DAG
DAG definition
"""
Based on the example DAG demonstrating the usage of dynamic task mapping.
Found here: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#simple-mapping
"""
from __future__ import annotations
from datetime import datetime
from airflow.decorators import task, dag
@dag(dag_id="example_dynamic_task_mapping_with_k8s",
     start_date=datetime(2022, 3, 4),
     schedule_interval="*/1 * * * *")
def example_dynamic_task_mapping():
    # The "add_one" variants
    @task.kubernetes(task_id="add_one_k8s",
                     image="python:3.11-slim-bookworm",
                     do_xcom_push=True)
    def add_one_k8s(x: int) -> int:
        return x + 1

    @task(task_id="add_one_python")
    def add_one_python(x: int) -> int:
        return x + 1

    # The "sum_it" variants
    @task(task_id="sum_it_python")
    def sum_it_python(values) -> None:
        total = sum(values)
        print(f"Total was {total}")

    @task.kubernetes(task_id="sum_it_k8s",
                     image="python:3.11-slim-bookworm",
                     do_xcom_push=True)
    def sum_it_k8s(values) -> None:
        total = sum(values)
        print(f"Total was {total}")

    @task.kubernetes(task_id="sum_it_k8s_airflow",
                     image="apache/airflow:2.8.1-python3.11",
                     do_xcom_push=True)
    def sum_it_k8s_airflow(values) -> None:
        total = sum(values)
        print(f"Total was {total}")

    @task.kubernetes(task_id="sum_it_k8s_airflow_config",
                     image="apache/airflow:2.8.1-python3.11",
                     do_xcom_push=True,
                     env_vars={"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN": "postgresql+psycopg2://postgres:[email protected]:6432/airflow"})
    def sum_it_k8s_airflow_config(values) -> None:
        total = sum(values)
        print(f"Total was {total}")

    @task.kubernetes(task_id="sum_it_k8s_lazyevaluated",
                     image="python:3.11-slim-bookworm",
                     do_xcom_push=True)
    def sum_it_k8s_lazyevaluated(values) -> None:
        total = sum(values)
        print(f"Total was {total}")

    @task(task_id="evaluate_lazy_xcoms_python")
    def evaluate_lazy_xcoms(values: list[int]) -> list[int]:
        return list[int](values)

    # The "sum_it" variants for the python "add_one" task
    added_values_python = add_one_python.expand(x=[1, 2, 3])
    sum_it_python(added_values_python)
    sum_it_k8s(added_values_python)
    sum_it_k8s_airflow(added_values_python)
    sum_it_k8s_airflow_config(added_values_python)
    sum_it_k8s_lazyevaluated(evaluate_lazy_xcoms(added_values_python))

    # The "sum_it" variants for the k8s "add_one" task
    added_values_k8s = add_one_k8s.expand(x=[4, 5, 6])
    sum_it_python(added_values_k8s)
    sum_it_k8s(added_values_k8s)
    sum_it_k8s_airflow(added_values_k8s)
    sum_it_k8s_airflow_config(added_values_k8s)
    sum_it_k8s_lazyevaluated(evaluate_lazy_xcoms(added_values_k8s))

example_dynamic_task_mapping()
Results
sum_it_python
airflow-worker-0.airflow-worker.default.svc.cluster.local
*** Found logs served from host http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_python/attempt=1.log
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_python scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_python scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing <Task(_PythonDecoratedOperator): sum_it_python> on 2024-02-21 00:07:00+00:00
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started process 3063 to run task
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'sum_it_python', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '53', '--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', '/tmp/tmphlpc531x']
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 53: Subtask sum_it_python
[2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_python scheduled__2024-02-21T00:07:00+00:00 [running]> on host ***-worker-0.***-worker.default.svc.cluster.local
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' AIRFLOW_CTX_TASK_ID='sum_it_python' AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
[2024-02-21, 00:08:44 UTC] {logging_mixin.py:188} INFO - Total was 9
[2024-02-21, 00:08:44 UTC] {python.py:201} INFO - Done. Returned value was: None
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=example_dynamic_task_mapping_with_k8s, task_id=sum_it_python, execution_date=20240221T000700, start_date=20240221T000844, end_date=20240221T000844
[2024-02-21, 00:08:44 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-02-21, 00:08:44 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks scheduled from follow-on schedule check
sum_it_k8s
airflow-worker-0.airflow-worker.default.svc.cluster.local
*** Found logs served from host http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_k8s/attempt=1.log
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing <Task(_KubernetesDecoratedOperator): sum_it_k8s> on 2024-02-21 00:07:00+00:00
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started process 3062 to run task
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'sum_it_k8s', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '52', '--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', '/tmp/tmpwibfdck4']
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 52: Subtask sum_it_k8s
[2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s scheduled__2024-02-21T00:07:00+00:00 [running]> on host ***-worker-0.***-worker.default.svc.cluster.local
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' AIRFLOW_CTX_TASK_ID='sum_it_k8s' AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
[2024-02-21, 00:08:44 UTC] {warnings.py:109} WARNING - /home/***/.local/lib/python3.11/site-packages/***/providers/cncf/kubernetes/operators/pod.py:1036: AirflowProviderDeprecationWarning: This function is deprecated. Please use `add_unique_suffix`.
pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name)
[2024-02-21, 00:08:44 UTC] {pod.py:1055} INFO - Building pod k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip with labels: {'dag_id': 'example_dynamic_task_mapping_with_k8s', 'task_id': 'sum_it_k8s', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-02-21, 00:08:44 UTC] {pod.py:526} INFO - Found matching pod k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip with labels {'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.8.1', 'dag_id': 'example_dynamic_task_mapping_with_k8s', 'kubernetes_pod_operator': 'True', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'task_id': 'sum_it_k8s', 'try_number': '1'}
[2024-02-21, 00:08:44 UTC] {pod.py:527} INFO - `try_number` of task_instance: 1
[2024-02-21, 00:08:44 UTC] {pod.py:528} INFO - `try_number` of pod: 1
[2024-02-21, 00:08:44 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
[2024-02-21, 00:08:46 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
[2024-02-21, 00:08:47 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
[2024-02-21, 00:08:48 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
[2024-02-21, 00:08:49 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] + mkdir -p /***/xcom
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] + python /tmp/script.py /tmp/script.in /***/xcom/return.json
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] Traceback (most recent call last):
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] File "/tmp/script.py", line 16, in <module>
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] arg_dict = pickle.load(file)
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:50 UTC] {pod_manager.py:483} INFO - [base] ModuleNotFoundError: No module named '***'
[2024-02-21, 00:08:50 UTC] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-02-21, 00:08:50 UTC] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-02-21, 00:08:50 UTC] {pod_manager.py:798} INFO - Running command... if [ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo __***_xcom_result_empty__; fi
[2024-02-21, 00:08:50 UTC] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-02-21, 00:08:50 UTC] {pod.py:559} INFO - xcom result file is empty.
[2024-02-21, 00:08:50 UTC] {pod_manager.py:616} INFO - Pod k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip has phase Running
[2024-02-21, 00:08:52 UTC] {pod.py:909} INFO - Deleting pod: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
[2024-02-21, 00:08:52 UTC] {taskinstance.py:2698} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
result = execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes.py", line 128, in execute
return super().execute(context)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 241, in execute
return_value = super().execute(context)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 570, in execute
return self.execute_sync(context)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 629, in execute_sync
self.cleanup(
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 839, in cleanup
raise AirflowException(
airflow.exceptions.AirflowException: Pod k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip returned a failure.
remote_pod: {'api_version': 'v1',
'kind': 'Pod',
'metadata': {'annotations': None,
...
'start_time': datetime.datetime(2024, 2, 21, 0, 8, 44, tzinfo=tzlocal())}}
[2024-02-21, 00:08:52 UTC] {taskinstance.py:1138} INFO - Marking task as FAILED. dag_id=example_dynamic_task_mapping_with_k8s, task_id=sum_it_k8s, execution_date=20240221T000700, start_date=20240221T000844, end_date=20240221T000852
[2024-02-21, 00:08:52 UTC] {standard_task_runner.py:107} ERROR - Failed to execute job 52 for task sum_it_k8s (Pod k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip returned a failure.
remote_pod: {'api_version': 'v1',
'kind': 'Pod',
'metadata': {'annotations': None,
'creation_timestamp': datetime.datetime(2024, 2, 21, 0, 8, 44, tzinfo=tzlocal()),
...
'start_time': datetime.datetime(2024, 2, 21, 0, 8, 44, tzinfo=tzlocal())}}; 3062)
[2024-02-21, 00:08:52 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-02-21, 00:08:52 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks scheduled from follow-on schedule check
sum_it_k8s_airflow
airflow-worker-0.airflow-worker.default.svc.cluster.local
*** Found logs served from host http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_k8s_airflow/attempt=1.log
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing <Task(_KubernetesDecoratedOperator): sum_it_k8s_airflow> on 2024-02-21 00:07:00+00:00
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started process 3044 to run task
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'sum_it_k8s_***', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '49', '--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', '/tmp/tmp7k0l0mmp']
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 49: Subtask sum_it_k8s_***
[2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow scheduled__2024-02-21T00:07:00+00:00 [running]> on host ***-worker-0.***-worker.default.svc.cluster.local
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' AIRFLOW_CTX_TASK_ID='sum_it_k8s_***' AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
[2024-02-21, 00:08:44 UTC] {warnings.py:109} WARNING - /home/***/.local/lib/python3.11/site-packages/***/providers/cncf/kubernetes/operators/pod.py:1036: AirflowProviderDeprecationWarning: This function is deprecated. Please use `add_unique_suffix`.
pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name)
[2024-02-21, 00:08:44 UTC] {pod.py:1055} INFO - Building pod k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t with labels: {'dag_id': 'example_dynamic_task_mapping_with_k8s', 'task_id': 'sum_it_k8s_***', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-02-21, 00:08:44 UTC] {pod.py:526} INFO - Found matching pod k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t with labels {'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.8.1', 'dag_id': 'example_dynamic_task_mapping_with_k8s', 'kubernetes_pod_operator': 'True', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'task_id': 'sum_it_k8s_***', 'try_number': '1'}
[2024-02-21, 00:08:44 UTC] {pod.py:527} INFO - `try_number` of task_instance: 1
[2024-02-21, 00:08:44 UTC] {pod.py:528} INFO - `try_number` of pod: 1
[2024-02-21, 00:08:44 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t
[2024-02-21, 00:08:45 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t
[2024-02-21, 00:08:46 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t
[2024-02-21, 00:08:47 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:47 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:47 UTC] {pod_manager.py:466} INFO - [base] + mkdir -p /***/xcom
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] + python /tmp/script.py /tmp/script.in /***/xcom/return.json
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] Traceback (most recent call last):
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] self.dialect.do_execute(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] cursor.execute(statement, parameters)
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] sqlite3.OperationalError: no such table: xcom
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] The above exception was the direct cause of the following exception:
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] Traceback (most recent call last):
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/tmp/script.py", line 24, in <module>
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] res = sum_it_k8s_***(*arg_dict["args"], **arg_dict["kwargs"])
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/tmp/script.py", line 21, in sum_it_k8s_***
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] total = sum(values)
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/***/models/xcom.py", line 721, in __next__
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] return XCom.deserialize_value(next(self._it))
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2901, in __iter__
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] result = self._iter()
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2916, in _iter
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] result = self.session.execute(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] result = conn._execute_20(statement, params or {}, execution_options)
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] return meth(self, args_10style, kwargs_10style, execution_options)
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] return connection._execute_clauseelement(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ret = self._execute_context(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] self._handle_dbapi_exception(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] util.raise_(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] raise exception
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] self.dialect.do_execute(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] cursor.execute(statement, parameters)
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: xcom
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] [SQL: SELECT xcom.value
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] FROM xcom JOIN dag_run ON xcom.dag_run_id = dag_run.id
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] WHERE xcom.key = 'return_value' AND xcom.task_id = 'add_one_python' AND xcom.dag_id = 'example_dynamic_task_mapping_with_k8s' AND xcom.run_id = 'scheduled__2024-02-21T00:07:00+00:00' ORDER BY xcom.map_index ASC]
[2024-02-21, 00:08:50 UTC] {pod_manager.py:483} INFO - [base] (Background on this error at: https://sqlalche.me/e/14/e3q8)
[2024-02-21, 00:08:50 UTC] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-02-21, 00:08:50 UTC] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-02-21, 00:08:50 UTC] {pod_manager.py:798} INFO - Running command... if [ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo __***_xcom_result_empty__; fi
[2024-02-21, 00:08:50 UTC] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-02-21, 00:08:51 UTC] {pod.py:559} INFO - xcom result file is empty.
[2024-02-21, 00:08:51 UTC] {pod_manager.py:616} INFO - Pod k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t has phase Running
[2024-02-21, 00:08:53 UTC] {pod_manager.py:616} INFO - Pod k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t has phase Running
[2024-02-21, 00:08:55 UTC] {pod.py:909} INFO - Deleting pod: k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t
[2024-02-21, 00:08:55 UTC] {taskinstance.py:2698} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
result = execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes.py", line 128, in execute
return super().execute(context)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 241, in execute
return_value = super().execute(context)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 570, in execute
return self.execute_sync(context)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 629, in execute_sync
self.cleanup(
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 839, in cleanup
raise AirflowException(
airflow.exceptions.AirflowException: Pod k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t returned a failure.
remote_pod: {'api_version': 'v1',
'kind': 'Pod',
'metadata': {'annotations': None,
'creation_timestamp': datetime.datetime(2024, 2, 21, 0, 8, 44, tzinfo=tzlocal()),
...
'start_time': datetime.datetime(2024, 2, 21, 0, 8, 44, tzinfo=tzlocal())}}; 3044)
[2024-02-21, 00:08:55 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-02-21, 00:08:55 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks scheduled from follow-on schedule check
sum_it_k8s_airflow_config
airflow-worker-0.airflow-worker.default.svc.cluster.local
*** Found logs served from host http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_k8s_airflow_config/attempt=1.log
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow_config scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow_config scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing <Task(_KubernetesDecoratedOperator): sum_it_k8s_airflow_config> on 2024-02-21 00:07:00+00:00
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started process 3049 to run task
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'sum_it_k8s_***_config', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '51', '--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', '/tmp/tmp04503lkh']
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 51: Subtask sum_it_k8s_***_config
[2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow_config scheduled__2024-02-21T00:07:00+00:00 [running]> on host ***-worker-0.***-worker.default.svc.cluster.local
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' AIRFLOW_CTX_TASK_ID='sum_it_k8s_***_config' AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
[2024-02-21, 00:08:44 UTC] {warnings.py:109} WARNING - /home/***/.local/lib/python3.11/site-packages/***/providers/cncf/kubernetes/operators/pod.py:1036: AirflowProviderDeprecationWarning: This function is deprecated. Please use `add_unique_suffix`.
pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name)
[2024-02-21, 00:08:44 UTC] {pod.py:1055} INFO - Building pod k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k with labels: {'dag_id': 'example_dynamic_task_mapping_with_k8s', 'task_id': 'sum_it_k8s_***_config', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-02-21, 00:08:44 UTC] {pod.py:526} INFO - Found matching pod k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k with labels {'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.8.1', 'dag_id': 'example_dynamic_task_mapping_with_k8s', 'kubernetes_pod_operator': 'True', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'task_id': 'sum_it_k8s_***_config', 'try_number': '1'}
[2024-02-21, 00:08:44 UTC] {pod.py:527} INFO - `try_number` of task_instance: 1
[2024-02-21, 00:08:44 UTC] {pod.py:528} INFO - `try_number` of pod: 1
[2024-02-21, 00:08:44 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
[2024-02-21, 00:08:45 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
[2024-02-21, 00:08:46 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
[2024-02-21, 00:08:47 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
[2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + mkdir -p /***/xcom
[2024-02-21, 00:08:51 UTC] {pod_manager.py:466} INFO - [base] + python /tmp/script.py /tmp/script.in /***/xcom/return.json
[2024-02-21, 00:08:52 UTC] {pod_manager.py:483} INFO - [base] Total was 9
[2024-02-21, 00:08:52 UTC] {pod_manager.py:511} WARNING - Pod k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k log read interrupted but container base still running
[2024-02-21, 00:08:53 UTC] {pod_manager.py:483} INFO - [base] Total was 9
[2024-02-21, 00:08:53 UTC] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-02-21, 00:08:53 UTC] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-02-21, 00:08:53 UTC] {pod_manager.py:798} INFO - Running command... if [ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo __***_xcom_result_empty__; fi
[2024-02-21, 00:08:53 UTC] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-02-21, 00:08:53 UTC] {pod.py:559} INFO - xcom result file is empty.
[2024-02-21, 00:08:53 UTC] {pod_manager.py:616} INFO - Pod k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k has phase Running
[2024-02-21, 00:08:55 UTC] {pod.py:909} INFO - Deleting pod: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
[2024-02-21, 00:08:55 UTC] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=example_dynamic_task_mapping_with_k8s, task_id=sum_it_k8s_***_config, execution_date=20240221T000700, start_date=20240221T000844, end_date=20240221T000855
[2024-02-21, 00:08:55 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-02-21, 00:08:55 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks scheduled from follow-on schedule check
evaluate_lazy_xcoms_python
airflow-worker-0.airflow-worker.default.svc.cluster.local
*** Found logs served from host http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=evaluate_lazy_xcoms_python/attempt=1.log
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.evaluate_lazy_xcoms_python scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.evaluate_lazy_xcoms_python scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing <Task(_PythonDecoratedOperator): evaluate_lazy_xcoms_python> on 2024-02-21 00:07:00+00:00
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started process 3048 to run task
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'evaluate_lazy_xcoms_python', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '50', '--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', '/tmp/tmps0lkgd19']
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 50: Subtask evaluate_lazy_xcoms_python
[2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_dynamic_task_mapping_with_k8s.evaluate_lazy_xcoms_python scheduled__2024-02-21T00:07:00+00:00 [running]> on host ***-worker-0.***-worker.default.svc.cluster.local
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' AIRFLOW_CTX_TASK_ID='evaluate_lazy_xcoms_python' AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
[2024-02-21, 00:08:44 UTC] {python.py:201} INFO - Done. Returned value was: [2, 3, 4]
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=example_dynamic_task_mapping_with_k8s, task_id=evaluate_lazy_xcoms_python, execution_date=20240221T000700, start_date=20240221T000844, end_date=20240221T000844
[2024-02-21, 00:08:44 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-02-21, 00:08:44 UTC] {taskinstance.py:3280} INFO - 1 downstream tasks scheduled from follow-on schedule check
sum_it_k8s_lazyevaluated
airflow-worker-0.airflow-worker.default.svc.cluster.local
*** Found logs served from host http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_k8s_lazyevaluated/attempt=1.log
[2024-02-21, 00:08:45 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_lazyevaluated scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:45 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_lazyevaluated scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:45 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-21, 00:08:45 UTC] {taskinstance.py:2191} INFO - Executing <Task(_KubernetesDecoratedOperator): sum_it_k8s_lazyevaluated> on 2024-02-21 00:07:00+00:00
[2024-02-21, 00:08:45 UTC] {standard_task_runner.py:60} INFO - Started process 3099 to run task
[2024-02-21, 00:08:45 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'sum_it_k8s_lazyevaluated', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '54', '--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', '/tmp/tmph4g0tr_h']
[2024-02-21, 00:08:45 UTC] {standard_task_runner.py:88} INFO - Job 54: Subtask sum_it_k8s_lazyevaluated
[2024-02-21, 00:08:45 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_lazyevaluated scheduled__2024-02-21T00:07:00+00:00 [running]> on host ***-worker-0.***-worker.default.svc.cluster.local
[2024-02-21, 00:08:45 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' AIRFLOW_CTX_TASK_ID='sum_it_k8s_lazyevaluated' AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
[2024-02-21, 00:08:45 UTC] {warnings.py:109} WARNING - /home/***/.local/lib/python3.11/site-packages/***/providers/cncf/kubernetes/operators/pod.py:1036: AirflowProviderDeprecationWarning: This function is deprecated. Please use `add_unique_suffix`.
pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name)
[2024-02-21, 00:08:45 UTC] {pod.py:1055} INFO - Building pod k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 with labels: {'dag_id': 'example_dynamic_task_mapping_with_k8s', 'task_id': 'sum_it_k8s_lazyevaluated', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-02-21, 00:08:45 UTC] {pod.py:526} INFO - Found matching pod k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 with labels {'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.8.1', 'dag_id': 'example_dynamic_task_mapping_with_k8s', 'kubernetes_pod_operator': 'True', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'task_id': 'sum_it_k8s_lazyevaluated', 'try_number': '1'}
[2024-02-21, 00:08:45 UTC] {pod.py:527} INFO - `try_number` of task_instance: 1
[2024-02-21, 00:08:45 UTC] {pod.py:528} INFO - `try_number` of pod: 1
[2024-02-21, 00:08:45 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4
[2024-02-21, 00:08:46 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4
[2024-02-21, 00:08:47 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4
[2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + mkdir -p /***/xcom
[2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python /tmp/script.py /tmp/script.in /***/xcom/return.json
[2024-02-21, 00:08:48 UTC] {pod_manager.py:483} INFO - [base] Total was 9
[2024-02-21, 00:08:48 UTC] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-02-21, 00:08:48 UTC] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-02-21, 00:08:48 UTC] {pod_manager.py:798} INFO - Running command... if [ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo __***_xcom_result_empty__; fi
[2024-02-21, 00:08:48 UTC] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-02-21, 00:08:49 UTC] {pod.py:559} INFO - xcom result file is empty.
[2024-02-21, 00:08:49 UTC] {pod_manager.py:616} INFO - Pod k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 has phase Running
[2024-02-21, 00:08:52 UTC] {pod.py:909} INFO - Deleting pod: k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4
[2024-02-21, 00:08:52 UTC] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=example_dynamic_task_mapping_with_k8s, task_id=sum_it_k8s_lazyevaluated, execution_date=20240221T000700, start_date=20240221T000845, end_date=20240221T000852
[2024-02-21, 00:08:52 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-02-21, 00:08:52 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks scheduled from follow-on schedule check
Operating System
macOS 13.4 (22F66)
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes 8.0.0
Deployment
airflow-helm helm chart
Deployment details
Host specifications
- MacBook Pro with Apple M2 Pro
- macOS 13.4 (22F66)
- Darwin 22.5.0
- Local Kubernetes 1.27.7
- k3d v5.6.0 using image rancher/k3s:v1.27.7-k3s1
- Docker Desktop with Rosetta for x86/amd64 emulation on Apple Silicon
Airflow specifications
- Airflow 2.8.1 with Python 3.11
- Based on the apache/airflow:2.8.1-python3.11 image
- Deployed using airflow-helm 8.8.0
- helm version = version.BuildInfo{Version:"v3.12.3", GitCommit:"3a31588ad33fe3b89af5a2a54ee1d25bfe6eaa5e", GitTreeState:"clean", GoVersion:"go1.20.7"}
- apache-airflow-providers-cncf-kubernetes 8.0.0
Dockerfile
FROM apache/airflow:2.8.1-python3.11
COPY requirements.txt requirements.txt
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
requirements.txt
apache-airflow-providers-cncf-kubernetes==8.0.0
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I did a regression test and can confirm your analysis. The problem is that the expansion produces a lazy XCom resolution, which requires the Airflow codebase and access to the XCom database - so both the Airflow Python code and the correct DB connection string need to be present in the pod.
I can also confirm that your proposed workaround of putting a PythonOperator in between resolves the problem, as it performs the lazy aggregation eagerly.
If this cannot be fixed easily, I'd propose that the documentation at least mention it. The currently documented restrictions on XCom in the KubernetesPodOperator only cover how XCom is passed out of the operator; there is no statement about the input side in the case of task-mapping aggregation / lazy XCom.
That's great to hear, thanks for confirming these things!
I agree that at a minimum the documentation (probably on the Dynamic Task Mapping page here) should reflect this limitation, and potentially should also mention the PythonOperator workaround depending on how difficult it is to find and implement a widely agreeable solution.
A potential solution would be to build eager-loading of LazyXComAccess arguments into all operators that are meant to work without a connection to the XCom database (e.g., the operators listed under the "Using the TaskFlow API with complex/conflicting Python dependencies" section of the docs here), performed on the worker, i.e., before the task itself executes. I'm not sure how difficult that would be, though. A rough sketch of the idea follows.
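To make this concrete, here is a rough, hypothetical sketch of what such on-worker eager-loading could look like; the resolve_lazy_xcom_args helper and the type-name check are assumptions for illustration, not existing Airflow APIs:

def resolve_lazy_xcom_args(op_args: list, op_kwargs: dict) -> tuple[list, dict]:
    """Hypothetical helper: materialize lazy XCom sequences into plain lists on
    the Airflow worker, before the arguments are pickled and shipped to the pod,
    so the pod itself never needs Airflow code or metadata-DB access."""

    def materialize(value):
        # Heuristic check for Airflow's lazy XCom proxy types; the class names
        # here are an assumption, not a stable API.
        if type(value).__name__ in ("LazyXComAccess", "LazySelectSequence"):
            return list(value)  # iterating triggers the XCom DB query on the worker
        return value

    return (
        [materialize(a) for a in op_args],
        {k: materialize(v) for k, v in op_kwargs.items()},
    )

The operator would need to call something like this on the worker right before it pickles the arguments into the pod's input file, which is the same point at which the PythonOperator workaround currently does the materialization by hand.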