Missing termination argument in DockerOperator

Open antoinetavant opened this issue 1 year ago • 8 comments

Apache Airflow Provider(s)

docker

Versions of Apache Airflow Providers

3.10.0

Apache Airflow version

2.8.3-python3.11

Operating System

ubuntu

Deployment

Docker-Compose

Deployment details

I'm using docker-compose on an EC2 instance

What happened

While running the DAG, when a task fails (that is, when the task's Python code raises an exception), /tmp/script.py expects a fourth argument that is missing.

What you think should happen instead

The error seems to come from https://github.com/apache/airflow/pull/31747, which adds the termination log file as termination_log_path = tmp_dir / "termination.log"

How to reproduce

from airflow.decorators import dag, task


@dag()
def failing_test():
    @task.docker(
        image="python:3.11-slim-bookworm",
        python_command="python3",
        docker_url="tcp://docker-socket-proxy:2375",
    )
    def fails():
        raise RuntimeError("This is a test")

    fails()


failing_test()

results in

...
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - + python3 -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x);'
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - + python3 -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x);'
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - + python3 /tmp/script.py /tmp/script.in /tmp/script.out
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - Traceback (most recent call last):
  File "/tmp/script.py", line 31, in <module>
    res = fails(*arg_dict["args"], **arg_dict["kwargs"])
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/script.py", line 17, in fails
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - raise RuntimeError("This is a test")
RuntimeError: This is a test
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/tmp/script.py", line 33, in <module>
[2024-04-29, 23:09:01 UTC] {docker.py:436} INFO - with open(sys.argv[4], "w") as file:
              ~~~~~~~~^^^
IndexError: list index out of range
[2024-04-29, 23:09:02 UTC] {taskinstance.py:2731} ERROR - Task failed with exception
...

Anything else

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

antoinetavant avatar Apr 29 '24 23:04 antoinetavant

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot] avatar Apr 29 '24 23:04 boring-cyborg[bot]

As you checked, you're willing to submit a PR; I'll assign this one to you. Thanks!

Lee-W avatar Apr 30 '24 08:04 Lee-W

OK. It's my first contribution, I'll try not to mess it up!

antoinetavant avatar Apr 30 '24 08:04 antoinetavant

OK. It's my first contribution, I'll try not to mess it up!

That's awesome! Many thanks! Please let us know if you encounter issues during development!

Lee-W avatar Apr 30 '24 08:04 Lee-W

Seems like this one is a duplicate of https://github.com/apache/airflow/issues/33692

Taragolis avatar Apr 30 '24 09:04 Taragolis

I guess the better solution would be to disable the extra debug ranges by implicitly setting PYTHONNODEBUGRANGES to any value, with the ability to turn this off.

Without this feature the traceback looks fine:

@task.docker(
    image="python:3.11-slim-bookworm",
    auto_remove="force",
    environment={"PYTHONNODEBUGRANGES": "true"}
)
def f():
    raise RuntimeError("This is a test")
[2024-04-30T13:49:25.647+0400] {taskinstance.py:2143} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.f test [None]>
[2024-04-30T13:49:25.648+0400] {taskinstance.py:2143} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.f test [None]>
[2024-04-30T13:49:25.649+0400] {taskinstance.py:2378} INFO - Starting attempt 1 of 1
[2024-04-30T13:49:25.649+0400] {taskinstance.py:2460} WARNING - cannot record queued_duration for task f because previous state change time has not been saved
[2024-04-30T13:49:25.651+0400] {taskinstance.py:2402} INFO - Executing <Task(_DockerDecoratedOperator): f> on 2021-09-01 00:00:00+00:00
[2024-04-30T13:49:26.317+0400] {taskinstance.py:2721} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='test_dag' AIRFLOW_CTX_TASK_ID='f' AIRFLOW_CTX_EXECUTION_DATE='2021-09-01T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='test'
[2024-04-30T13:49:26.319+0400] {taskinstance.py:438} INFO - ::endgroup::
[2024-04-30T13:49:26.378+0400] {docker.py:366} INFO - Starting docker container from image python:3.11-slim-bookworm
[2024-04-30T13:49:26.641+0400] {docker.py:436} INFO - + python3 -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x);'
[2024-04-30T13:49:26.713+0400] {docker.py:436} INFO - + python3 -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x);'
[2024-04-30T13:49:26.723+0400] {docker.py:436} INFO - + python3 /tmp/script.py /tmp/script.in /tmp/script.out
[2024-04-30T13:49:26.743+0400] {docker.py:436} INFO - Traceback (most recent call last):
  File "/tmp/script.py", line 31, in <module>
[2024-04-30T13:49:26.743+0400] {docker.py:436} INFO - res = f(*arg_dict["args"], **arg_dict["kwargs"])
  File "/tmp/script.py", line 17, in f
    raise RuntimeError("This is a test")
RuntimeError: This is a test

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/script.py", line 33, in <module>
[2024-04-30T13:49:26.744+0400] {docker.py:436} INFO - with open(sys.argv[4], "w") as file:
IndexError: list index out of range
[2024-04-30T13:49:26.839+0400] {taskinstance.py:453} INFO - ::group::Post task execution logs
[2024-04-30T13:49:26.841+0400] {taskinstance.py:2976} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/Users/taragolis/Projects/common/airflow/airflow/models/taskinstance.py", line 477, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/Users/taragolis/Projects/common/airflow/airflow/models/taskinstance.py", line 440, in _execute_callable
    return ExecutionCallableRunner(
  File "/Users/taragolis/Projects/common/airflow/airflow/utils/operator_helpers.py", line 250, in run
    return self.func(*args, **kwargs)
  File "/Users/taragolis/Projects/common/airflow/airflow/models/baseoperator.py", line 405, in wrapper
    return func(self, *args, **kwargs)
  File "/Users/taragolis/Projects/common/airflow/airflow/providers/docker/decorators/docker.py", line 127, in execute
    return super().execute(context)
  File "/Users/taragolis/Projects/common/airflow/airflow/models/baseoperator.py", line 405, in wrapper
    return func(self, *args, **kwargs)
  File "/Users/taragolis/Projects/common/airflow/airflow/decorators/base.py", line 265, in execute
    return_value = super().execute(context)
  File "/Users/taragolis/Projects/common/airflow/airflow/models/baseoperator.py", line 405, in wrapper
    return func(self, *args, **kwargs)
  File "/Users/taragolis/Projects/common/airflow/airflow/providers/docker/operators/docker.py", line 509, in execute
    return self._run_image()
  File "/Users/taragolis/Projects/common/airflow/airflow/providers/docker/operators/docker.py", line 371, in _run_image
    return self._run_image_with_mounts([*self.mounts, tmp_mount], add_tmp_variable=True)
  File "/Users/taragolis/Projects/common/airflow/airflow/providers/docker/operators/docker.py", line 444, in _run_image_with_mounts
    raise DockerContainerFailedException(f"Docker container failed: {result!r}", logs=log_lines)
airflow.providers.docker.exceptions.DockerContainerFailedException: Docker container failed: {'StatusCode': 1}
[2024-04-30T13:49:26.844+0400] {taskinstance.py:1260} INFO - Marking task as FAILED. dag_id=test_dag, task_id=f, run_id=test, execution_date=20210901T000000, start_date=20240430T094925, end_date=20240430T094926
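
The effect of the variable can also be checked outside Airflow and Docker. The following is a minimal sketch, assuming a local Python 3.11+ interpreter; the helper names traceback_for and has_marker_lines are made up for this illustration and are not part of Airflow. It runs the same failing function in a subprocess, with and without PYTHONNODEBUGRANGES, and checks whether the traceback contains the PEP 657 marker lines (lines made only of ^ and ~).

```python
import os
import subprocess
import sys
import tempfile
from pathlib import Path

# Small script that fails in the same way as the task in this issue.
SCRIPT = '''
def fails():
    raise RuntimeError("This is a test")

res = fails()
'''


def traceback_for(env_overrides):
    """Run SCRIPT in a child interpreter and return what it wrote to stderr."""
    # Start from the current environment, but make sure the variable is only
    # present when the caller asks for it.
    env = {k: v for k, v in os.environ.items() if k != "PYTHONNODEBUGRANGES"}
    env.update(env_overrides)
    with tempfile.TemporaryDirectory() as tmp:
        script = Path(tmp) / "script.py"
        script.write_text(SCRIPT)
        proc = subprocess.run(
            [sys.executable, str(script)],
            env=env,
            capture_output=True,
            text=True,
        )
    return proc.stderr


def has_marker_lines(traceback_text):
    """Return True if any line consists only of '^' and '~' characters."""
    for line in traceback_text.splitlines():
        stripped = line.strip()
        if stripped and set(stripped) <= {"^", "~"}:
            return True
    return False


if __name__ == "__main__":
    default_tb = traceback_for({})
    plain_tb = traceback_for({"PYTHONNODEBUGRANGES": "true"})
    print("markers by default:", has_marker_lines(default_tb))
    print("markers with PYTHONNODEBUGRANGES:", has_marker_lines(plain_tb))
```

Whether the default run shows markers depends on the interpreter version; the run with the variable set should not show any.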

Taragolis avatar Apr 30 '24 09:04 Taragolis

Thanks for looking into it! Indeed, this fixes the layout, but we lose this nice functionality...

I thought about fixing the following error

[2024-04-30T13:49:26.744+0400] {docker.py:436} INFO - with open(sys.argv[4], "w") as file:
IndexError: list index out of range

which somewhat pollutes the log with an error that should not be raised. Fixing it might also (I have tried and failed so far) provide a traceback and error message with the additional information.

Unfortunately, what I thought was simple to fix is actually more complex...
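
For illustration, here is a minimal sketch of how such a handler could be guarded, assuming the generated script writes the error message to the path given in sys.argv[4], as the traceback suggests. The names record_failure and run_callable are hypothetical and are not part of the Airflow provider; the real script template may differ.

```python
from pathlib import Path


def record_failure(argv, exc):
    """Write the error message to argv[4] if that argument was passed.

    Returns True when the message was written, False when the argument
    is missing (the situation shown in the logs of this issue).
    """
    if len(argv) > 4:
        Path(argv[4]).write_text(str(exc))
        return True
    return False


def run_callable(argv, func):
    """Call func and record a failure without masking the original error."""
    try:
        return func()
    except Exception as exc:
        # Without the length check in record_failure, a missing argument
        # would raise IndexError here and bury the original exception.
        record_failure(argv, exc)
        raise
```

With only three entries in argv, run_callable re-raises the original RuntimeError instead of an IndexError; with a fifth entry, the message is also written to that file.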

antoinetavant avatar Apr 30 '24 12:04 antoinetavant

Indeed, this fixes the layout, but we lose this nice functionality...

If you find a correct way to handle PEP 657 at runtime in loggers, that would be nice.

Taragolis avatar Apr 30 '24 13:04 Taragolis