Missing termination argument in DockerOperator
Apache Airflow Provider(s)
docker
Versions of Apache Airflow Providers
3.10.0
Apache Airflow version
2.8.3-python3.11
Operating System
ubuntu
Deployment
Docker-Compose
Deployment details
I'm using docker-compose on an EC2 instance
What happened
When a task fails while the DAG is running (that is, the task's own Python code raises), the generated file /tmp/script.py expects a fourth argument, which is missing.
What you think should happen instead
The error seems to come from https://github.com/apache/airflow/pull/31747 that adds the termination file as
termination_log_path = tmp_dir / "termination.log"
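To illustrate the failure mode described above, here is a minimal sketch that only mimics the argument list shown in the log in the reproduction below (the script path plus two file paths). The helper name `termination_log_arg` is hypothetical and is not part of Airflow; it just reads index 4 the way the failing line in /tmp/script.py does.

```python
# Argument vector as it appears in the log: script path plus two file paths.
argv = ["/tmp/script.py", "/tmp/script.in", "/tmp/script.out"]


def termination_log_arg(argv):
    """Hypothetical helper: read the termination-log path from index 4."""
    return argv[4]


try:
    termination_log_arg(argv)
    outcome = "ok"
except IndexError as exc:
    # Only three entries exist, so index 4 is out of range.
    outcome = f"IndexError: {exc}"

print(outcome)  # IndexError: list index out of range
```

With only three entries in the list, any read of index 4 fails, which matches the second traceback in the log.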
How to reproduce
from airflow.decorators import dag, task

@dag()
def failing_test():
    @task.docker(
        image="python:3.11-slim-bookworm",
        python_command="python3",
        docker_url="tcp://docker-socket-proxy:2375",
    )
    def fails():
        raise RuntimeError("This is a test")

    fails()

failing_test()
results in
...
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - + python3 -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x);'
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - + python3 -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x);'
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - + python3 /tmp/script.py /tmp/script.in /tmp/script.out
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - Traceback (most recent call last):
File "/tmp/script.py", line 31, in <module>
res = fails(*arg_dict["args"], **arg_dict["kwargs"])
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/tmp/script.py", line 17, in fails
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - raise RuntimeError("This is a test")
RuntimeError: This is a test
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/script.py", line 33, in <module>
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - with open(sys.argv[4], "w") as file:
~~~~~~~~^^^
IndexError: list index out of range
[2024-04-29, 23:09:02 UTC] {taskinstance.py:2731} ERROR - Task failed with exception
...
Anything else
No response
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
As you checked, you're willing to submit a PR; I'll assign this one to you. Thanks!
OK. It's my first contribution, I'll try not to mess it up!
That's awesome! Many thanks! Please let us know if you encounter issues during development!
Seems like this one is a duplicate of https://github.com/apache/airflow/issues/33692
I guess the better solution would be to disable the extra debug ranges by implicitly setting PYTHONNODEBUGRANGES to any value, with the ability to turn that off.
Without this feature the traceback looks fine:
@task.docker(
    image="python:3.11-slim-bookworm",
    auto_remove="force",
    environment={"PYTHONNODEBUGRANGES": "true"},
)
def f():
    raise RuntimeError("This is a test")
[2024-04-30T13:49:25.647+0400] {taskinstance.py:2143} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.f test [None]>
[2024-04-30T13:49:25.648+0400] {taskinstance.py:2143} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.f test [None]>
[2024-04-30T13:49:25.649+0400] {taskinstance.py:2378} INFO - Starting attempt 1 of 1
[2024-04-30T13:49:25.649+0400] {taskinstance.py:2460} WARNING - cannot record queued_duration for task f because previous state change time has not been saved
[2024-04-30T13:49:25.651+0400] {taskinstance.py:2402} INFO - Executing <Task(_DockerDecoratedOperator): f> on 2021-09-01 00:00:00+00:00
[2024-04-30T13:49:26.317+0400] {taskinstance.py:2721} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test_dag' AIRFLOW_CTX_TASK_ID='f' AIRFLOW_CTX_EXECUTION_DATE='2021-09-01T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='test'
[2024-04-30T13:49:26.319+0400] {taskinstance.py:438} INFO - ::endgroup::
[2024-04-30T13:49:26.378+0400] {docker.py:366} INFO - Starting docker container from image python:3.11-slim-bookworm
[2024-04-30T13:49:26.641+0400] {docker.py:436} INFO - + python3 -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x);'
[2024-04-30T13:49:26.713+0400] {docker.py:436} INFO - + python3 -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x);'
[2024-04-30T13:49:26.723+0400] {docker.py:436} INFO - + python3 /tmp/script.py /tmp/script.in /tmp/script.out
[2024-04-30T13:49:26.743+0400] {docker.py:436} INFO - Traceback (most recent call last):
File "/tmp/script.py", line 31, in <module>
[2024-04-30T13:49:26.743+0400] {docker.py:436} INFO - res = f(*arg_dict["args"], **arg_dict["kwargs"])
File "/tmp/script.py", line 17, in f
raise RuntimeError("This is a test")
RuntimeError: This is a test
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/script.py", line 33, in <module>
[2024-04-30T13:49:26.744+0400] {docker.py:436} INFO - with open(sys.argv[4], "w") as file:
IndexError: list index out of range
[2024-04-30T13:49:26.839+0400] {taskinstance.py:453} INFO - ::group::Post task execution logs
[2024-04-30T13:49:26.841+0400] {taskinstance.py:2976} ERROR - Task failed with exception
Traceback (most recent call last):
File "/Users/taragolis/Projects/common/airflow/airflow/models/taskinstance.py", line 477, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
File "/Users/taragolis/Projects/common/airflow/airflow/models/taskinstance.py", line 440, in _execute_callable
return ExecutionCallableRunner(
File "/Users/taragolis/Projects/common/airflow/airflow/utils/operator_helpers.py", line 250, in run
return self.func(*args, **kwargs)
File "/Users/taragolis/Projects/common/airflow/airflow/models/baseoperator.py", line 405, in wrapper
return func(self, *args, **kwargs)
File "/Users/taragolis/Projects/common/airflow/airflow/providers/docker/decorators/docker.py", line 127, in execute
return super().execute(context)
File "/Users/taragolis/Projects/common/airflow/airflow/models/baseoperator.py", line 405, in wrapper
return func(self, *args, **kwargs)
File "/Users/taragolis/Projects/common/airflow/airflow/decorators/base.py", line 265, in execute
return_value = super().execute(context)
File "/Users/taragolis/Projects/common/airflow/airflow/models/baseoperator.py", line 405, in wrapper
return func(self, *args, **kwargs)
File "/Users/taragolis/Projects/common/airflow/airflow/providers/docker/operators/docker.py", line 509, in execute
return self._run_image()
File "/Users/taragolis/Projects/common/airflow/airflow/providers/docker/operators/docker.py", line 371, in _run_image
return self._run_image_with_mounts([*self.mounts, tmp_mount], add_tmp_variable=True)
File "/Users/taragolis/Projects/common/airflow/airflow/providers/docker/operators/docker.py", line 444, in _run_image_with_mounts
raise DockerContainerFailedException(f"Docker container failed: {result!r}", logs=log_lines)
airflow.providers.docker.exceptions.DockerContainerFailedException: Docker container failed: {'StatusCode': 1}
[2024-04-30T13:49:26.844+0400] {taskinstance.py:1260} INFO - Marking task as FAILED. dag_id=test_dag, task_id=f, run_id=test, execution_date=20210901T000000, start_date=20240430T094925, end_date=20240430T094926
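The effect of PYTHONNODEBUGRANGES seen in the log above can also be checked outside Airflow and Docker. The following is a rough sketch, not anything taken from the provider: it writes a small script to a temporary file and runs it in a fresh interpreter with and without the variable. The names `SCRIPT` and `traceback_text` are made up for the example, and the caret markers only show up on Python 3.11 or newer.

```python
import os
import subprocess
import sys
import tempfile

# A tiny script whose failure happens inside a larger expression.
SCRIPT = '''
def f():
    raise RuntimeError("This is a test")

res = [f()]
'''


def traceback_text(extra_env):
    """Run SCRIPT in a fresh interpreter and return what it wrote to stderr."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "script.py")
        with open(path, "w") as fh:
            fh.write(SCRIPT)
        # Start from the current environment without the variable under test.
        env = {k: v for k, v in os.environ.items() if k != "PYTHONNODEBUGRANGES"}
        env.update(extra_env)
        proc = subprocess.run(
            [sys.executable, path], env=env, capture_output=True, text=True
        )
        return proc.stderr


with_ranges = traceback_text({})
without_ranges = traceback_text({"PYTHONNODEBUGRANGES": "true"})

print(with_ranges)
print(without_ranges)
```

On 3.11+ the first traceback should carry the extra marker lines while the second should not. Those marker lines are the ones that end up split across separate log records in the task log.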
Thanks for looking into it! Indeed, this fixes the layout, but we lose this nice functionality...
I thought about fixing the following error:
[2024-04-30T13:49:26.744+0400] {docker.py:436} INFO - with open(sys.argv[4], "w") as file:
IndexError: list index out of range
which somewhat pollutes the log with an error that should not be raised at all. Fixing it might also (I have tried and failed so far) make it possible to provide a traceback and error message with the additional information.
Unfortunately, what I thought was simple to fix is actually more complex...
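For what it's worth, one way to keep the secondary IndexError out of the log can be sketched like this. It is only an illustration under my own assumptions, not the provider's code and not necessarily how it should be fixed: `run_with_termination_log` is a hypothetical wrapper, and index 3 of the longer argument list is just a placeholder.

```python
import os
import tempfile
from pathlib import Path


def run_with_termination_log(func, argv):
    """Run func; on failure, record the error only if a log path was passed."""
    try:
        return func()
    except Exception as exc:
        # Hypothetical guard: only touch argv[4] when it actually exists.
        if len(argv) > 4:
            Path(argv[4]).write_text(str(exc))
        raise  # re-raise the original error unchanged


def fails():
    raise RuntimeError("This is a test")


# Case 1: three arguments, as in the log; only the RuntimeError surfaces.
short_argv = ["/tmp/script.py", "/tmp/script.in", "/tmp/script.out"]
try:
    run_with_termination_log(fails, short_argv)
    short_error = None
except RuntimeError as exc:
    short_error = exc

# Case 2: a fifth entry is present, so the message is written to that file.
with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "termination.log")
    long_argv = short_argv + ["/tmp/placeholder", log_path]
    try:
        run_with_termination_log(fails, long_argv)
    except RuntimeError:
        pass
    logged = Path(log_path).read_text()

print(repr(short_error), logged)
```

This only hides the symptom. The task still fails with the original RuntimeError, and the termination log stays empty whenever the path is not passed.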
> Indeed, this fixes the layout, but we lose this nice functionality...
If you find a correct way to handle PEP 657 at runtime in the loggers, that would be nice.