airflow
                                
                                 airflow copied to clipboard
                                
                                    airflow copied to clipboard
                            
                            
                            
                        `dag.test()` never fails in Airflow 2.7.1
Apache Airflow version
2.7.1
What happened
I defined a task as a decorated function with @task.kubernetes. The function always fails. I want to test the DAG with dag.test(). Despite the pod failing with ERROR status and pod's logs report the exception, dag.test() does not raise an exception.
What you think should happen instead
No response
How to reproduce
from airflow.decorators import task
...
with DAG(...) as dag:
    @task.kubernetes(
        image="python:3.10-slim-buster",
        log_events_on_failure=True,
    )
    def foobar():
        raise Exception("Error on foobar")
    foobar()
def test_dag():
    dag.test()
$ pytest -x dag.py
=============================================================================================================================== test session starts ================================================================================================================================
platform linux -- Python 3.11.3, pytest-7.4.2, pluggy-1.0.0
rootdir: ...
plugins: anyio-3.7.0, dotenv-0.5.2, docker-2.0.1
dag.py .             [100%]
================================================================================================================== 1 passed, 2 deselected, 14 warnings in 10.67s ===================================================================================================================
$ kubectl logs k8s-airflow-pod-96316caac8234dc69aa02934fa154664-0ijh076i
+ python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x); f.close()'
+ python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x); f.close()'
+ mkdir -p /airflow/xcom
+ python /tmp/script.py /tmp/script.in /airflow/xcom/return.json
Traceback (most recent call last):
  File "/tmp/script.py", line 22, in <module>
    res = foobar(*arg_dict["args"], **arg_dict["kwargs"])
  File "/tmp/script.py", line 20, in foobar
    raise Exception("Error on foobar")
Exception: Error on foobar
$ kubectl describe pods k8s-airflow-pod-96316caac8234dc69aa02934fa154664-0ijh076i
Name:             k8s-airflow-pod-96316caac8234dc69aa02934fa154664-0ijh076i
Namespace:        default
Priority:         0
Service Account:  default
Node:             redacted
Start Time:       Mon, 11 Sep 2023 13:38:34 -0700
Labels:           airflow_kpo_in_cluster=False
                  airflow_version=2.7.1
                  already_checked=True
                  dag_id=mydag
                  kubernetes_pod_operator=True
                  run_id=manual__2023-09-12T203833.3324380000-e8a0da1a8
                  task_id=foobar
                  try_number=1
Annotations:      <none>
Status:           Failed
Operating System
Amazon Linux 2
Versions of Apache Airflow Providers
apache-airflow-providers-amazon          8.2.0
apache-airflow-providers-cncf-kubernetes 7.5.0
apache-airflow-providers-common-sql      1.5.1
apache-airflow-providers-ftp             3.4.1
apache-airflow-providers-http            4.4.1
apache-airflow-providers-imap            3.2.1
apache-airflow-providers-postgres        5.5.1
apache-airflow-providers-sqlite          3.4.1
Deployment
Docker-Compose
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
Running this on Airflow 2.6.2...
apache-airflow                           2.6.2
apache-airflow-providers-cncf-kubernetes 7.1.0
... gives correctly raises an airflow.exceptions.AirflowException:. However, the trace fails show the exception message from the pod's container logs.
Here is the culprit commit https://github.com/apache/airflow/commit/90baac669c446eb4dfb9166d996de59289044983 . Tagging @uranusjr.
dag.test() behavior was changed in Airflow 2.7.1 (edit: I found it all the way back to 2.6.2) such that exceptions are never raised and only logged. Currently this is implemented with logging.exception().
https://github.com/apache/airflow/blob/77dd4f04e23323799dac5660b6998b218a8fdc03/airflow/models/dag.py#L2754-L2766
This change makes dag.test() pretty useless.
The exception block probably needs a raise to bubble up the exception. Do you want to submit a pull request for this?
Ran into the same issue today with tests failing while upgrading a project from 2.5.1 to 2.6.3 and opened a pull request adding the suggested fix: https://github.com/apache/airflow/pull/34311
@uranusjr:
The exception block probably needs a
raiseto bubble up the exception. Do you want to submit a pull request for this?
Looks like @styannik 's proposed solution will bring back the behavior that https://github.com/apache/airflow/pull/30965 attempts to fix, as @ketozhang pointed out. Is there a better proposed solution? (cc @hussein-awala @dimberman @potiuk)
That PR is closed and it is unclear how we can get this issue fixed.