`dag.test()` never fails in Airflow 2.7.1

Open ketozhang opened this issue 2 years ago • 5 comments

Apache Airflow version

2.7.1

What happened

I defined a task as a function decorated with @task.kubernetes. The function always fails. I want to test the DAG with dag.test(). Although the pod fails with ERROR status and the pod's logs report the exception, dag.test() does not raise an exception.

What you think should happen instead

No response

How to reproduce

from airflow.decorators import task

...
with DAG(...) as dag:
    @task.kubernetes(
        image="python:3.10-slim-buster",
        log_events_on_failure=True,
    )
    def foobar():
        raise Exception("Error on foobar")

    foobar()

def test_dag():
    dag.test()

$ pytest -x dag.py
============================= test session starts ==============================
platform linux -- Python 3.11.3, pytest-7.4.2, pluggy-1.0.0
rootdir: ...
plugins: anyio-3.7.0, dotenv-0.5.2, docker-2.0.1

dag.py .             [100%]

================ 1 passed, 2 deselected, 14 warnings in 10.67s =================

$ kubectl logs k8s-airflow-pod-96316caac8234dc69aa02934fa154664-0ijh076i
+ python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x); f.close()'
+ python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x); f.close()'
+ mkdir -p /airflow/xcom
+ python /tmp/script.py /tmp/script.in /airflow/xcom/return.json
Traceback (most recent call last):
  File "/tmp/script.py", line 22, in <module>
    res = foobar(*arg_dict["args"], **arg_dict["kwargs"])
  File "/tmp/script.py", line 20, in foobar
    raise Exception("Error on foobar")
Exception: Error on foobar

$ kubectl describe pods k8s-airflow-pod-96316caac8234dc69aa02934fa154664-0ijh076i
Name:             k8s-airflow-pod-96316caac8234dc69aa02934fa154664-0ijh076i
Namespace:        default
Priority:         0
Service Account:  default
Node:             redacted
Start Time:       Mon, 11 Sep 2023 13:38:34 -0700
Labels:           airflow_kpo_in_cluster=False
                  airflow_version=2.7.1
                  already_checked=True
                  dag_id=mydag
                  kubernetes_pod_operator=True
                  run_id=manual__2023-09-12T203833.3324380000-e8a0da1a8
                  task_id=foobar
                  try_number=1
Annotations:      <none>
Status:           Failed

Operating System

Amazon Linux 2

Versions of Apache Airflow Providers

apache-airflow-providers-amazon          8.2.0
apache-airflow-providers-cncf-kubernetes 7.5.0
apache-airflow-providers-common-sql      1.5.1
apache-airflow-providers-ftp             3.4.1
apache-airflow-providers-http            4.4.1
apache-airflow-providers-imap            3.2.1
apache-airflow-providers-postgres        5.5.1
apache-airflow-providers-sqlite          3.4.1

Deployment

Docker-Compose

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

ketozhang avatar Sep 11 '23 20:09 ketozhang

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot] avatar Sep 11 '23 20:09 boring-cyborg[bot]

Running this on Airflow 2.6.2...

apache-airflow                           2.6.2
apache-airflow-providers-cncf-kubernetes 7.1.0

... correctly raises an airflow.exceptions.AirflowException. However, the traceback fails to show the exception message from the pod's container logs.

ketozhang avatar Sep 11 '23 21:09 ketozhang

Here is the culprit commit https://github.com/apache/airflow/commit/90baac669c446eb4dfb9166d996de59289044983 . Tagging @uranusjr.

The behavior of dag.test() changed in Airflow 2.7.1 (edit: I traced the change all the way back to 2.6.2) so that exceptions are never raised, only logged. Currently this is implemented with logging.exception().

https://github.com/apache/airflow/blob/77dd4f04e23323799dac5660b6998b218a8fdc03/airflow/models/dag.py#L2754-L2766

This change makes dag.test() pretty useless.

ketozhang avatar Sep 11 '23 21:09 ketozhang

The exception block probably needs a raise to bubble up the exception. Do you want to submit a pull request for this?
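
To illustrate the difference being discussed, here is a minimal standalone sketch. It is not Airflow's actual code: `failing_task`, `run_logged_only`, and `run_and_reraise` are hypothetical names used only for illustration, and the sketch assumes nothing beyond the standard library. It contrasts an except block that only calls `logging.exception()` with one that logs and then re-raises.

```python
import logging

log = logging.getLogger("dag_test_sketch")  # hypothetical logger name


def failing_task():
    """Stand-in for the always-failing task from the report."""
    raise Exception("Error on foobar")


def run_logged_only(task):
    """Log the failure and swallow it, so the caller never sees an exception."""
    try:
        task()
    except Exception:
        log.exception("Task failed")


def run_and_reraise(task):
    """Log the failure, then re-raise it so the caller (e.g. pytest) fails."""
    try:
        task()
    except Exception:
        log.exception("Task failed")
        raise  # bare raise keeps the original exception and traceback


# The first call returns normally even though the task raised.
run_logged_only(failing_task)

# The second call propagates the original exception to the caller.
try:
    run_and_reraise(failing_task)
except Exception as exc:
    print(f"caller saw: {exc}")
```

With the first pattern a test that only calls the runner passes regardless of the task outcome, which matches the pytest run in the report. With the second pattern the exception reaches the test function.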

uranusjr avatar Sep 12 '23 01:09 uranusjr

Ran into the same issue today with tests failing while upgrading a project from 2.5.1 to 2.6.3 and opened a pull request adding the suggested fix: https://github.com/apache/airflow/pull/34311

styannik avatar Sep 12 '23 14:09 styannik

@uranusjr:

The exception block probably needs a raise to bubble up the exception. Do you want to submit a pull request for this?

Looks like @styannik's proposed solution will bring back the behavior that https://github.com/apache/airflow/pull/30965 attempts to fix, as @ketozhang pointed out. Is there a better proposed solution? (cc @hussein-awala @dimberman @potiuk)

That PR is closed and it is unclear how we can get this issue fixed.

getaaron avatar May 28 '24 20:05 getaaron