
ImportError: cannot import name 'SUPERVISOR_COMMS' with dag.test()

Open • opeida opened this issue 6 months ago • 10 comments

Apache Airflow version

3.0.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The exception occurred when a DAG run with dag.test() attempted to retrieve a variable from the API server. Several similar issues have already been opened (#48554, #51062, #51316). The PRs offered as a solution (#50300, #50419) were included in 3.0.2 but did not fix the problem.

Exception has occurred: ImportError
cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/workspaces/airflow/.venv/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
  File "/workspaces/airflow/test.py", line 7, in <module>
    x = Variable.get("my_variable")
        ^^^^^^^^^^^^^^^^^^^^^^^^
ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/workspaces/airflow/.venv/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)

What you think should happen instead?

The variable should have been successfully retrieved without exceptions.

How to reproduce

  1. Set the variable: airflow variables set my_variable my_value
  2. Run the DAG:
import logging

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Variable

x = Variable.get("my_variable")

def my_function(my_var: str) -> None:
    logging.getLogger(__name__).info(my_var)

with DAG("test_dag") as dag:

    start = EmptyOperator(task_id="start")

    py_func = PythonOperator(
        task_id="py_func",
        python_callable=my_function,
        op_kwargs={
            "my_var": x
        }
    )

    end = EmptyOperator(task_id="end")

    start >> py_func >> end

if __name__ == "__main__":
    dag.test()

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

Extended image based on apache/airflow:slim-3.0.2-python3.12

Anything else?

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

opeida, Jun 16 '25 23:06

CC: @kaxil, we should pick this up for 3.0.3 if it's actually an issue; I haven't triaged it yet.

amoghrajesh, Jun 17 '25 05:06

Yeah, it is an issue. The problem is that the DAG file is already parsed before the dag.test() code runs, and dag.test() is what would assign task_runner.SUPERVISOR_COMMS = InProcessSupervisorComms.

kaxil, Jun 17 '25 13:06
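To make the ordering problem concrete, here is a toy simulation; it is not Airflow code. The module name fake_task_runner and the functions variable_get, parse_dag_file and dag_test are hypothetical, and a plain dict stands in for InProcessSupervisorComms. As the traceback suggests, the lookup of SUPERVISOR_COMMS happens lazily inside the variable call, so top-level code that runs before anything assigns it hits the same kind of ImportError, while the same call succeeds once dag_test() has installed the comms object.

```python
import sys
import types

# Hypothetical stand-in for airflow.sdk.execution_time.task_runner:
# SUPERVISOR_COMMS does not exist until something assigns it.
task_runner = types.ModuleType("fake_task_runner")
sys.modules["fake_task_runner"] = task_runner


def variable_get(key: str) -> str:
    # Look up the comms channel lazily, at call time.
    from fake_task_runner import SUPERVISOR_COMMS

    return SUPERVISOR_COMMS[key]


def parse_dag_file() -> str:
    # Top-level DAG code runs while the file is imported, before dag.test().
    return variable_get("my_variable")


def dag_test() -> str:
    # dag.test() first installs in-process comms (a dict here), then runs tasks.
    task_runner.SUPERVISOR_COMMS = {"my_variable": "my_value"}
    return variable_get("my_variable")


try:
    parse_dag_file()
except ImportError as exc:
    print("parse time:", exc)

print("task time:", dag_test())
```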

Assuming we can detect the case when this is being imported via python mydag.py (which I'm 95% sure we can, so that is the 'easy' part), how should we actually obtain a value/variable?

I.e., ignoring all practicalities and other issues, what should this do, given Airflow 3's stricter security model of not allowing unfettered DB access?

ashb, Jun 17 '25 13:06

> Assuming we can detect the case when this is being imported from doing python mydag.py (which I'm 95% sure we can, so that is the 'easy' part) how should we actually obtain a value/variable?
>
> I.e. ignoring all practicalities or other issues, what should this do, given the increased security model in Airflow 3 of not allowing unfettered DB access?

Since we allow that during actual DAG parsing (where the call goes DAG processor -> supervisor comms -> Variable), we should do the same for dag.test().

Now, regarding the security model: the Task Execution API is currently tied to task identity, so this becomes a no-go. But changing that would be a breaking change (unless, of course, we somehow include DAG file identity too).

kaxil, Jun 17 '25 13:06

@opeida The ideal approach is not to access Variables or Connections at the top level of your file (i.e. outside the task context). In your case, you can rewrite the DAG as follows:

import logging

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator


def my_function(my_var: str) -> None:
    logging.getLogger(__name__).info(my_var)

with DAG("test_dag") as dag:

    start = EmptyOperator(task_id="start")

    py_func = PythonOperator(
        task_id="py_func",
        python_callable=my_function,
        op_kwargs={
            "my_var": "{{ var.value.my_variable }}"
        }
    )

    end = EmptyOperator(task_id="end")

    start >> py_func >> end

if __name__ == "__main__":
    dag.test()

kaxil, Jun 17 '25 14:06
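Why the templated version avoids the error: at parse time op_kwargs only holds a string, and the variable is looked up when the template is rendered for the task. The following sketch uses a small regex as a stand-in for Jinja rendering (Airflow itself uses Jinja, not this code), and fake_variable_get is a hypothetical variable store that records each access so you can see when the lookup happens.

```python
import re
from typing import Callable, Dict, List

# Toy stand-in for Jinja rendering of "{{ var.value.<key> }}"; not Airflow's renderer.
VAR_TEMPLATE = re.compile(r"\{\{\s*var\.value\.(\w+)\s*\}\}")


def render(template: str, lookup: Callable[[str], str]) -> str:
    """Replace every {{ var.value.<key> }} with lookup(<key>)."""
    return VAR_TEMPLATE.sub(lambda match: lookup(match.group(1)), template)


lookups: List[str] = []


def fake_variable_get(key: str) -> str:
    # Hypothetical variable store that records every access.
    lookups.append(key)
    return {"my_variable": "my_value"}[key]


# Parse time: the operator only stores the template string; nothing is looked up.
op_kwargs: Dict[str, str] = {"my_var": "{{ var.value.my_variable }}"}
assert lookups == []

# Task time: rendering resolves the variable.
rendered = {name: render(value, fake_variable_get) for name, value in op_kwargs.items()}
print(rendered)  # {'my_var': 'my_value'}
```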

So: detecting when we have top-level dag code, at least in the python mydagfile.py case:

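import sys


# Module-level __getattr__ (PEP 562): only called while SUPERVISOR_COMMS has not been set.
def __getattr__(name):
    if name == "SUPERVISOR_COMMS" and "__main__" in sys.modules:
        import traceback

        # Collect the current call stack, ignoring importlib's internal (frozen) frames.
        frames = [
            frame
            for (frame, lnum) in traceback.walk_stack(None)
            if not frame.f_code.co_filename.startswith("<frozen importlib.")
        ]
        # Outermost frame is the __main__ script itself: the access came from top-level code.
        if sys.modules["__main__"].__file__ == frames[-1].f_code.co_filename:
            raise RuntimeError("Top level API access here")

    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")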

(Put this in airflow/sdk/execution_time/task_runner.py. It won't help in unit tests where someone imports the DAG, but that case could be handled in other ways.)

So now back to the main question: can we "just" make it set up an in-process API server there to try and ask the local DB in that case? Are there any security risks of doing that?

ashb, Jun 17 '25 14:06
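The detection snippet relies on a module-level __getattr__ (PEP 562): Python calls it only when normal attribute lookup on the module fails, and a from-import of a missing name goes through the same path. A minimal sketch with a hypothetical in-memory module, demo_task_runner, shows the hook firing and then being bypassed once the attribute is really set; it deliberately leaves out the stack inspection.

```python
import sys
import types

# Hypothetical in-memory module standing in for task_runner.py.
mod = types.ModuleType("demo_task_runner")


def _module_getattr(name: str):
    # PEP 562: called only when normal attribute lookup on the module fails.
    if name == "SUPERVISOR_COMMS":
        raise RuntimeError("Top level API access here")
    raise AttributeError(f"module {mod.__name__!r} has no attribute {name!r}")


mod.__getattr__ = _module_getattr
sys.modules["demo_task_runner"] = mod

try:
    # The from-import goes through __getattr__; a non-AttributeError propagates as-is.
    from demo_task_runner import SUPERVISOR_COMMS
except RuntimeError as exc:
    print(exc)  # Top level API access here

# Once the attribute is really set, __getattr__ is no longer consulted.
mod.SUPERVISOR_COMMS = "in-process comms"
from demo_task_runner import SUPERVISOR_COMMS

print(SUPERVISOR_COMMS)  # in-process comms
```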

> @opeida The ideal way would be to not access Variables or Connections at the top of your file (i.e. outside Task Context)

@kaxil are there workarounds to eliminate the use of top-level variables for dynamic DAG generation? We pull data from numerous accounts of a third-party provider. The accounts variable can change dynamically and stores sensitive information including API keys. In all other cases, we utilize Jinja templates within Task Context following Airflow's best practices.

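from airflow import DAG
from airflow.sdk import Variable

provider_name = "<provider_name>"
default_args = {}  # placeholder: the real default_args are not shown
# Assumes the Variable stores a JSON list of account objects, e.g. [{"name": ...}]
ALL_ACCOUNTS = Variable.get(f"{provider_name}_accounts", deserialize_json=True)

for account in ALL_ACCOUNTS:
    with DAG(
        f"{provider_name}.{account.get('name')}",
        default_args=default_args,
        schedule="10 * * * *",
        catchup=False,
        max_active_runs=1,
    ) as dag:
        ...  # DAG operators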

opeida, Jun 17 '25 15:06

You can just do the following in your case:

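import logging

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator

provider_name = "<provider_name>"

def my_function(my_var: str) -> None:
    logging.getLogger(__name__).info(my_var)

with DAG("test_dag") as dag:

    start = EmptyOperator(task_id="start")

    py_func = PythonOperator(
        task_id="py_func",
        python_callable=my_function,
        op_kwargs={
            # Jinja template: the variable is read when the task runs, not at parse time
            "my_var": "{{ var.value." + provider_name + "_accounts }}"
        }
    )

    end = EmptyOperator(task_id="end")

    start >> py_func >> end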

kaxil, Jun 17 '25 15:06
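In kaxil's suggestion the string concatenation only builds a Jinja template string at parse time; no variable is read until the task renders it. If you prefer an f-string, the literal braces have to be doubled. A small sketch, keeping the placeholder provider_name from the snippet:

```python
provider_name = "<provider_name>"

# String concatenation, as in the suggested op_kwargs.
concatenated = "{{ var.value." + provider_name + "_accounts }}"

# Equivalent f-string: literal braces are doubled, so "{{{{" produces "{{".
formatted = f"{{{{ var.value.{provider_name}_accounts }}}}"

assert concatenated == formatted
print(formatted)  # {{ var.value.<provider_name>_accounts }}
```

Because the lookup is deferred to task time, this pattern cannot decide at parse time how many DAGs to generate.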

> So: detecting when we have top-level dag code, at least in the python mydagfile.py case:
>
> def __getattr__(name):
>     if name == "SUPERVISOR_COMMS" and "__main__" in sys.modules:
>         import traceback
>
>         frames = [
>             frame
>             for (frame, lnum) in traceback.walk_stack(None)
>             if not frame.f_code.co_filename.startswith("<frozen importlib.")
>         ]
>         if sys.modules["__main__"].__file__ == frames[-1].f_code.co_filename:
>             raise RuntimeError("Top level API access here")
>
>     raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
>
> (Put this in airflow/sdk/execution_time/task_runner.py. It won't help for cases in unit tests where someone imports the dag. But that could be handled in other cases)

Aah 💡.

> So now back to the main question: can we "just" make it set up an in-process API server there to try and ask the local DB in that case? Are there any security risks of doing that?

Unsure right now; I need to think it through.

kaxil, Jun 17 '25 15:06

> py_func = PythonOperator(
>     task_id="py_func",
>     python_callable=my_function,
>     op_kwargs={
>         "my_var": "{{ var.value." + provider_name + "_accounts }}"
>     }
> )

@kaxil, the DAG I provided only reproduces the issue and does not reflect our actual case. We need to be able to trigger a specific account, send metrics to StatsD for each account, and use the additional flexibility that dynamic DAG generation offers.

Thank you for your reply. I look forward to testing the fix if one is provided. Feel free to ping me if you need any feedback from the user perspective.

UPD: importing Variable from airflow.models instead of airflow.sdk fixes the problem, but it doesn't seem like a best-practice solution.

opeida, Jun 17 '25 18:06

> UPD: import Variable from airflow.models instead of airflow.sdk fixes the problem but doesn't seem like a best practice solution.

Yeah, that still works because it falls back to the direct DB access approach.

ashb, Jun 18 '25 13:06

Importing from airflow.models works for me to load an environment-defined variable. Is airflow.sdk not yet well suited to non-runner environments? I have already run into various problems, such as dag.folder being wrong when running through the CLI and sys.path not being properly extended...

simi, Jun 26 '25 08:06
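For context on "environment-defined variable": Airflow can read Variables from environment variables named AIRFLOW_VAR_ followed by the key in upper case (for example AIRFLOW_VAR_MY_VARIABLE for my_variable). The helper get_env_variable is hypothetical: it only mirrors that naming convention with os.environ, is not an Airflow API, and says nothing about which code path airflow.models or airflow.sdk actually takes.

```python
import json
import os
from typing import Any, Optional

# Naming convention for environment-defined Airflow Variables.
VAR_ENV_PREFIX = "AIRFLOW_VAR_"


def get_env_variable(
    key: str, default: Optional[str] = None, deserialize_json: bool = False
) -> Any:
    """Hypothetical helper: read AIRFLOW_VAR_<KEY> from the environment."""
    raw = os.environ.get(VAR_ENV_PREFIX + key.upper())
    if raw is None:
        return default
    # Optionally parse JSON values, e.g. a list of account objects.
    return json.loads(raw) if deserialize_json else raw


os.environ["AIRFLOW_VAR_MY_VARIABLE"] = "my_value"
print(get_env_variable("my_variable"))  # my_value
```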

We use variables for dynamic DAG scheduling, so we can view and modify schedules ourselves via the web UI, without having the DevOps team change deployment configs (OS environment variables). Example:

from airflow import DAG
from airflow.sdk import Variable

with DAG(
    "DAG_1",
    schedule=Variable.get("DAG_1_SCHEDULE", None),
) as dag:
    ...  # tasks go here

So I have to ask: will the final fixed version allow us to do the same?

DartVeDroid, Jul 04 '25 17:07

> We use variables for dynamic DAG scheduling, so we can access and modify schedules via web-UI by ourselves, without having to change deployment configs (OS environment variables) through DevOps guys. Example:
>
> with DAG(
>     "DAG_1",
>     schedule=Variable.get("DAG_1_SCHEDULE", None)
> )
>
> So I feel like I have to ask: will the final fixed version allow for us to do the same?

Yes, although the timeline is TBD.

kaxil, Jul 04 '25 17:07